

# AWS Glue Python コードサンプル
<a name="aws-glue-programming-python-samples"></a>
+ [コード例: データの結合と関係付け](aws-glue-programming-python-samples-legislators.md)
+ [コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備](aws-glue-programming-python-samples-medicaid.md)

# コード例: データの結合と関係付け
<a name="aws-glue-programming-python-samples-legislators"></a>

この例では、[http://everypolitician.org/](http://everypolitician.org/) から、Amazon Simple Storage Service (Amazon S3) の `sample-dataset` バケットにデータセットをダウンロードして使用します。`s3://awsglue-datasets/examples/us-legislators/all`このデータセットには、米国議会議員や米国下院および上院議員の議席に関する JSON 形式のデータが含まれており、このチュートリアルの目的のため少し変更され、パブリック Amazon S3 バケットで利用可能になりました。

この例のソースコードは、GitHub ウェブサイトの [`join_and_relationalize.py` Glue サンプルリポジトリ](https://github.com/awslabs/aws-glue-samples)の AWS Glue ファイルにあります。

このデータを使用して、このチュートリアルでは以下のことを実行する方法を示します。
+ AWS Glue クローラを使用して、パブリックな Amazon S3 バケットに保存されているオブジェクトを分類し、それらのスキーマを AWS Glue Data Catalog に保存します。
+ クロールの結果のテーブルのメタデータとスキーマを調べます。
+ Data Catalog のメタデータを使用して Python の抽出、転送、およびロード (ETL) スクリプトを記述し、以下の操作を行います。
  + 異なるソースファイル内のデータをまとめて単一のデータテーブルに結合します (つまり、データを非正規化します)。
  + 議員のタイプ別に、結合テーブルを別のテーブルにフィルタリングします。
  + 生成されたデータを後で分析するために Apache Parquet ファイルに分割して書き出します。

AWS で実行中に Python または PySpark スクリプトをデバッグするための推奨方法は、[AWS Glue Studio のノートブック](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)を使用することです。

## ステップ 1: Amazon S3 バケット内のデータをクロールする
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)) を開きます。

1. [クローラーの設定](define-crawler.md) の手順に従って、`s3://awsglue-datasets/examples/us-legislators/all` データセットをクロールできる新しいクローラを、AWS Glue Data Catalog 内のデータベース `legislators` に作成します。サンプルデータは既に、このパブリックな Amazon S3 バケットに用意されています。

1. 新しいクローラを実行し、`legislators` データベースを確認します。

   クローラは、次のメタデータテーブルを作成します。
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   これは、議員とその履歴を含むテーブルの半正規化されたテーブルの集合です。

## ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の `GlueContext` を設定します。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## ステップ 3: Data Catalog 内のデータでスキーマを確認する
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

次に、簡単な手順で AWS Glue Data Catalog から DynamicFrame を作成し、そのデータのスキーマを調べます。例えば、`persons_json` テーブルのスキーマを表示するには、ノートブックに以下を追加します。

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

プリントコールの出力を以下に示します。

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

テーブル内の各人は、米国議会のメンバーです。

`memberships_json` テーブルのスキーマを表示するには、次のように入力します。

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

出力は次のとおりです。

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

`organizations` は政党および上院と下院の 2 つの議会です。`organizations_json` テーブルのスキーマを表示するには、次のように入力します。

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

出力は次のとおりです。

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## ステップ 4: データをフィルタリングする
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

次に、必要なフィールドのみを保持し、`id` の名前を `org_id` に変更します。データセットは、小さいため全体を表示することができます。

`toDF()` は `DynamicFrame` を Apache Spark に変換するので、Apache Spark SQL に既に存在する `DataFrame` 変換を適用できます。

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

以下に出力を示します。

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

`memberships` に表示される `organizations` を表示するには、次のように入力します。

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

以下に出力を示します。

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## ステップ 5: すべてをまとめる
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

ここで AWS Glue を使用して、これらのリレーショナルテーブルを結合し、議員の `memberships` とそれに対応する `organizations` の 1 つの完全な履歴テーブルを作成します。

1. まず、`persons` および `memberships` を `id` および `person_id` と結合します。

1. 次に、結果を `orgs` と `org_id` および `organization_id` と結合します。

1. 次に、冗長なフィールド `person_id` および `org_id` を削除します。

これらの操作はすべて、1 行の (拡張された) コードで行うことができます。

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

出力は次のとおりです。

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

これで、分析に使用できる最終テーブルが作成されました。これは、分析のためのコンパクトで効率的な形式 (つまり Parquet) で記述することができ、AWS Glue、Amazon Athena、または Amazon Redshift Spectrum で SQL を実行できます。

次の呼び出しは、複数のファイルにわたってテーブルを書き込んで、後で解析するときに高速な並列読み込みをサポートします。

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

すべての履歴データを単一のファイルにまとめるには、データフレームに変換し、再パーティション化して書き出す必要があります。

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

または、上院と下院でそれを分けたい場合。

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## ステップ 6: リレーショナルデータベース向けにデータを変換する
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

