Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
DynamicFrame kelas
Salah satu abstraksi utama dalam Apache Spark adalah Spark SQLDataFrame
, yang mirip dengan konstruksi yang ditemukan di R dan DataFrame
Pandas. A DataFrame
mirip dengan tabel dan mendukung operasi dan operasi gaya fungsional (map/reduce/filter/dll.) (Pilih, proyek, agregat). SQL
DataFrames
kuat dan banyak digunakan, tetapi mereka memiliki keterbatasan sehubungan dengan operasi ekstrak, transformasi, dan beban (ETL). Yang paling signifikan, mereka memerlukan skema yang akan ditentukan sebelum data dimuat. Spark SQL mengatasi ini dengan membuat dua lintasan data — yang pertama menyimpulkan skema, dan yang kedua memuat data. Namun demikian, kesimpulan ini terbatas dan tidak mengatasi realitas data yang berantakan. Misalnya, bidang yang sama mungkin mempunyai jenis yang berbeda dalam catatan yang berbeda. Apache Spark sering kali menyerah dan melaporkan jenis sebagai string
dengan menggunakan teks bidang asli. Ini mungkin tidak benar, dan Anda mungkin ingin kontrol yang lebih baik atas bagaimana perbedaan skema diselesaikan. Dan untuk set data besar, pemberian tambahan atas sumber data mungkin mahal.
Untuk mengatasi keterbatasan ini, AWS Glue memperkenalkan. DynamicFrame
Sebuah DynamicFrame
mirip dengan DataFrame
, kecuali bahwa setiap catatan bersifat self-describing, sehingga tidak ada skema yang diperlukan diawal. Sebagai gantinya, AWS Glue menghitung skema on-the-fly bila diperlukan, dan secara eksplisit mengkodekan inkonsistensi skema menggunakan jenis pilihan (atau gabungan). Anda dapat mengatasi inkonsistensi ini untuk membuat set data Anda kompatibel dengan penyimpanan data yang memerlukan sebuah skema tetap.
Demikian pula, DynamicRecord
mewakili sebuah catatan logis dalam sebuah DynamicFrame
. Hal ini seperti sebuah baris dalam DataFrame
Spark, kecuali bahwa ia bersifat self-describing dan dapat digunakan untuk data yang tidak sesuai dengan sebuah skema tetap. Saat menggunakan AWS Glue with PySpark, Anda biasanya tidak memanipulasi secara independenDynamicRecords
. Sebaliknya, Anda akan mengubah kumpulan data bersama-sama melaluinyaDynamicFrame
.
Anda dapat mengonversi DynamicFrames
ke dan dari DataFrames
setelah Anda menyelesaikan inkonsistensi skema.
— konstruksi —
__init__
__init__(jdf, glue_ctx, name)
-
jdf
— Referensi ke bingkai data di Java Virtual Machine (JVM). -
glue_ctx
— Sebuah objek GlueContext kelas. -
name
— Sebuah nama string opsional, secara default kosong.
fromDF
fromDF(dataframe, glue_ctx, name)
Mengkonversi DataFrame
ke DynamicFrame
dengan mengkonversi bidang DataFrame
ke bidang DynamicRecord
. Mengembalikan DynamicFrame
yang baru.
Sebuah DynamicRecord
mewakili catatan logis dalam sebuah DynamicFrame
. Hal ini seperti sebuah baris dalam DataFrame
Spark, kecuali bahwa ia bersifat self-describing dan dapat digunakan untuk data yang tidak sesuai dengan sebuah skema tetap.
Fungsi ini mengharapkan kolom dengan nama duplikat di Anda DataFrame
telah diselesaikan.
-
dataframe
— Apache Spark SQLDataFrame
untuk dikonversi (wajib). -
glue_ctx
— Objek GlueContext kelas yang menentukan konteks untuk transformasi ini (wajib). -
name
— Nama yang dihasilkanDynamicFrame
(opsional sejak AWS Glue 3.0).
toDF
toDF(options)
Mengkonversi sebuah DynamicFrame
ke sebuah DataFrame
Apache Spark dengan mengkonversi bidang DynamicRecords
ke bidang DataFrame
. Mengembalikan DataFrame
yang baru.
Sebuah DynamicRecord
mewakili catatan logis dalam sebuah DynamicFrame
. Hal ini seperti sebuah baris dalam DataFrame
Spark, kecuali bahwa ia bersifat self-describing dan dapat digunakan untuk data yang tidak sesuai dengan sebuah skema tetap.
-
options
— Daftar pilihan. Memungkinkan Anda menentukan opsi tambahan untuk proses konversi. Beberapa opsi valid yang dapat Anda gunakan dengan parameter `options`:-
format
— menentukan format data, seperti json, csv, parket). -
separater or sep
— untuk CSV file, menentukan pembatas. -
header
— untuk CSV file, menunjukkan apakah baris pertama adalah header (benar/salah). -
inferSchema
— mengarahkan Spark untuk menyimpulkan skema secara otomatis (benar/salah).
Berikut adalah contoh penggunaan parameter `options` dengan metode `toDF`:
from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })
Menentukan jenis target jika Anda memilih jenis tindakan
Project
danCast
. Contohnya meliputi hal berikut.>>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
-
— Informasi —
count
count( )
— Mengembalikan jumlah baris dalam DataFrame
yang mendasari.
schema
schema( )
— Mengembalikan skema dari DynamicFrame
ini, atau jika itu tidak tersedia, skema dari DataFrame
yang mendasari.
Untuk informasi selengkapnya tentang DynamicFrame
jenis yang membentuk skema ini, lihatPySpark jenis ekstensi.
printSchema
printSchema( )
— Mencetak skema dari DataFrame
yang mendasari.
show
show(num_rows)
— Mencetak sejumlah baris yang ditentukan dari DataFrame
yang mendasari.
repartition
repartition(numPartitions)
— Mengembalikan DynamicFrame
yang baru dengan partisi numPartitions
.
coalesce
coalesce(numPartitions)
— Mengembalikan DynamicFrame
yang baru dengan partisi numPartitions
.
— transformasi —
apply_mapping
apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Menerapkan pemetaan deklaratif ke DynamicFrame
dan mengembalikan yang baru DynamicFrame
dengan pemetaan yang diterapkan ke bidang yang Anda tentukan. Bidang yang tidak ditentukan dihilangkan dari yang baru. DynamicFrame
-
mappings
— Daftar tupel pemetaan (wajib). Masing-masing terdiri dari: (kolom sumber, tipe sumber, kolom target, tipe target).Jika kolom sumber memiliki titik "
.
" dalam nama, Anda harus menempatkan backticks "``
" di sekitarnya. Misalnya, untuk memetakanthis.old.name
(string) kethisNewName
, Anda akan menggunakan tupel berikut:("`this.old.name`", "string", "thisNewName", "string")
-
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan apply_mapping untuk mengganti nama bidang dan mengubah jenis bidang
Contoh kode berikut menunjukkan cara menggunakan apply_mapping
metode untuk mengganti nama bidang yang dipilih dan mengubah jenis bidang.
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date
drop_fields
drop_fields(paths, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Panggil transformasi FlatMap kelas untuk menghapus bidang dari DynamicFrame
. Mengembalikan sebuah DynamicFrame
baru dengan bidang tertentu yang dibuang.
-
paths
— Daftar string. Masing-masing berisi jalur lengkap ke node bidang yang ingin Anda jatuhkan. Anda dapat menggunakan notasi titik untuk menentukan bidang bersarang. Misalnya, jika bidangfirst
adalah anak dari bidangname
di pohon, Anda menentukan"name.first"
untuk jalurnya.Jika node bidang memiliki literal
.
dalam nama, Anda harus melampirkan nama di backticks ().`
-
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan drop_fields untuk menghapus bidang dari DynamicFrame
Contoh kode ini menggunakan drop_fields
metode untuk menghapus bidang tingkat atas dan bersarang yang dipilih dari a. DynamicFrame
Contoh dataset
Contoh menggunakan dataset berikut yang diwakili oleh EXAMPLE-FRIENDS-DATA
tabel dalam kode:
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
Contoh kode
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "
MY-EXAMPLE-DATABASE
" glue_source_table = "EXAMPLE-FRIENDS-DATA
" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string
filter
filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Mengembalikan baru DynamicFrame
yang berisi semua DynamicRecords
dalam input DynamicFrame
yang memenuhi fungsi f
predikat tertentu.
-
f
— Fungsi predikat yang akan diterapkan padaDynamicFrame
. Fungsi tersebut harus mengambilDynamicRecord
sebagai argumen dan mengembalikan BETUL jikaDynamicRecord
memenuhi persyaratan filter, atau SALAH jika tidak (wajib).Sebuah
DynamicRecord
mewakili catatan logis dalam sebuahDynamicFrame
. Ini mirip dengan baris di SparkDataFrame
, kecuali bahwa itu menggambarkan diri sendiri dan dapat digunakan untuk data yang tidak sesuai dengan skema tetap. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan filter untuk mendapatkan pilihan bidang yang difilter
Contoh ini menggunakan filter
metode untuk membuat baru DynamicFrame
yang mencakup pemilihan bidang lain DynamicFrame
yang difilter.
Seperti map
metode, filter
mengambil fungsi sebagai argumen yang akan diterapkan untuk setiap record dalam aslinyaDynamicFrame
. Fungsi ini mengambil catatan sebagai input dan mengembalikan nilai Boolean. Jika nilai pengembalian benar, catatan akan dimasukkan dalam hasilDynamicFrame
. Jika salah, catatan ditinggalkan.
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564
join
join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Melakukan kesetaraan penggabungan dengan DynamicFrame
yang lain dan mengembalikan DynamicFrame
yang dihasilkan.
-
paths1
— Daftar kunci dalam bingkai ini yang akan digabungkan. -
paths2
— Daftar kunci dalam bingkai lain yang akan digabungkan. -
frame2
—DynamicFrame
yang lainnya yang akan digabungkan. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan bergabung untuk menggabungkan DynamicFrames
Contoh ini menggunakan join
metode untuk melakukan join pada tigaDynamicFrames
. AWS Glue melakukan gabungan berdasarkan tombol bidang yang Anda berikan. Hasilnya DynamicFrame
berisi baris dari dua frame asli di mana kunci yang ditentukan cocok.
Perhatikan bahwa join
transformasi membuat semua bidang tetap utuh. Ini berarti bahwa bidang yang Anda tentukan untuk dicocokkan muncul di hasil DynamicFrame, meskipun mereka berlebihan dan berisi kunci yang sama. Dalam contoh ini, kita gunakan drop_fields
untuk menghapus kunci redundan ini setelah bergabung.
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
map
map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Mengembalikan DynamicFrame
baru dihasilkan dari penerapan fungsi pemetaan yang ditentukan untuk semua catatan dalam DynamicFrame
aslinya.
-
f
— Fungsi pemetaan yang akan diterapkan ke semua catatan diDynamicFrame
. Fungsi harus mengambilDynamicRecord
sebagai sebuah argumen dan mengembalikan sebuahDynamicRecord
baru (wajib).Sebuah
DynamicRecord
mewakili catatan logis dalam sebuahDynamicFrame
. Ini mirip dengan baris di Apache SparkDataFrame
, kecuali bahwa itu menggambarkan diri sendiri dan dapat digunakan untuk data yang tidak sesuai dengan skema tetap. transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).info
— String yang dikaitkan dengan kesalahan dalam transformasi (opsional).stageThreshold
— Jumlah maksimum kesalahan yang dapat terjadi dalam transformasi sebelum kesalahan keluar (opsional). Default-nya adalah nol.totalThreshold
— Jumlah maksimum kesalahan yang dapat terjadi secara keseluruhan sebelum memproses kesalahan keluar (opsional). Default-nya adalah nol.
Contoh: Gunakan peta untuk menerapkan fungsi ke setiap catatan dalam DynamicFrame
Contoh ini menunjukkan bagaimana menggunakan map
metode untuk menerapkan fungsi untuk setiap record dari sebuahDynamicFrame
. Secara khusus, contoh ini menerapkan fungsi yang dipanggil MergeAddress
ke setiap catatan untuk menggabungkan beberapa bidang alamat menjadi satu struct
jenis.
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
mergeDynamicFrame
mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "",
options = {}, info = "", stageThreshold = 0, totalThreshold = 0)
Menggabungkan DynamicFrame
ini dengan pentahapan DynamicFrame
berdasarkan kunci primer yang ditentukan untuk mengidentifikasi catatan. Rekaman duplikat (catatan dengan kunci utama yang sama) tidak dideduplikasi. Jika tidak ada catatan yang cocok dalam bingkai pentahapan, semua catatan (termasuk duplikat) akan dipertahankan dari sumber. Jika bingkai pementasan memiliki catatan yang cocok, catatan dari bingkai pementasan menimpa catatan di sumber AWS Glue.
-
stage_dynamic_frame
— PentahapanDynamicFrame
yang akan digabungkan. -
primary_keys
— Daftar bidang kunci primer untuk mencocokkan catatan dari sumber dan pentahapan bingkai dinamis. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengambil metadata tentang transformasi saat ini (opsional). -
options
— Serangkaian pasangan JSON nama-nilai yang memberikan informasi tambahan untuk transformasi ini. Argumen ini saat ini tidak digunakan. -
info
— SebuahString
. Setiap string yang akan dikaitkan dengan kesalahan dalam transformasi ini. -
stageThreshold
— SebuahLong
. Jumlah kesalahan dalam transformasi yang ditentukan yang memerlukan pengolahan untuk membersihkan kesalahan. -
totalThreshold
— SebuahLong
. Jumlah total kesalahan hingga dan termasuk transformasi ini yang perlu disalahartikan oleh pemrosesan.
Metode ini mengembalikan baru DynamicFrame
yang diperoleh dengan menggabungkan ini DynamicFrame
dengan DynamicFrame
pementasan.
DynamicFrame
yang dikembalikan berisi catatan A dalam kasus ini:
-
Jika
A
ada di bingkai sumber dan bingkai pentahapan, makaA
dalam bingkai pentahapan akan dikembalikan. -
Jika
A
ada di tabel sumber dan tidakA.primaryKeys
ada distagingDynamicFrame
, tidakA
diperbarui dalam tabel pementasan.
Bingkai sumber dan bingkai pementasan tidak perlu memiliki skema yang sama.
Contoh: Gunakan mergeDynamicFrame untuk menggabungkan dua DynamicFrames
berdasarkan kunci utama
Contoh kode berikut menunjukkan bagaimana menggunakan mergeDynamicFrame
metode untuk menggabungkan DynamicFrame
dengan “pementasan”DynamicFrame
, berdasarkan kunci utama. id
Contoh dataset
Contoh menggunakan dua DynamicFrames
dari yang DynamicFrameCollection
dipanggilsplit_rows_collection
. Berikut ini adalah daftar kunci displit_rows_collection
.
dict_keys(['high', 'low'])
Contoh kode
# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+
relationalize
relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Mengkonversi DynamicFrame
ke dalam bentuk yang cocok dalam database relasional. Relationalisasi a DynamicFrame
sangat berguna ketika Anda ingin memindahkan data dari SQL lingkungan No seperti DynamoDB ke database relasional seperti My. SQL
Transformasi menghasilkan daftar bingkai dengan membuka kolom bersarang dan kolom array berputar. Anda dapat menggabungkan kolom array berputar ke tabel root dengan menggunakan tombol gabungan yang dihasilkan selama fase unnest.
root_table_name
— Nama untuk tabel akar.staging_path
— Jalur di mana metode dapat menyimpan partisi tabel berputar dalam CSV format (opsional). Tabel berputar dibaca kembali dari path ini.options
— Kamus parameter opsional.-
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan relationalize untuk meratakan skema bersarang di DynamicFrame
Contoh kode ini menggunakan relationalize
metode untuk meratakan skema bersarang menjadi bentuk yang cocok dengan database relasional.
Contoh dataset
Contoh menggunakan DynamicFrame
dipanggil legislators_combined
dengan skema berikut. legislators_combined
memiliki beberapa bidang bersarang sepertilinks
,images
, dancontact_details
, yang akan diratakan oleh transformasi. relationalize
root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string
Contoh kode
# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "
s3://DOC-EXAMPLE-BUCKET/tmpDir
" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()
Output berikut memungkinkan Anda membandingkan skema bidang bersarang yang dipanggil contact_details
ke tabel yang dibuat oleh relationalize
transformasi. Perhatikan bahwa catatan tabel menautkan kembali ke tabel utama menggunakan kunci asing yang disebut id
dan index
kolom yang mewakili posisi array.
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+
rename_field
rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Mengganti nama bidang dalam DynamicFrame
hal ini dan mengembalikan DynamicFrame
baru dengan bidang yang diganti namanya.
-
oldName
— Jalur lengkap ke simpul yang ingin Anda ganti namanya.Jika nama lama memiliki titik di dalamnya,
RenameField
tidak berfungsi kecuali Anda menempatkan backticks di sekitarnya ()`
. Sebagai contoh, untuk menggantikanthis.old.name
denganthisNewName
, Anda akan memanggil rename_field sebagai berikut.newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
-
newName
— Nama baru, sebagai path lengkap. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan rename_field untuk mengganti nama bidang dalam DynamicFrame
Contoh kode ini menggunakan rename_field
metode untuk mengganti nama bidang dalamDynamicFrame
. Perhatikan bahwa contoh menggunakan metode chaining untuk mengganti nama beberapa bidang secara bersamaan.
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
Contoh kode
# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string
resolveChoice
resolveChoice(specs = None, choice = "" , database = None , table_name = None ,
transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id =
None)
Mengubah jenis pilihan dalam DynamicFrame
ini dan mengembalikan DynamicFrame
.
-
specs
— Daftar ambiguitas spesifik yang akan diubah, masing-masing dalam bentuk tupel:(field_path, action)
.Ada dua cara untuk menggunakan
resolveChoice
. Yang pertama adalah dengan menggunakan argumenspecs
untuk menentukan urutan bidang tertentu dan cara mengubahnya. Mode lainnyaresolveChoice
adalah menggunakanchoice
argumen untuk menentukan resolusi tunggal untuk semuaChoiceTypes
.Nilai untuk
specs
ditetapkan sebagai tupel terdiri dari pasangan(field_path, action)
. Nilaifield_path
mengidentifikasi elemen ambigu tertentu, dan nilaiaction
mengidentifikasi resolusi yang sesuai. Berikut ini adalah tindakan yang mungkin:-
cast:
— Upaya untuk mengubah semua nilai ke jenis tertentu. Sebagai contoh:type
cast:int
. -
make_cols
— Mengkonversi setiap jenis yang berbeda menjadi kolom dengan nama
. Ini menyelesaikan ambiguitas potensial dengan meratakan data. Misalnya, jikacolumnName
_type
columnA
bisa berupaint
ataustring
, maka resolusi akan menghasilkan dua kolom bernamacolumnA_int
dancolumnA_string
dalamDynamicFrame
yang dihasilkan. -
make_struct
— Mengubah ambiguitas potensial dengan menggunakanstruct
untuk mewakili data. Misalnya, jika data dalam kolom bisa berupaint
atau astring
,make_struct
tindakan menghasilkan kolom struktur yang dihasilkanDynamicFrame
. Setiap struktur berisi aint
dan astring
. -
project:
— Mengubah ambiguitas potensial dengan memproyeksikan semua data ke salah satu jenis data yang mungkin. Sebagai contoh, jika data dalam kolom bisa berupatype
int
ataustring
, dengan menggunakan tindakanproject:string
menghasilkan kolom dalamDynamicFrame
yang dihasilkan di mana semua nilaiint
telah dikonversi menjadi string.
Jika
field_path
mengidentifikasi sebuah array, menempatkan kurung persegi kosong setelah nama array untuk menghindari ambiguitas. Misalnya, anggap Anda bekerja dengan data yang terstruktur sebagai berikut:"myList": [ { "price": 100.00 }, { "price": "$100.00" } ]
Anda dapat memilih numerik daripada versi string harga dengan menyetel
field_path
ke"myList[].price"
, dan menyetelaction
ke"cast:double"
.catatan
Anda hanya dapat menggunakan salah satu
choice
parameterspecs
dan. Jika parameterspecs
bukanNone
, maka parameterchoice
harus string kosong. Sebaliknya, jikachoice
bukan string kosong, makaspecs
parameternya harusNone
. -
choice
— Menentukan resolusi tunggal untuk semuaChoiceTypes
. Anda dapat menggunakan ini dalam kasus di mana daftar lengkap tidakChoiceTypes
diketahui sebelum runtime. Selain tindakan-tindakan yang tercantum sebelumnya untukspecs
, argumen ini juga mendukung tindakan berikut:-
match_catalog
— Upaya untuk mengubah setiapChoiceType
menjadi jenis yang sesuai dalam tabel Katalog Data yang ditentukan.
-
-
database
— Basis data Katalog Data yang akan digunakan dengan tindakanmatch_catalog
. -
table_name
— Tabel Katalog Data yang akan digunakan dengan tindakanmatch_catalog
. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error out. -
catalog_id
— ID katalog Katalog Data yang sedang diakses (ID akun Katalog Data). Ketika diatur keNone
(nilai default), ia menggunakan ID katalog akun yang memanggil.
Contoh: Gunakan resolveChoice untuk menangani kolom yang berisi beberapa jenis
Contoh kode ini menggunakan resolveChoice
metode untuk menentukan cara menangani DynamicFrame
kolom yang berisi nilai-nilai dari beberapa jenis. Contoh menunjukkan dua cara umum untuk menangani kolom dengan jenis yang berbeda:
Transmisikan kolom ke satu tipe data.
Pertahankan semua jenis di kolom terpisah.
Contoh dataset
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
Contoh menggunakan DynamicFrame
dipanggil medicare
dengan skema berikut:
root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string
Contoh kode
# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows
select_fields
select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Mengembalikan baru DynamicFrame
yang berisi bidang yang dipilih.
-
paths
— Daftar string. Setiap string adalah jalur ke node tingkat atas yang ingin Anda pilih. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan select_fields untuk membuat baru DynamicFrame
dengan bidang yang dipilih
Contoh kode berikut menunjukkan cara menggunakan select_fields
metode untuk membuat yang baru DynamicFrame
dengan daftar bidang yang dipilih dari yang sudah adaDynamicFrame
.
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows
sederhana_ddb_json
simplify_ddb_json(): DynamicFrame
Menyederhanakan kolom bersarang dalam DynamicFrame
yang secara khusus dalam JSON struktur DynamoDB, dan mengembalikan disederhanakan baru. DynamicFrame
Jika ada beberapa jenis atau tipe Peta dalam tipe Daftar, elemen dalam Daftar tidak akan disederhanakan. Perhatikan bahwa ini adalah jenis transformasi tertentu yang berperilaku berbeda dari unnest
transformasi biasa dan mengharuskan data sudah berada dalam struktur DynamoDBJSON. Untuk informasi selengkapnya, lihat DynamoDB JSON.
Misalnya, skema pembacaan ekspor dengan struktur JSON DynamoDB mungkin terlihat seperti berikut:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
simplify_ddb_json()
Transformasi akan mengubah ini menjadi:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Contoh: Gunakan simplify_ddb_json untuk memanggil DynamoDB menyederhanakan JSON
Contoh kode ini menggunakan simplify_ddb_json
metode untuk menggunakan konektor ekspor AWS Glue DynamoDB, memanggil DynamoDB menyederhanakan, dan mencetak JSON jumlah partisi.
Contoh kode
from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())
spigot
spigot(path, options={})
Menulis catatan sampel ke tujuan tertentu untuk membantu Anda memverifikasi transformasi yang dilakukan oleh pekerjaan Anda.
-
path
— Jalur tujuan untuk menulis (wajib). -
options
— Pasangan kunci-nilai yang menentukan opsi (opsional). Pilihan"topk"
menentukan bahwa catatank
pertama harus ditulis."prob"
Opsi menentukan probabilitas (sebagai desimal) untuk memilih catatan yang diberikan. Anda dapat menggunakannya dalam memilih catatan untuk menulis. transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).
Contoh: Gunakan keran untuk menulis bidang contoh dari a DynamicFrame
ke Amazon S3
Contoh kode ini menggunakan spigot
metode untuk menulis catatan sampel ke bucket Amazon S3 setelah menerapkan transformasi. select_fields
Contoh dataset
catatan
Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.
Contoh menggunakan DynamicFrame
dipanggil persons
dengan skema berikut:
root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string
Contoh kode
# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="
s3://DOC-EXAMPLE-BUCKET
", options={"topk": 10} )
Berikut ini adalah contoh data yang spigot
menulis ke Amazon S3. Karena kode contoh yang ditentukanoptions={"topk": 10}
, data sampel berisi 10 catatan pertama.
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
split_fields
split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
Mengembalikan baru DynamicFrameCollection
yang berisi duaDynamicFrames
. Yang pertama DynamicFrame
berisi semua node yang telah dipisahkan, dan yang kedua berisi node yang tersisa.
-
paths
— Daftar string, masing-masing merupakan path lengkap ke sebuah simpul yang ingin Anda bagi menjadi sebuahDynamicFrame
. -
name1
— Sebuah string nama untukDynamicFrame
yang terbelah. -
name2
— Sebuah string nama untukDynamicFrame
yang tersisa setelah simpul yang ditentukan telah dipecah. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan split_fields untuk membagi bidang yang dipilih menjadi terpisah DynamicFrame
Contoh kode ini menggunakan split_fields
metode untuk membagi daftar bidang tertentu menjadi terpisahDynamicFrame
.
Contoh dataset
Contoh menggunakan panggilan DynamicFrame
l_root_contact_details
yang berasal dari koleksi bernamalegislators_relationalized
.
l_root_contact_details
memiliki skema dan entri berikut.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...
Contoh kode
# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string
split_rows
split_rows(comparison_dict, name1, name2, transformation_ctx="", info="",
stageThreshold=0, totalThreshold=0)
Membagi satu atau beberapa baris dalam DynamicFrame
ke sebuah DynamicFrame
yang baru.
Metode mengembalikan baru DynamicFrameCollection
yang berisi duaDynamicFrames
. Yang pertama DynamicFrame
berisi semua baris yang telah dipisahkan, dan yang kedua berisi baris yang tersisa.
-
comparison_dict
— Kamus di mana kuncinya adalah jalur ke kolom, dan nilainya adalah kamus lain untuk pemetaan pembanding ke nilai yang dibandingkan dengan nilai kolom. Misalnya,{"age": {">": 10, "<": 20}}
membagi semua baris yang nilainya di kolom usia lebih besar dari 10 dan kurang dari 20. -
name1
— Sebuah string nama untukDynamicFrame
yang terbelah. -
name2
— Sebuah string nama untukDynamicFrame
yang tersisa setelah simpul yang ditentukan telah dipecah. -
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan split_rows untuk membagi baris dalam DynamicFrame
Contoh kode ini menggunakan split_rows
metode untuk membagi baris DynamicFrame
berdasarkan nilai id
bidang.
Contoh dataset
Contoh menggunakan panggilan DynamicFrame
l_root_contact_details
yang dipilih dari koleksi bernamalegislators_relationalized
.
l_root_contact_details
memiliki skema dan entri berikut.
root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+
Contoh kode
# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows
unbox
unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)
Membuka kotak (memformat ulang) bidang string di a DynamicFrame
dan mengembalikan yang baru yang berisi kotak yang DynamicFrame
tidak dikotaknya. DynamicRecords
Sebuah DynamicRecord
mewakili catatan logis dalam sebuah DynamicFrame
. Ini mirip dengan baris di Apache SparkDataFrame
, kecuali bahwa itu menggambarkan diri sendiri dan dapat digunakan untuk data yang tidak sesuai dengan skema tetap.
-
path
— Sebuah path lengkap ke simpul string ingin Anda buka kotaknya. format
— Format spesifikasi (opsional). Anda menggunakan ini untuk Amazon S3 atau AWS Glue koneksi yang mendukung berbagai format. Untuk format yang didukung, lihatOpsi format data untuk input dan output untuk Spark AWS Glue.-
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
options
— Satu atau beberapa hal berikut:separator
— String yang berisi karakter pemisah.escaper
— Sebuah string yang berisi karakter escape.skipFirst
— Nilai Boolean yang menunjukkan apakah akan melewatkan contoh pertama.-
withSchema
— Sebuah string yang berisi JSON representasi dari skema node. Format JSON representasi skema didefinisikan oleh output dari.StructType.json()
withHeader
— Nilai Boolean yang menunjukkan apakah header disertakan.
Contoh: Gunakan unbox untuk membuka kotak bidang string ke dalam struct
Contoh kode ini menggunakan unbox
metode untuk membuka kotak, atau memformat ulang, bidang string dalam bidang DynamicFrame
ke dalam bidang tipe struct.
Contoh dataset
Contoh menggunakan DynamicFrame
dipanggil mapped_with_string
dengan skema dan entri berikut.
Perhatikan bidang bernamaAddressString
. Ini adalah bidang yang membuka kotak contoh menjadi struct.
root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...
Contoh kode
# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows
serikat
union(frame1, frame2, transformation_ctx = "",
info = "", stageThreshold = 0, totalThreshold = 0)
Serikat dua DynamicFrames. Pengembalian DynamicFrame yang berisi semua catatan dari kedua input DynamicFrames. Transformasi ini dapat mengembalikan hasil yang berbeda dari penyatuan dua DataFrames dengan data yang setara. Jika Anda membutuhkan perilaku DataFrame serikat Spark, pertimbangkan untuk menggunakantoDF
.
-
frame1
Pertama DynamicFrame ke serikat pekerja. -
frame2
Kedua setelah DynamicFrame serikat pekerja. -
transformation_ctx
— (opsional) String unik yang digunakan untuk mengidentifikasi statistik/informasi negara -
info
— (opsional) String apa pun yang akan dikaitkan dengan kesalahan dalam transformasi -
stageThreshold
— (opsional) Jumlah maksimum kesalahan dalam transformasi sampai pemrosesan akan error -
totalThreshold
— (opsional) Jumlah maksimum kesalahan total sampai pemrosesan akan error.
unnest
unnest(transformation_ctx="", info="", stageThreshold=0,
totalThreshold=0)
Menyingkirkan objek bersarang di aDynamicFrame
, yang menjadikannya objek tingkat atas, dan mengembalikan unnested baru. DynamicFrame
-
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error. -
totalThreshold
— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.
Contoh: Gunakan unnest untuk mengubah bidang bersarang menjadi bidang tingkat atas
Contoh kode ini menggunakan unnest
metode untuk meratakan semua bidang bersarang di dalam bidang tingkat atas. DynamicFrame
Contoh dataset
Contoh menggunakan DynamicFrame
dipanggil mapped_medicare
dengan skema berikut. Perhatikan bahwa Address
bidang adalah satu-satunya bidang yang berisi data bersarang.
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
Contoh kode
# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string
unnest_ddb_json
Unnests kolom bersarang di a DynamicFrame
yang secara khusus dalam JSON struktur DynamoDB, dan mengembalikan unnested baru. DynamicFrame
Kolom yang terdiri dari array tipe struct tidak akan di-unnested. Perhatikan bahwa ini adalah jenis transformasi unnesting tertentu yang berperilaku berbeda dari unnest
transformasi biasa dan mengharuskan data sudah berada dalam struktur DynamoDB. JSON Untuk informasi selengkapnya, lihat DynamoDB JSON.
unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
-
transformation_ctx
— Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional). -
info
— Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional). -
stageThreshold
— Jumlah kesalahan yang dihadapi selama transformasi ini di mana proses mengeluarkan kesalahan (opsional: nol secara default, menunjukkan bahwa proses tidak mengeluarkan kesalahan). -
totalThreshold
— Jumlah kesalahan yang dihadapi hingga dan termasuk transformasi ini di mana proses mengeluarkan kesalahan (opsional: nol secara default, menunjukkan bahwa proses tidak mengeluarkan kesalahan).
Misalnya, skema pembacaan ekspor dengan struktur JSON DynamoDB mungkin terlihat seperti berikut:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnest_ddb_json()
Transformasi akan mengubah ini menjadi:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
Contoh kode berikut menunjukkan cara menggunakan konektor ekspor AWS Glue DynamoDB, memanggil DynamoDB unnest, dan mencetak JSON jumlah partisi:
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()
tulis
write(connection_type, connection_options, format, format_options, accumulator_size)
Mendapatkan DataSink(objek) dari jenis koneksi yang ditentukan dari GlueContext kelas dari DynamicFrame
ini, dan menggunakannya untuk memformat dan menulis isi dari DynamicFrame
ini. Mengembalikan DynamicFrame
baru yang sudah diformat dan ditulis sebagaimana ditentukan.
-
connection_type
— Jenis koneksi yang akan digunakan. Nilai yang valid termasuks3
,mysql
,postgresql
,redshift
,sqlserver
, danoracle
. -
connection_options
— Opsi koneksi yang akan digunakan (opsional). Untukconnection_type
daris3
, path Amazon S3 didefinisikan.connection_options = {"path": "
s3://aws-glue-target/temp
"}Untuk JDBC koneksi, beberapa properti harus didefinisikan. Perhatikan bahwa nama database harus menjadi bagian dariURL. Secara opsional dapat disertakan dalam opsi koneksi.
Awas
Menyimpan kata sandi dalam skrip Anda tidak disarankan. Pertimbangkan
boto3
untuk menggunakan untuk mengambilnya dari AWS Secrets Manager atau Katalog Data AWS Glue.connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"} format
— Format spesifikasi (opsional). Ini digunakan untuk Amazon Simple Storage Service (Amazon S3) atau AWS Glue koneksi yang mendukung berbagai format. Lihat Opsi format data untuk input dan output untuk Spark AWS Glue untuk format yang didukung.format_options
— Pilihan format untuk format yang ditentukan. Lihat Opsi format data untuk input dan output untuk Spark AWS Glue untuk format yang didukung.accumulator_size
— Ukuran akumulasi untuk digunakan, dalam byte (opsional).
— kesalahan —
assertErrorThreshold
assertErrorThreshold( )
— Sebuah pernyataan untuk kesalahan dalam transformasi yang menciptakan DynamicFrame
ini. Mengembalikan sebuah Exception
dari DataFrame
yang mendasari.
errorsAsDynamicBingkai
errorsAsDynamicFrame( )
— Mengembalikan DynamicFrame
yang memiliki catatan kesalahan yang di-nest di dalamnya.
Contoh: Gunakan errorsAsDynamic Frame untuk melihat catatan kesalahan
Contoh kode berikut menunjukkan cara menggunakan errorsAsDynamicFrame
metode untuk melihat catatan kesalahan untuk fileDynamicFrame
.
Contoh dataset
Contoh menggunakan kumpulan data berikut yang dapat Anda unggah ke Amazon JSON S3 sebagai. Perhatikan bahwa catatan kedua cacat. Data yang salah bentuk biasanya merusak penguraian file saat Anda menggunakan Spark. SQL Namun, DynamicFrame
mengenali masalah malformasi dan mengubah garis cacat menjadi catatan kesalahan yang dapat Anda tangani satu per satu.
{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}
Contoh kode
# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["
s3://DOC-EXAMPLE-S3-BUCKET/error_data.json
"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')
errorsCount
errorsCount( )
— Mengembalikan jumlah kesalahan dalam sebuah DynamicFrame
.
stageErrorsCount
stageErrorsCount
— Mengembalikan jumlah kesalahan yang terjadi dalam proses membuat DynamicFrame
ini.