

# コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備
<a name="aws-glue-programming-python-samples-medicaid"></a>

この例で使用されているデータセットは、2 つの [Data.CMS.gov](https://data.cms.gov) データセット (Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011 および Inpatient Charge Data FY 2011) からダウンロードされた、メディケアプロバイダの支払いデータで構成されています。ダウンロードした後、データセットを修正してファイルの最後にいくつかの誤ったレコードを追加しました。この変更されたファイルは、パブリックな Amazon S3 バケット (`s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv`) 内に置かれています。

この例のソースコードは、`data_cleaning_and_lambda.py`[ 例AWS Glue GitHub リポジトリの ](https://github.com/awslabs/aws-glue-samples) ファイルにあります。

AWS で実行中に Python または PySpark スクリプトをデバッグするための推奨方法は、[AWS Glue Studio のノートブック](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)を使用することです。

## ステップ 1: Amazon S3 バケット内のデータをクロールする
<a name="aws-glue-programming-python-samples-medicaid-crawling"></a>

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)) を開きます。

1. [クローラーの設定](define-crawler.md) で説明されているプロセスに従って `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv` ファイルをクロールできる新しいクローラを作成し、得られた結果のメタデータを AWS Glue Data Catalog の `payments` という名前のデータベースに配置します。

1. 新しいクローラを実行し、`payments` データベースを確認します。クローラは、最初のファイルを読み込んでファイルの形式と区切り記号を判断してから、データベースに `medicare` という名前のメタデータテーブルを作成したことが分かります。

   新しい `medicare` テーブルのスキーマは次のようになります。

   ```
   Column  name                            Data type
   ==================================================
   drg definition                             string
   provider id                                bigint
   provider name                              string
   provider street address                    string
   provider city                              string
   provider state                             string
   provider zip code                          bigint
   hospital referral region description       string
   total discharges                           bigint
   average covered charges                    string
   average total payments                     string
   average medicare payments                  string
   ```

## ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する
<a name="aws-glue-programming-python-samples-medicaid-boilerplate"></a>

次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の `GlueContext` を設定します。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## ステップ 3: 異なるスキーマ解析を比較する
<a name="aws-glue-programming-python-samples-medicaid-schemas"></a>

次に、Apache Spark `DataFrame` によって認識されたスキーマが、AWS Glue クローラによって記録されたスキーマと同じかどうかを確認できます。以下のコードを実行します。

```
medicare = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
```

`printSchema` 呼び出しの出力は次のとおりです。

```
root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: integer (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : integer (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)
```

次に、AWS Glue `DynamicFrame` によって生成されるスキーマを確認します。

```
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()
```

`printSchema` の出力は次のとおりです。

```
root
 |-- drg definition: string
 |-- provider id: choice
 |    |-- long
 |    |-- string
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

`DynamicFrame` は、`provider id` が `long` 型または `string` 型のいずれかであるスキーマを生成します。`DataFrame` スキーマは `Provider Id` を `string` 型としてリストし、Data Catalog は `provider id` を `bigint` 型としてリストします。

正しいものはどちらでしょうか。ファイルの末尾には、その列に `string` 値を持つ 2 つのレコード (160,000 レコードのうち) があります。これらは、問題を説明するために導入されたエラーのあるレコードです。

このような問題に対処するために、AWS Glue `DynamicFrame` では *Choice* 型の概念を導入しています。この場合、`DynamicFrame` は、その列に `long` 値と `string` 値の両方が存在することを示しています。AWS Glue クローラはデータの 2 MB のプレフィックスのみを考慮しているため、`string` 値を見落としました。Apache Spark `DataFrame` はデータセット全体を考慮しましたが、最も一般的な型、つまり `string` 型を強制的に列に割り当てました。実際、慣れていない複雑な型やバリエーションがある場合にも、Spark は最も一般的なケースを使用することがあります。

`provider id` 列のクエリを実行するには、Choice 型をまず解決する必要があります。`DynamicFrame` で、`cast:long` オプションを指定して `resolveChoice` 変換メソッドを使用すると、これらの `string` 値を `long` 値に変換できます。

```
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
```

この場合、`printSchema` の出力は次のようになります。

```
root
 |-- drg definition: string
 |-- provider id: long
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

値がキャストできない `string` だった場合に、AWS Glue は `null` を挿入しました。

もう 1 つのオプションは、両方のタイプの値を保持する `struct` に Choice 型を変換することです。

次に、異常だった行を確認してみましょう。

```
medicare_res.toDF().where("'provider id' is NULL").show()
```

次のように表示されています。

```
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|  provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|              12|              $11961.41|              $4619.00|                 $3775.33|
|948 - SIGNS & SYM...|       null| INC- ST JOSEPH|     5000 W CHAMBERS ST|    MILWAUKEE|            WI|            53210|                      WI - Milwaukee|              14|              $10514.28|              $5562.50|                 $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
```

次のように、2 つの不正な形式のレコードを削除します。

```
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
```

## ステップ 4: データのマッピングと Apache Spark Lambda 関数の使用
<a name="aws-glue-programming-python-samples-medicaid-lambda-mapping"></a>

AWS Glue では、まだユーザー定義関数とも呼ばれる Lambda 関数が直接サポートされていません。しかし、いつでも `DynamicFrame` を Apache Spark `DataFrame` との間で変換して、`DynamicFrames` の特殊な機能に加えて Spark の機能を利用できます。

次に、支払い情報を数字に変換すると、Amazon Redshift や Amazon Athena のような分析エンジンが、より迅速に数値処理を実行できるようになります。

```
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
        "ACC", chop_f(
            medicare_dataframe["average covered charges"])).withColumn(
                "ATP", chop_f(
                    medicare_dataframe["average total payments"])).withColumn(
                        "AMP", chop_f(
                            medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
```

`show` 呼び出しの出力は次のとおりです。

```
+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
```

これらは、すべてデータ内ではまだ文字列です。強力な `apply_mapping` 変換メソッドを使用して、データをドロップ、名前変更、キャスト、およびネストし、他のデータプログラミング言語やシステムで容易にアクセスできるようにします。

```
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string','rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
```

`printSchema` の出力は次のとおりです。

```
root
 |-- drg: string
 |-- provider: struct
 |    |-- id: long
 |    |-- name: string
 |    |-- city: string
 |    |-- state: string
 |    |-- zip: long
 |-- rr: string
 |-- charges: struct
 |    |-- covered: double
 |    |-- total_pay: double
 |    |-- medicare_pay: double
```

データを Spark `DataFrame` に戻すと、現在どのような状態かが分かります。

```
medicare_nest_dyf.toDF().show()
```

出力は次のとおりです。

```
+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...|    AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...|    AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...|    AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...|    AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...|    AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
```

## ステップ 5: Apache Parquet にデータを書き込む
<a name="aws-glue-programming-python-samples-medicaid-writing"></a>

AWS Glue は、リレーショナルデータベースが効果的に消費できる Apache Parquet のような形式でデータを書き込むことを容易にします。

```
glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
```