AWS Glue では半構造化データでも Amazon Redshift のようなリレーショナルデータベースに簡単に書き込むことができるのです。これにより、フレーム内のオブジェクトがどれほど複雑であっても、`DynamicFrames` をフラット化する変換 `relationalize` が提供されます。

この例の `l_history` `DynamicFrame` を使用して、ルートテーブル (`hist_root`) の名前と一時的な作業パスを `relationalize` に渡します。これにより、`DynamicFrameCollection` が返されます。その後、そのコレクション内の `DynamicFrames` の名前を一覧表示できます。

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

`keys` 呼び出しの出力は次のとおりです。

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize` は、履歴テーブルを 6 つの新しいテーブルに分割します。`DynamicFrame` の各オブジェクトのレコードを含むルートテーブル、および配列の補助テーブルです。リレーショナルデータベースでの配列の処理は、特に配列が大きくなる場合に、最適ではないことがあります。配列を別のテーブルに分けることで、クエリの実行速度が大幅に向上します。

次に、`contact_details` を調べて分離を確認します。

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

`show` 呼び出しの出力は次のとおりです。

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

`contact_details` フィールドは、元の `DynamicFrame` の構造体の配列です。これらの配列の各要素は、`index` によってインデックス化された、補助テーブルの個別の行です。ここで `id` は、`contact_details` キーを使用する `hist_root` テーブルの外部キーです。

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

出力を次に示します。

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

これらのコマンドでは、`toDF()` および `where` 式を使用して、表示する行をフィルタリングすることに注意してください。

したがって、`hist_root` テーブルを補助テーブルと結合すると、次のことが可能になります。
+ 配列をサポートせずにデータベースにデータをロードします。
+ SQL を使用して配列内の各項目にクエリを実行します。

AWS Glue 接続を使用して、Amazon Redshift の認証情報を安全に保存してアクセスします。独自の接続の作成方法については、「[データへの接続](glue-connections.md)」を参照してください。

`DynamicFrames` を 1 つずつ切り替えて、接続にデータを書き込みできるようになりました。

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

接続設定は、リレーショナルデータベースのタイプによって異なります。
+ Amazon Redshift への書き込み手順については、「[Redshift 接続](aws-glue-programming-etl-connect-redshift-home.md)」を参照してください。
+ その他のデータベースについては、「[AWS Glue for Spark での ETL の接続タイプとオプション](aws-glue-programming-etl-connect.md)」を参照してください。

## 結論
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

全体として、AWS Glue は非常に柔軟です。通常は書くのに数日かかるところを、数行のコードで達成できます。ソースからターゲットへの ETL スクリプトの全体は、GitHub の [AWS Glue サンプル](https://github.com/awslabs/aws-glue-samples)内の Python ファイル `join_and_relationalize.py` にあります。

# コード例: ResolveChoice、Lambda、および ApplyMapping を使用したデータ準備
<a name="aws-glue-programming-python-samples-medicaid"></a>

この例で使用されているデータセットは、2 つの [Data.CMS.gov](https://data.cms.gov) データセット (Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011 および Inpatient Charge Data FY 2011) からダウンロードされた、メディケアプロバイダの支払いデータで構成されています。ダウンロードした後、データセットを修正してファイルの最後にいくつかの誤ったレコードを追加しました。この変更されたファイルは、パブリックな Amazon S3 バケット (`s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv`) 内に置かれています。

この例のソースコードは、`data_cleaning_and_lambda.py`[ 例AWS Glue GitHub リポジトリの ](https://github.com/awslabs/aws-glue-samples) ファイルにあります。

AWS で実行中に Python または PySpark スクリプトをデバッグするための推奨方法は、[AWS Glue Studio のノートブック](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)を使用することです。

## ステップ 1: Amazon S3 バケット内のデータをクロールする
<a name="aws-glue-programming-python-samples-medicaid-crawling"></a>

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)) を開きます。

1. [クローラーの設定](define-crawler.md) で説明されているプロセスに従って `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv` ファイルをクロールできる新しいクローラを作成し、得られた結果のメタデータを AWS Glue Data Catalog の `payments` という名前のデータベースに配置します。

1. 新しいクローラを実行し、`payments` データベースを確認します。クローラは、最初のファイルを読み込んでファイルの形式と区切り記号を判断してから、データベースに `medicare` という名前のメタデータテーブルを作成したことが分かります。

   新しい `medicare` テーブルのスキーマは次のようになります。

   ```
   Column  name                            Data type
   ==================================================
   drg definition                             string
   provider id                                bigint
   provider name                              string
   provider street address                    string
   provider city                              string
   provider state                             string
   provider zip code                          bigint
   hospital referral region description       string
   total discharges                           bigint
   average covered charges                    string
   average total payments                     string
   average medicare payments                  string
   ```

## ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する
<a name="aws-glue-programming-python-samples-medicaid-boilerplate"></a>

次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の `GlueContext` を設定します。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## ステップ 3: 異なるスキーマ解析を比較する
<a name="aws-glue-programming-python-samples-medicaid-schemas"></a>

次に、Apache Spark `DataFrame` によって認識されたスキーマが、AWS Glue クローラによって記録されたスキーマと同じかどうかを確認できます。以下のコードを実行します。

```
medicare = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
```

`printSchema` 呼び出しの出力は次のとおりです。

```
root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: integer (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : integer (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)
```

次に、AWS Glue `DynamicFrame` によって生成されるスキーマを確認します。

```
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()
```

`printSchema` の出力は次のとおりです。

```
root
 |-- drg definition: string
 |-- provider id: choice
 |    |-- long
 |    |-- string
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

`DynamicFrame` は、`provider id` が `long` 型または `string` 型のいずれかであるスキーマを生成します。`DataFrame` スキーマは `Provider Id` を `string` 型としてリストし、Data Catalog は `provider id` を `bigint` 型としてリストします。

正しいものはどちらでしょうか。ファイルの末尾には、その列に `string` 値を持つ 2 つのレコード (160,000 レコードのうち) があります。これらは、問題を説明するために導入されたエラーのあるレコードです。

このような問題に対処するために、AWS Glue `DynamicFrame` では *Choice* 型の概念を導入しています。この場合、`DynamicFrame` は、その列に `long` 値と `string` 値の両方が存在することを示しています。AWS Glue クローラはデータの 2 MB のプレフィックスのみを考慮しているため、`string` 値を見落としました。Apache Spark `DataFrame` はデータセット全体を考慮しましたが、最も一般的な型、つまり `string` 型を強制的に列に割り当てました。実際、慣れていない複雑な型やバリエーションがある場合にも、Spark は最も一般的なケースを使用することがあります。

`provider id` 列のクエリを実行するには、Choice 型をまず解決する必要があります。`DynamicFrame` で、`cast:long` オプションを指定して `resolveChoice` 変換メソッドを使用すると、これらの `string` 値を `long` 値に変換できます。

```
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
```

この場合、`printSchema` の出力は次のようになります。

```
root
 |-- drg definition: string
 |-- provider id: long
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

値がキャストできない `string` だった場合に、AWS Glue は `null` を挿入しました。

もう 1 つのオプションは、両方のタイプの値を保持する `struct` に Choice 型を変換することです。

次に、異常だった行を確認してみましょう。

```
medicare_res.toDF().where("'provider id' is NULL").show()
```

次のように表示されています。

```
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|  provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|              12|              $11961.41|              $4619.00|                 $3775.33|
|948 - SIGNS & SYM...|       null| INC- ST JOSEPH|     5000 W CHAMBERS ST|    MILWAUKEE|            WI|            53210|                      WI - Milwaukee|              14|              $10514.28|              $5562.50|                 $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
```

次のように、2 つの不正な形式のレコードを削除します。

```
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
```

## ステップ 4: データのマッピングと Apache Spark Lambda 関数の使用
<a name="aws-glue-programming-python-samples-medicaid-lambda-mapping"></a>

AWS Glue では、まだユーザー定義関数とも呼ばれる Lambda 関数が直接サポートされていません。しかし、いつでも `DynamicFrame` を Apache Spark `DataFrame` との間で変換して、`DynamicFrames` の特殊な機能に加えて Spark の機能を利用できます。

次に、支払い情報を数字に変換すると、Amazon Redshift や Amazon Athena のような分析エンジンが、より迅速に数値処理を実行できるようになります。

```
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
        "ACC", chop_f(
            medicare_dataframe["average covered charges"])).withColumn(
                "ATP", chop_f(
                    medicare_dataframe["average total payments"])).withColumn(
                        "AMP", chop_f(
                            medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
```

`show` 呼び出しの出力は次のとおりです。

```
+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
```

これらは、すべてデータ内ではまだ文字列です。強力な `apply_mapping` 変換メソッドを使用して、データをドロップ、名前変更、キャスト、およびネストし、他のデータプログラミング言語やシステムで容易にアクセスできるようにします。

```
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string','rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
```

`printSchema` の出力は次のとおりです。

```
root
 |-- drg: string
 |-- provider: struct
 |    |-- id: long
 |    |-- name: string
 |    |-- city: string
 |    |-- state: string
 |    |-- zip: long
 |-- rr: string
 |-- charges: struct
 |    |-- covered: double
 |    |-- total_pay: double
 |    |-- medicare_pay: double
```

データを Spark `DataFrame` に戻すと、現在どのような状態かが分かります。

```
medicare_nest_dyf.toDF().show()
```

出力は次のとおりです。

```
+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...|    AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...|    AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...|    AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...|    AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...|    AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
```

## ステップ 5: Apache Parquet にデータを書き込む
<a name="aws-glue-programming-python-samples-medicaid-writing"></a>

AWS Glue は、リレーショナルデータベースが効果的に消費できる Apache Parquet のような形式でデータを書き込むことを容易にします。

```
glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
```