Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping
Dataset yang digunakan dalam contoh ini terdiri dari data pembayaran Penyedia Medicare yang diunduh dari dua kumpulan data data.cms.govs3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
.
Anda dapat menemukan kode sumber untuk contoh ini dalam data_cleaning_and_lambda.py
file di GitHub repositori AWS Gluecontoh
Cara yang lebih disukai untuk men-debug Python PySpark atau skrip saat berjalan adalah dengan menggunakan Notebook AWS di Glue Studio. AWS
Langkah 1: Merayapi data di bucket Amazon S3
Masuk ke AWS Management Console dan buka AWS Glue konsol di https://console.aws.amazon.com/glue/
. -
Mengikuti proses yang dijelaskan dalamMengkonfigurasi crawler, buat crawler baru yang dapat merayapi
s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv
file, dan dapat menempatkan metadata yang dihasilkan ke dalam database bernama dalampayments
Katalog Data Glue AWS . -
Jalankan crawler baru tersebut, dan kemudian periksa basis data
payments
. Anda akan menemukan bahwa crawler tersebut telah membuat tabel metadata bernamamedicare
dalam basis data setelah pembacaan awal file untuk menentukan format dan pembatas.Skema baru tabel
medicare
adalah sebagai berikut:Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string
Langkah 2: Tambahkan skrip boilerplate ke notebook endpoint pengembangan
Rekatkan skrip boilerplate berikut ke notebook endpoint pengembangan untuk mengimpor AWS Glue pustaka yang Anda butuhkan, dan siapkan satu: GlueContext
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Langkah 3: Bandingkan parsing skema yang berbeda
Selanjutnya, Anda dapat melihat apakah skema yang dikenali oleh Apache Spark sama dengan skema DataFrame
yang direkam oleh crawler AndaAWS Glue. Jalankan kode ini:
medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
Berikut adalah output dari panggilan printSchema
:
root
|-- DRG Definition: string (nullable = true)
|-- Provider Id: string (nullable = true)
|-- Provider Name: string (nullable = true)
|-- Provider Street Address: string (nullable = true)
|-- Provider City: string (nullable = true)
|-- Provider State: string (nullable = true)
|-- Provider Zip Code: integer (nullable = true)
|-- Hospital Referral Region Description: string (nullable = true)
|-- Total Discharges : integer (nullable = true)
|-- Average Covered Charges : string (nullable = true)
|-- Average Total Payments : string (nullable = true)
|-- Average Medicare Payments: string (nullable = true)
Selanjutnya, lihat skema yang AWS Glue DynamicFrame
dihasilkan oleh:
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()
Output dari printSchema
adalah sebagai berikut:
root
|-- drg definition: string
|-- provider id: choice
| |-- long
| |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
DynamicFrame
menghasilkan skema di mana provider id
bisa berupa jenis long
atau string
. Skema DataFrame
mencantumkan skema Provider Id
sebagai jenis string
, dan Katalog Data mencantumkan provider id
sebagai jenis bigint
.
Yang mana yang benar? Ada dua catatan di akhir file (dari 160.000 catatan) dengan nilai string
dalam kolom tersebut. Ini adalah catatan-catatan yang keliru yang diperkenalkan untuk menggambarkan masalah.
Untuk mengatasi masalah semacam ini, AWS Glue DynamicFrame
memperkenalkan konsep jenis pilihan. Dalam kasus ini, DynamicFrame
menunjukkan bahwa nilai long
dan string
dapat muncul di kolom tersebut. AWS GlueCrawler melewatkan string
nilai karena hanya dianggap awalan 2 MB data. DataFrame
Apache Spark dianggap sebagai seluruh set data, tetapi ia dipaksa untuk menetapkan jenis yang paling umum ke kolom, yaitu string
. Sebenarnya, Spark sering menggunakan kasus yang paling umum bila ada jenis atau variasi kompleks yang tidak biasa.
Untuk meng-kueri kolom provider id
, selesaikan jenis pilihan terlebih dahulu. Anda dapat menggunakan metode transformasi resolveChoice
di DynamicFrame
untuk mengkonversi nilai string
ke nilai long
dengan pilihan cast:long
:
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()
Output printSchema
sekarang:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
Dimana nilainya adalah a string
yang tidak bisa dilemparkan, AWS Glue dimasukkan anull
.
Pilihan lain adalah mengkonversi jenis pilihan ke struct
, yang menyimpan nilai dari kedua jenis tersebut.
Selanjutnya, lihat baris yang mengalami anomali:
medicare_res.toDF().where("'provider id' is NULL").show()
Jika Anda melihat seperti berikut:
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
| drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33|
|948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
Sekarang hapus dua catatan yang cacat, sebagai berikut:
medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
Langkah 4: Petakan data dan gunakan fungsi Apache Spark Lambda
AWS Gluebelum secara langsung mendukung fungsi Lambda, juga dikenal sebagai fungsi yang ditentukan pengguna. Tapi Anda selalu dapat mengkonversi DynamicFrame
ke dan dari DataFrame
Apache Spark untuk memanfaatkan dari fungsionalitas Spark selain fitur khusus DynamicFrames
.
Selanjutnya, ubah informasi pembayaran menjadi angka, sehingga mesin analitik seperti Amazon Redshift atau Amazon Athena dapat melakukan penderakan angkanya lebih cepat:
from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
Output dari panggilan show
adalah sebagai berikut:
+--------+-------+-------+
| ACC| ATP| AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
Ini semua masih string dalam data. Kita bisa menggunakan metode transformasi apply_mapping
yang kuat untuk membuang, mengubah nama, mengubah, dan meng-nest data sehingga bahasa pemrograman data dan sistem lain dapat dengan mudah mengaksesnya:
from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()
Output printSchema
adalah sebagai berikut:
root
|-- drg: string
|-- provider: struct
| |-- id: long
| |-- name: string
| |-- city: string
| |-- state: string
| |-- zip: long
|-- rr: string
|-- charges: struct
| |-- covered: double
| |-- total_pay: double
| |-- medicare_pay: double
Mengubah data kembali menjadi DataFrame
Spark, Anda dapat menunjukkan apa yang terlihat seperti sekarang:
medicare_nest_dyf.toDF().show()
Output adalah sebagai berikut:
+--------------------+--------------------+---------------+--------------------+
| drg| provider| rr| charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
Langkah 5: Tulis data ke Apache Parquet
AWS Gluemembuatnya mudah untuk menulis data dalam format seperti Apache Parquet yang database relasional dapat secara efektif mengkonsumsi:
glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")