

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 程式碼範例：使用 ResolveChoice、Lambda 和 ApplyMapping 的資料準備
<a name="aws-glue-programming-python-samples-medicaid"></a>

此範例使用的資料集，包含從兩個 [Data.CMS.gov](https://data.cms.gov) 資料集下載的美國聯邦醫療保險 (Medicare) 供應商付款資料：「住院患者預期付款系統供應商前 100 大診斷相關群組摘要 - FY2011」和「住院患者費用資料 FY 2011」。下載資料之後，我們修改了資料集，以在檔案結尾處引入幾個錯誤記錄。上述經修改的檔案位於公有 Amazon S3 儲存貯體，位置在 `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv`。

您可以在 `data_cleaning_and_lambda.py`[ examplesAWS Glue GitHub 儲存庫的 ](https://github.com/awslabs/aws-glue-samples) 檔案中找到此範例的原始碼。

在 上執行時偵錯 Python 或 PySpark 指令碼的偏好方法是 AWS 在 [Glue Studio AWS 上使用筆記本](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)。

## 步驟 1：在 Amazon S3 儲存貯體中網路爬取資料
<a name="aws-glue-programming-python-samples-medicaid-crawling"></a>

1. 登入 AWS 管理主控台 並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue主控台。

1. 遵循 中所述的程序[設定編目程式](define-crawler.md)，建立新的爬蟲程式，該爬蟲程式可以編目`s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv`檔案，並將產生的中繼資料放入 Glue Data Catalog `payments`中名為 AWS 的資料庫。

1. 執行新的爬蟲程式，接著查看 `payments` 資料庫。在讀取檔案開頭，確定其格式和分隔符號後，爬蟲程式應會於資料庫建立一個命名為 `medicare` 的中繼資料資料表。

   新 `medicare` 資料表的結構描述如下：

   ```
   Column  name                            Data type
   ==================================================
   drg definition                             string
   provider id                                bigint
   provider name                              string
   provider street address                    string
   provider city                              string
   provider state                             string
   provider zip code                          bigint
   hospital referral region description       string
   total discharges                           bigint
   average covered charges                    string
   average total payments                     string
   average medicare payments                  string
   ```

## 步驟 2：新增樣板指令碼至開發端點筆記本
<a name="aws-glue-programming-python-samples-medicaid-boilerplate"></a>

將以下樣板指令碼貼至開發端點以匯入您需要的 AWS Glue 程式庫，並且設定單一的 `GlueContext`：

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 步驟 3：比較不同的結構描述剖析
<a name="aws-glue-programming-python-samples-medicaid-schemas"></a>

接著，您可以查看 Apache Spark `DataFrame` 辨識出的結構描述是否跟 AWS Glue 爬蟲程式所記錄的相同。執行此程式碼：

```
medicare = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
```

以下為 `printSchema` 呼叫的輸出：

```
root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: integer (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : integer (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)
```

接著，查看 AWS Glue `DynamicFrame` 產生的結構描述：

```
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()
```

`printSchema` 的輸出如下：

```
root
 |-- drg definition: string
 |-- provider id: choice
 |    |-- long
 |    |-- string
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

在 `DynamicFrame` 產生的結構描述中，`provider id` 可以是 `long` 或 `string` 類型。`DataFrame` 結構描述將 `Provider Id` 列為 `string` 類型， Data Catalog 則是將 `provider id` 列為 `bigint` 類型。

何者正確？ 在檔案的最後有兩筆記錄 (總共 160,000 筆記錄)，且該欄位中有 `string` 值。這些就是之前引入以示範產生問題的錯誤記錄。

為了解決這種問題，AWS Glue`DynamicFrame` 採用 *choice (選擇)* 類型的概念。在此例中，`DynamicFrame` 展示了 `long` 和 `string` 值都會在該欄出現。AWS Glue 爬蟲程式遺漏了 `string` 值，原因是只考量到資料的前 2 MB。Apache Spark `DataFrame` 會考量整個資料集，但被強制將最普遍的類型指派給該欄位，亦即 `string`。事實上，Spark 在遇到複雜類型或不熟悉的變化時，通常都會採取最普遍的作法。

要查詢 `provider id` 欄，請先解決選擇類型。您可以使用 `DynamicFrame` 中的 `resolveChoice` 轉換方法，藉由以下的 `cast:long` 選項將 `string` 值轉換為 `long` 值。

```
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
```

`printSchema` 輸出就會是：

```
root
 |-- drg definition: string
 |-- provider id: long
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

如果有無法轉換的 `string` 值，AWS Glue 會插入 `null`。

另一個選項是將選擇類型轉換為 `struct`，這會保留兩種類型的值。

接著，查看異常的資料列。

```
medicare_res.toDF().where("'provider id' is NULL").show()
```

您會見到以下情況：

```
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|  provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|              12|              $11961.41|              $4619.00|                 $3775.33|
|948 - SIGNS & SYM...|       null| INC- ST JOSEPH|     5000 W CHAMBERS ST|    MILWAUKEE|            WI|            53210|                      WI - Milwaukee|              14|              $10514.28|              $5562.50|                 $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
```

現在移除兩筆不正確的記錄，如下所示：

```
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
```

## 步驟 4：映射資料並使用 Apache Spark Lambda 函數
<a name="aws-glue-programming-python-samples-medicaid-lambda-mapping"></a>

AWS Glue 尚未直接支援 Lambda 函式，亦即使用者定義的函式。但是您隨時可以從 Apache Spark `DataFrame` 來回轉換 `DynamicFrame`，以利用除了 `DynamicFrames` 特殊功能之外的 Spark 功能。

接著，將付款資訊轉為數字，讓 Amazon Redshift 或 Amazon Athena 等分析引擎可以更快處理。

```
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
        "ACC", chop_f(
            medicare_dataframe["average covered charges"])).withColumn(
                "ATP", chop_f(
                    medicare_dataframe["average total payments"])).withColumn(
                        "AMP", chop_f(
                            medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
```

`show` 呼叫的輸出如下：

```
+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
```

資料仍然全是字串。我們可以使用強大的 `apply_mapping` 轉換方法來捨棄、重新命名、轉換、巢套資料，讓其他資料程式設計語言和系統能夠輕易存取：

```
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string','rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
```

`printSchema` 輸出如下：

```
root
 |-- drg: string
 |-- provider: struct
 |    |-- id: long
 |    |-- name: string
 |    |-- city: string
 |    |-- state: string
 |    |-- zip: long
 |-- rr: string
 |-- charges: struct
 |    |-- covered: double
 |    |-- total_pay: double
 |    |-- medicare_pay: double
```

將資料轉回 Spark `DataFrame` 後，您就可以顯示其樣貌：

```
medicare_nest_dyf.toDF().show()
```

其輸出如下：

```
+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...|    AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...|    AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...|    AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...|    AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...|    AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
```

## 步驟 5：寫入資料至 Apache Parquet
<a name="aws-glue-programming-python-samples-medicaid-writing"></a>

AWS Glue 可讓您輕鬆以關聯式資料庫能有效取用的格式 (例如 Apache Parquet) 撰寫資料：

```
glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
```