
Code example: Using ResolveChoice, Lambda, and ApplyMapping

The dataset used in this example consists of Medicare Provider payment data that was downloaded from two Data.CMS.gov datasets: "Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011" and "Inpatient Charge Data FY 2011". After downloading the data, we modified the dataset to introduce a couple of erroneous records at the end of the file. This modified file is located in a public Amazon S3 bucket at s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv.

You can find the source code for this example in the data_cleaning_and_lambda.py file in the AWS Glue samples repository on GitHub.

The preferred way to debug Python or PySpark scripts while running on AWS is to use notebooks on AWS Glue Studio.

Step 1: Crawl the data in the Amazon S3 bucket

  1. Sign in to the AWS Management Console and open the AWS Glue console at https://console.aws.amazon.com/glue/.

  2. Following the procedure for configuring a crawler, create a new crawler that can crawl the s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv file, and place the resulting metadata into a database named payments in the AWS Glue Data Catalog. (For a scripted alternative to these console steps, see the boto3 sketch after this list.)

  3. Run the new crawler, and then check the payments database. After reading the beginning of the file to determine its format and delimiter, the crawler should create a metadata table named medicare in the database.

    The schema of the medicare table is as follows:

    Column name                               Data type
    ==================================================
    drg definition                            string
    provider id                               bigint
    provider name                             string
    provider street address                   string
    provider city                             string
    provider state                            string
    provider zip code                         bigint
    hospital referral region description      string
    total discharges                          bigint
    average covered charges                   string
    average total payments                    string
    average medicare payments                 string
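If you prefer to script these setup steps rather than use the console, the following sketch creates and starts an equivalent crawler with boto3. The crawler name and IAM role shown here are placeholders, not part of the original walkthrough; substitute a role that has AWS Glue permissions and read access to the bucket.

import boto3

glue = boto3.client('glue')
glue.create_crawler(
    Name='medicare-crawler',            # hypothetical crawler name
    Role='AWSGlueServiceRole-example',  # placeholder IAM role
    DatabaseName='payments',
    Targets={'S3Targets': [{'Path': 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv'}]})
glue.start_crawler(Name='medicare-crawler')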

Step 2: Add boilerplate script to the development endpoint notebook

Paste the following boilerplate script into the development endpoint notebook to import the AWS Glue libraries that you need, and set up a single GlueContext:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
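The getResolvedOptions and Job imports are not used in a notebook, but they matter when the same code runs as an AWS Glue job. A minimal sketch, assuming the script is submitted as a job with a JOB_NAME argument:

# In a Glue job (rather than a notebook), read the arguments that the
# service passes in and wrap the work in init/commit so that job
# bookmarks and metrics are handled correctly.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ... transformations go here ...
job.commit()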

Step 3: Compare different schema parsings

Next, you can see whether the schema that an Apache Spark DataFrame recognizes is the same as the one that your AWS Glue crawler recorded. Run this code:

medicare = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()

Here is the output from the printSchema call:

root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: integer (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |-- Total Discharges : integer (nullable = true)
 |-- Average Covered Charges : string (nullable = true)
 |-- Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)

Next, look at the schema that an AWS Glue DynamicFrame generates:

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()

The output from printSchema is the following:

root
 |-- drg definition: string
 |-- provider id: choice
 |    |-- long
 |    |-- string
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string

In the schema that the DynamicFrame generates, provider id could be either a long or a string type. The DataFrame schema lists Provider Id as a string type, and the Data Catalog lists provider id as a bigint type.

Which one is correct? There are two records at the end of the file (out of 160,000 records) with string values in that column. These are the erroneous records that were introduced to illustrate a problem.

To address this kind of problem, the AWS Glue DynamicFrame introduces the concept of a choice type. In this case, the DynamicFrame shows that both long and string values can appear in that column. The AWS Glue crawler missed the string values because it considered only a 2 MB prefix of the data. The Apache Spark DataFrame considered the whole dataset, but it was forced to assign the most general type to the column, namely string. In fact, Spark often resorts to the most general case when there are complex types or variations with which it is unfamiliar.

To query the provider id column, first resolve the choice type. You can use the resolveChoice transform method in your DynamicFrame to convert those string values to long values with a cast:long option:

medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()

The printSchema output is now:

root
 |-- drg definition: string
 |-- provider id: long
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string

Where there was a string value that could not be cast, AWS Glue inserted a null.

Another option is to convert the choice type to a struct, which keeps values of both types.
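For example, here is a minimal sketch of that struct option; the resulting column carries both variants side by side rather than a single cast type:

# make_struct turns the choice column into a struct that holds
# both the long and the string representation of each value.
medicare_struct = medicare_dynamicframe.resolveChoice(specs = [('provider id', 'make_struct')])
medicare_struct.printSchema()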

Next, take a look at the rows that were anomalous:

medicare_res.toDF().where("`provider id` is NULL").show()

You see the following:

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|  provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|              12|              $11961.41|              $4619.00|                 $3775.33|
|948 - SIGNS & SYM...|       null| INC- ST JOSEPH|     5000 W CHAMBERS ST|    MILWAUKEE|            WI|            53210|                      WI - Milwaukee|              14|              $10514.28|              $5562.50|                 $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+

Now remove the two malformed records, as follows:

medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("`provider id` is NOT NULL")

Step 4: Map the data and use Apache Spark Lambda functions

AWS Glue does not yet directly support Lambda functions, also known as user-defined functions. But you can always convert a DynamicFrame to and from an Apache Spark DataFrame to take advantage of Spark functionality in addition to the special features of DynamicFrames.

Next, turn the payment information into numbers, so analytic engines like Amazon Redshift or Amazon Athena can do their number crunching faster:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# chop_f strips the first character (the leading '$') from a currency string.
chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
        "ACC", chop_f(
            medicare_dataframe["average covered charges"])).withColumn(
        "ATP", chop_f(
            medicare_dataframe["average total payments"])).withColumn(
        "AMP", chop_f(
            medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()

The output from the show call is the following:

+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
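As an aside, the same chopping can be done with Spark's built-in column functions, which avoids the serialization overhead of a Python UDF. A sketch, assuming the only non-numeric character is the leading dollar sign:

from pyspark.sql.functions import regexp_replace

# Strip a leading '$' with a built-in function; this stays in the JVM
# instead of round-tripping each value through a Python worker.
medicare_dataframe = medicare_dataframe.withColumn(
    "ACC", regexp_replace(medicare_dataframe["average covered charges"], r"^\$", ""))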

The data is still all strings. We can use the powerful apply_mapping transform method to drop, rename, cast, and nest the data so that other data programming languages and systems can easily access it:

from awsglue.dynamicframe import DynamicFrame

medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string', 'rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()

The printSchema output is the following:

root
 |-- drg: string
 |-- provider: struct
 |    |-- id: long
 |    |-- name: string
 |    |-- city: string
 |    |-- state: string
 |    |-- zip: long
 |-- rr: string
 |-- charges: struct
 |    |-- covered: double
 |    |-- total_pay: double
 |    |-- medicare_pay: double
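If a downstream consumer needs a flat schema instead, a DynamicFrame can reverse this kind of nesting with its unnest transform. A quick sketch:

# unnest flattens the structs back into top-level columns.
medicare_flat_dyf = medicare_nest_dyf.unnest()
medicare_flat_dyf.printSchema()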

Turn the data back into a Spark DataFrame to show what it now looks like:

medicare_nest_dyf.toDF().show()

The output is the following:

+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...|    AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...|    AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...|    AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...|    AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...|    AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
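Because provider and charges are now structs, you can address their individual fields with dot notation in the DataFrame. For example:

# Select a few nested fields directly from the structs.
medicare_nest_dyf.toDF().select('drg', 'provider.name', 'charges.covered').show()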

Step 5: Write the data to Apache Parquet

AWS Glue makes it easy to write the data in a format, such as Apache Parquet, that relational databases can effectively consume:

glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
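To check the result, you can read the Parquet files back into a DynamicFrame. A sketch, assuming the same output path as above:

# Read the Parquet output back and confirm that the nested schema survived.
medicare_parquet_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3",
    connection_options = {"paths": ["s3://glue-sample-target/output-dir/medicare_parquet"]},
    format = "parquet")
medicare_parquet_dyf.printSchema()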