DynamicFrame kelas - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

DynamicFrame kelas

Salah satu abstraksi utama dalam Apache Spark adalah Spark SQLDataFrame, yang mirip dengan konstruksi yang ditemukan di R dan DataFrame Pandas. A DataFrame mirip dengan tabel dan mendukung operasi dan operasi gaya fungsional (map/reduce/filter/dll.) (Pilih, proyek, agregat). SQL

DataFrameskuat dan banyak digunakan, tetapi mereka memiliki keterbatasan sehubungan dengan operasi ekstrak, transformasi, dan beban (ETL). Yang paling signifikan, mereka memerlukan skema yang akan ditentukan sebelum data dimuat. Spark SQL mengatasi ini dengan membuat dua lintasan data — yang pertama menyimpulkan skema, dan yang kedua memuat data. Namun demikian, kesimpulan ini terbatas dan tidak mengatasi realitas data yang berantakan. Misalnya, bidang yang sama mungkin mempunyai jenis yang berbeda dalam catatan yang berbeda. Apache Spark sering kali menyerah dan melaporkan jenis sebagai string dengan menggunakan teks bidang asli. Ini mungkin tidak benar, dan Anda mungkin ingin kontrol yang lebih baik atas bagaimana perbedaan skema diselesaikan. Dan untuk set data besar, pemberian tambahan atas sumber data mungkin mahal.

Untuk mengatasi keterbatasan ini, AWS Glue memperkenalkan. DynamicFrame Sebuah DynamicFrame mirip dengan DataFrame, kecuali bahwa setiap catatan bersifat self-describing, sehingga tidak ada skema yang diperlukan diawal. Sebagai gantinya, AWS Glue menghitung skema on-the-fly bila diperlukan, dan secara eksplisit mengkodekan inkonsistensi skema menggunakan jenis pilihan (atau gabungan). Anda dapat mengatasi inkonsistensi ini untuk membuat set data Anda kompatibel dengan penyimpanan data yang memerlukan sebuah skema tetap.

Demikian pula, DynamicRecord mewakili sebuah catatan logis dalam sebuah DynamicFrame. Hal ini seperti sebuah baris dalam DataFrame Spark, kecuali bahwa ia bersifat self-describing dan dapat digunakan untuk data yang tidak sesuai dengan sebuah skema tetap. Saat menggunakan AWS Glue with PySpark, Anda biasanya tidak memanipulasi secara independenDynamicRecords. Sebaliknya, Anda akan mengubah kumpulan data bersama-sama melaluinyaDynamicFrame.

Anda dapat mengonversi DynamicFrames ke dan dari DataFrames setelah Anda menyelesaikan inkonsistensi skema.

 — konstruksi —

__init__

__init__(jdf, glue_ctx, name)
  • jdf— Referensi ke bingkai data di Java Virtual Machine (JVM).

  • glue_ctx — Sebuah objek GlueContext kelas.

  • name — Sebuah nama string opsional, secara default kosong.

fromDF

fromDF(dataframe, glue_ctx, name)

Mengkonversi DataFrame ke DynamicFrame dengan mengkonversi bidang DataFrame ke bidang DynamicRecord. Mengembalikan DynamicFrame yang baru.

Sebuah DynamicRecord mewakili catatan logis dalam sebuah DynamicFrame. Hal ini seperti sebuah baris dalam DataFrame Spark, kecuali bahwa ia bersifat self-describing dan dapat digunakan untuk data yang tidak sesuai dengan sebuah skema tetap.

Fungsi ini mengharapkan kolom dengan nama duplikat di Anda DataFrame telah diselesaikan.

  • dataframe— Apache Spark SQL DataFrame untuk dikonversi (wajib).

  • glue_ctx — Objek GlueContext kelas yang menentukan konteks untuk transformasi ini (wajib).

  • name— Nama yang dihasilkan DynamicFrame (opsional sejak AWS Glue 3.0).

toDF

toDF(options)

Mengkonversi sebuah DynamicFrame ke sebuah DataFrame Apache Spark dengan mengkonversi bidang DynamicRecords ke bidang DataFrame. Mengembalikan DataFrame yang baru.

Sebuah DynamicRecord mewakili catatan logis dalam sebuah DynamicFrame. Hal ini seperti sebuah baris dalam DataFrame Spark, kecuali bahwa ia bersifat self-describing dan dapat digunakan untuk data yang tidak sesuai dengan sebuah skema tetap.

  • options — Daftar pilihan. Memungkinkan Anda menentukan opsi tambahan untuk proses konversi. Beberapa opsi valid yang dapat Anda gunakan dengan parameter `options`:

    • format— menentukan format data, seperti json, csv, parket).

    • separater or sep— untuk CSV file, menentukan pembatas.

    • header— untuk CSV file, menunjukkan apakah baris pertama adalah header (benar/salah).

    • inferSchema— mengarahkan Spark untuk menyimpulkan skema secara otomatis (benar/salah).

    Berikut adalah contoh penggunaan parameter `options` dengan metode `toDF`:

    from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) csv_dyf = glueContext.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": ["s3://my-bucket/path/to/csv/"]}, format="csv" ) csv_cf = csv_dyf.toDF(options={ "separator": ",", "header": "true", "ïnferSchema": "true" })

    Menentukan jenis target jika Anda memilih jenis tindakan Project dan Cast. Contohnya meliputi hal berikut.

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

 — Informasi —

count

count( ) — Mengembalikan jumlah baris dalam DataFrame yang mendasari.

schema

schema( ) — Mengembalikan skema dari DynamicFrame ini, atau jika itu tidak tersedia, skema dari DataFrame yang mendasari.

Untuk informasi selengkapnya tentang DynamicFrame jenis yang membentuk skema ini, lihatPySpark jenis ekstensi.

printSchema

printSchema( ) — Mencetak skema dari DataFrame yang mendasari.

show

show(num_rows) — Mencetak sejumlah baris yang ditentukan dari DataFrame yang mendasari.

repartition

repartition(numPartitions) — Mengembalikan DynamicFrame yang baru dengan partisi numPartitions.

coalesce

coalesce(numPartitions) — Mengembalikan DynamicFrame yang baru dengan partisi numPartitions.

 — transformasi —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Menerapkan pemetaan deklaratif ke DynamicFrame dan mengembalikan yang baru DynamicFrame dengan pemetaan yang diterapkan ke bidang yang Anda tentukan. Bidang yang tidak ditentukan dihilangkan dari yang baru. DynamicFrame

  • mappings— Daftar tupel pemetaan (wajib). Masing-masing terdiri dari: (kolom sumber, tipe sumber, kolom target, tipe target).

    Jika kolom sumber memiliki titik "." dalam nama, Anda harus menempatkan backticks "``" di sekitarnya. Misalnya, untuk memetakan this.old.name (string) kethisNewName, Anda akan menggunakan tupel berikut:

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan apply_mapping untuk mengganti nama bidang dan mengubah jenis bidang

Contoh kode berikut menunjukkan cara menggunakan apply_mapping metode untuk mengganti nama bidang yang dipilih dan mengubah jenis bidang.

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Panggil transformasi FlatMap kelas untuk menghapus bidang dari DynamicFrame. Mengembalikan sebuah DynamicFrame baru dengan bidang tertentu yang dibuang.

  • paths— Daftar string. Masing-masing berisi jalur lengkap ke node bidang yang ingin Anda jatuhkan. Anda dapat menggunakan notasi titik untuk menentukan bidang bersarang. Misalnya, jika bidang first adalah anak dari bidang name di pohon, Anda menentukan "name.first" untuk jalurnya.

    Jika node bidang memiliki literal . dalam nama, Anda harus melampirkan nama di backticks (). `

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan drop_fields untuk menghapus bidang dari DynamicFrame

Contoh kode ini menggunakan drop_fields metode untuk menghapus bidang tingkat atas dan bersarang yang dipilih dari a. DynamicFrame

Contoh dataset

Contoh menggunakan dataset berikut yang diwakili oleh EXAMPLE-FRIENDS-DATA tabel dalam kode:

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

Contoh kode

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

filter

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Mengembalikan baru DynamicFrame yang berisi semua DynamicRecords dalam input DynamicFrame yang memenuhi fungsi f predikat tertentu.

  • f — Fungsi predikat yang akan diterapkan pada DynamicFrame. Fungsi tersebut harus mengambil DynamicRecord sebagai argumen dan mengembalikan BETUL jika DynamicRecord memenuhi persyaratan filter, atau SALAH jika tidak (wajib).

    Sebuah DynamicRecord mewakili catatan logis dalam sebuah DynamicFrame. Ini mirip dengan baris di SparkDataFrame, kecuali bahwa itu menggambarkan diri sendiri dan dapat digunakan untuk data yang tidak sesuai dengan skema tetap.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan filter untuk mendapatkan pilihan bidang yang difilter

Contoh ini menggunakan filter metode untuk membuat baru DynamicFrame yang mencakup pemilihan bidang lain DynamicFrame yang difilter.

Seperti map metode, filter mengambil fungsi sebagai argumen yang akan diterapkan untuk setiap record dalam aslinyaDynamicFrame. Fungsi ini mengambil catatan sebagai input dan mengembalikan nilai Boolean. Jika nilai pengembalian benar, catatan akan dimasukkan dalam hasilDynamicFrame. Jika salah, catatan ditinggalkan.

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Melakukan kesetaraan penggabungan dengan DynamicFrame yang lain dan mengembalikan DynamicFrame yang dihasilkan.

  • paths1 — Daftar kunci dalam bingkai ini yang akan digabungkan.

  • paths2 — Daftar kunci dalam bingkai lain yang akan digabungkan.

  • frame2DynamicFrame yang lainnya yang akan digabungkan.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan bergabung untuk menggabungkan DynamicFrames

Contoh ini menggunakan join metode untuk melakukan join pada tigaDynamicFrames. AWS Glue melakukan gabungan berdasarkan tombol bidang yang Anda berikan. Hasilnya DynamicFrame berisi baris dari dua frame asli di mana kunci yang ditentukan cocok.

Perhatikan bahwa join transformasi membuat semua bidang tetap utuh. Ini berarti bahwa bidang yang Anda tentukan untuk dicocokkan muncul di hasil DynamicFrame, meskipun mereka berlebihan dan berisi kunci yang sama. Dalam contoh ini, kita gunakan drop_fields untuk menghapus kunci redundan ini setelah bergabung.

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

map

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Mengembalikan DynamicFrame baru dihasilkan dari penerapan fungsi pemetaan yang ditentukan untuk semua catatan dalam DynamicFrame aslinya.

  • f — Fungsi pemetaan yang akan diterapkan ke semua catatan di DynamicFrame. Fungsi harus mengambil DynamicRecord sebagai sebuah argumen dan mengembalikan sebuah DynamicRecord baru (wajib).

    Sebuah DynamicRecord mewakili catatan logis dalam sebuah DynamicFrame. Ini mirip dengan baris di Apache SparkDataFrame, kecuali bahwa itu menggambarkan diri sendiri dan dapat digunakan untuk data yang tidak sesuai dengan skema tetap.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info— String yang dikaitkan dengan kesalahan dalam transformasi (opsional).

  • stageThreshold— Jumlah maksimum kesalahan yang dapat terjadi dalam transformasi sebelum kesalahan keluar (opsional). Default-nya adalah nol.

  • totalThreshold— Jumlah maksimum kesalahan yang dapat terjadi secara keseluruhan sebelum memproses kesalahan keluar (opsional). Default-nya adalah nol.

Contoh: Gunakan peta untuk menerapkan fungsi ke setiap catatan dalam DynamicFrame

Contoh ini menunjukkan bagaimana menggunakan map metode untuk menerapkan fungsi untuk setiap record dari sebuahDynamicFrame. Secara khusus, contoh ini menerapkan fungsi yang dipanggil MergeAddress ke setiap catatan untuk menggabungkan beberapa bidang alamat menjadi satu struct jenis.

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

Menggabungkan DynamicFrame ini dengan pentahapan DynamicFrame berdasarkan kunci primer yang ditentukan untuk mengidentifikasi catatan. Rekaman duplikat (catatan dengan kunci utama yang sama) tidak dideduplikasi. Jika tidak ada catatan yang cocok dalam bingkai pentahapan, semua catatan (termasuk duplikat) akan dipertahankan dari sumber. Jika bingkai pementasan memiliki catatan yang cocok, catatan dari bingkai pementasan menimpa catatan di sumber AWS Glue.

  • stage_dynamic_frame — Pentahapan DynamicFrame yang akan digabungkan.

  • primary_keys — Daftar bidang kunci primer untuk mencocokkan catatan dari sumber dan pentahapan bingkai dinamis.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengambil metadata tentang transformasi saat ini (opsional).

  • options— Serangkaian pasangan JSON nama-nilai yang memberikan informasi tambahan untuk transformasi ini. Argumen ini saat ini tidak digunakan.

  • info — Sebuah String. Setiap string yang akan dikaitkan dengan kesalahan dalam transformasi ini.

  • stageThreshold — Sebuah Long. Jumlah kesalahan dalam transformasi yang ditentukan yang memerlukan pengolahan untuk membersihkan kesalahan.

  • totalThreshold — Sebuah Long. Jumlah total kesalahan hingga dan termasuk transformasi ini yang perlu disalahartikan oleh pemrosesan.

Metode ini mengembalikan baru DynamicFrame yang diperoleh dengan menggabungkan ini DynamicFrame dengan DynamicFrame pementasan.

DynamicFrame yang dikembalikan berisi catatan A dalam kasus ini:

  • Jika A ada di bingkai sumber dan bingkai pentahapan, maka A dalam bingkai pentahapan akan dikembalikan.

  • Jika A ada di tabel sumber dan tidak A.primaryKeys ada distagingDynamicFrame, tidak A diperbarui dalam tabel pementasan.

Bingkai sumber dan bingkai pementasan tidak perlu memiliki skema yang sama.

Contoh: Gunakan mergeDynamicFrame untuk menggabungkan dua DynamicFrames berdasarkan kunci utama

Contoh kode berikut menunjukkan bagaimana menggunakan mergeDynamicFrame metode untuk menggabungkan DynamicFrame dengan “pementasan”DynamicFrame, berdasarkan kunci utama. id

Contoh dataset

Contoh menggunakan dua DynamicFrames dari yang DynamicFrameCollection dipanggilsplit_rows_collection. Berikut ini adalah daftar kunci displit_rows_collection.

dict_keys(['high', 'low'])

Contoh kode

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Mengkonversi DynamicFrame ke dalam bentuk yang cocok dalam database relasional. Relationalisasi a DynamicFrame sangat berguna ketika Anda ingin memindahkan data dari SQL lingkungan No seperti DynamoDB ke database relasional seperti My. SQL

Transformasi menghasilkan daftar bingkai dengan membuka kolom bersarang dan kolom array berputar. Anda dapat menggabungkan kolom array berputar ke tabel root dengan menggunakan tombol gabungan yang dihasilkan selama fase unnest.

  • root_table_name — Nama untuk tabel akar.

  • staging_path— Jalur di mana metode dapat menyimpan partisi tabel berputar dalam CSV format (opsional). Tabel berputar dibaca kembali dari path ini.

  • options — Kamus parameter opsional.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan relationalize untuk meratakan skema bersarang di DynamicFrame

Contoh kode ini menggunakan relationalize metode untuk meratakan skema bersarang menjadi bentuk yang cocok dengan database relasional.

Contoh dataset

Contoh menggunakan DynamicFrame dipanggil legislators_combined dengan skema berikut. legislators_combinedmemiliki beberapa bidang bersarang sepertilinks,images, dancontact_details, yang akan diratakan oleh transformasi. relationalize

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

Contoh kode

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

Output berikut memungkinkan Anda membandingkan skema bidang bersarang yang dipanggil contact_details ke tabel yang dibuat oleh relationalize transformasi. Perhatikan bahwa catatan tabel menautkan kembali ke tabel utama menggunakan kunci asing yang disebut id dan index kolom yang mewakili posisi array.

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Mengganti nama bidang dalam DynamicFrame hal ini dan mengembalikan DynamicFrame baru dengan bidang yang diganti namanya.

  • oldName — Jalur lengkap ke simpul yang ingin Anda ganti namanya.

    Jika nama lama memiliki titik di dalamnya, RenameField tidak berfungsi kecuali Anda menempatkan backticks di sekitarnya ()`. Sebagai contoh, untuk menggantikan this.old.name dengan thisNewName, Anda akan memanggil rename_field sebagai berikut.

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName — Nama baru, sebagai path lengkap.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan rename_field untuk mengganti nama bidang dalam DynamicFrame

Contoh kode ini menggunakan rename_field metode untuk mengganti nama bidang dalamDynamicFrame. Perhatikan bahwa contoh menggunakan metode chaining untuk mengganti nama beberapa bidang secara bersamaan.

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

Contoh kode

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

Mengubah jenis pilihan dalam DynamicFrame ini dan mengembalikan DynamicFrame.

  • specs — Daftar ambiguitas spesifik yang akan diubah, masing-masing dalam bentuk tupel: (field_path, action).

    Ada dua cara untuk menggunakan resolveChoice. Yang pertama adalah dengan menggunakan argumen specs untuk menentukan urutan bidang tertentu dan cara mengubahnya. Mode lainnya resolveChoice adalah menggunakan choice argumen untuk menentukan resolusi tunggal untuk semuaChoiceTypes.

    Nilai untuk specs ditetapkan sebagai tupel terdiri dari pasangan (field_path, action). Nilai field_path mengidentifikasi elemen ambigu tertentu, dan nilai action mengidentifikasi resolusi yang sesuai. Berikut ini adalah tindakan yang mungkin:

    • cast:type — Upaya untuk mengubah semua nilai ke jenis tertentu. Sebagai contoh: cast:int.

    • make_cols — Mengkonversi setiap jenis yang berbeda menjadi kolom dengan nama columnName_type. Ini menyelesaikan ambiguitas potensial dengan meratakan data. Misalnya, jika columnA bisa berupa int atau string, maka resolusi akan menghasilkan dua kolom bernama columnA_int dan columnA_string dalam DynamicFrame yang dihasilkan.

    • make_struct — Mengubah ambiguitas potensial dengan menggunakan struct untuk mewakili data. Misalnya, jika data dalam kolom bisa berupa int atau astring, make_struct tindakan menghasilkan kolom struktur yang dihasilkanDynamicFrame. Setiap struktur berisi a int dan astring.

    • project:type — Mengubah ambiguitas potensial dengan memproyeksikan semua data ke salah satu jenis data yang mungkin. Sebagai contoh, jika data dalam kolom bisa berupa int atau string, dengan menggunakan tindakan project:string menghasilkan kolom dalam DynamicFrame yang dihasilkan di mana semua nilai int telah dikonversi menjadi string.

    Jika field_path mengidentifikasi sebuah array, menempatkan kurung persegi kosong setelah nama array untuk menghindari ambiguitas. Misalnya, anggap Anda bekerja dengan data yang terstruktur sebagai berikut:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    Anda dapat memilih numerik daripada versi string harga dengan menyetel field_path ke"myList[].price", dan menyetel action ke"cast:double".

    catatan

    Anda hanya dapat menggunakan salah satu choice parameter specs dan. Jika parameter specs bukan None, maka parameter choice harus string kosong. Sebaliknya, jika choice bukan string kosong, maka specs parameternya harusNone.

  • choice — Menentukan resolusi tunggal untuk semua ChoiceTypes. Anda dapat menggunakan ini dalam kasus di mana daftar lengkap tidak ChoiceTypes diketahui sebelum runtime. Selain tindakan-tindakan yang tercantum sebelumnya untuk specs, argumen ini juga mendukung tindakan berikut:

    • match_catalog — Upaya untuk mengubah setiap ChoiceType menjadi jenis yang sesuai dalam tabel Katalog Data yang ditentukan.

  • database — Basis data Katalog Data yang akan digunakan dengan tindakan match_catalog.

  • table_name — Tabel Katalog Data yang akan digunakan dengan tindakan match_catalog.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error out.

  • catalog_id — ID katalog Katalog Data yang sedang diakses (ID akun Katalog Data). Ketika diatur ke None (nilai default), ia menggunakan ID katalog akun yang memanggil.

Contoh: Gunakan resolveChoice untuk menangani kolom yang berisi beberapa jenis

Contoh kode ini menggunakan resolveChoice metode untuk menentukan cara menangani DynamicFrame kolom yang berisi nilai-nilai dari beberapa jenis. Contoh menunjukkan dua cara umum untuk menangani kolom dengan jenis yang berbeda:

  • Transmisikan kolom ke satu tipe data.

  • Pertahankan semua jenis di kolom terpisah.

Contoh dataset

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Persiapan data menggunakan ResolveChoice, Lambda, dan ApplyMapping dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

Contoh menggunakan DynamicFrame dipanggil medicare dengan skema berikut:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Contoh kode

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Mengembalikan baru DynamicFrame yang berisi bidang yang dipilih.

  • paths— Daftar string. Setiap string adalah jalur ke node tingkat atas yang ingin Anda pilih.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan select_fields untuk membuat baru DynamicFrame dengan bidang yang dipilih

Contoh kode berikut menunjukkan cara menggunakan select_fields metode untuk membuat yang baru DynamicFrame dengan daftar bidang yang dipilih dari yang sudah adaDynamicFrame.

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

sederhana_ddb_json

simplify_ddb_json(): DynamicFrame

Menyederhanakan kolom bersarang dalam DynamicFrame yang secara khusus dalam JSON struktur DynamoDB, dan mengembalikan disederhanakan baru. DynamicFrame Jika ada beberapa jenis atau tipe Peta dalam tipe Daftar, elemen dalam Daftar tidak akan disederhanakan. Perhatikan bahwa ini adalah jenis transformasi tertentu yang berperilaku berbeda dari unnest transformasi biasa dan mengharuskan data sudah berada dalam struktur DynamoDBJSON. Untuk informasi selengkapnya, lihat DynamoDB JSON.

Misalnya, skema pembacaan ekspor dengan struktur JSON DynamoDB mungkin terlihat seperti berikut:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

simplify_ddb_json()Transformasi akan mengubah ini menjadi:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Contoh: Gunakan simplify_ddb_json untuk memanggil DynamoDB menyederhanakan JSON

Contoh kode ini menggunakan simplify_ddb_json metode untuk menggunakan konektor ekspor AWS Glue DynamoDB, memanggil DynamoDB menyederhanakan, dan mencetak JSON jumlah partisi.

Contoh kode

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

Menulis catatan sampel ke tujuan tertentu untuk membantu Anda memverifikasi transformasi yang dilakukan oleh pekerjaan Anda.

  • path— Jalur tujuan untuk menulis (wajib).

  • options— Pasangan kunci-nilai yang menentukan opsi (opsional). Pilihan "topk" menentukan bahwa catatan k pertama harus ditulis. "prob"Opsi menentukan probabilitas (sebagai desimal) untuk memilih catatan yang diberikan. Anda dapat menggunakannya dalam memilih catatan untuk menulis.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

Contoh: Gunakan keran untuk menulis bidang contoh dari a DynamicFrame ke Amazon S3

Contoh kode ini menggunakan spigot metode untuk menulis catatan sampel ke bucket Amazon S3 setelah menerapkan transformasi. select_fields

Contoh dataset

catatan

Untuk mengakses kumpulan data yang digunakan dalam contoh ini, lihat Contoh kode: Bergabung dan menghubungkan data dan ikuti petunjuk diLangkah 1: Merayapi data di bucket Amazon S3.

Contoh menggunakan DynamicFrame dipanggil persons dengan skema berikut:

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Contoh kode

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

Berikut ini adalah contoh data yang spigot menulis ke Amazon S3. Karena kode contoh yang ditentukanoptions={"topk": 10}, data sampel berisi 10 catatan pertama.

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Mengembalikan baru DynamicFrameCollection yang berisi duaDynamicFrames. Yang pertama DynamicFrame berisi semua node yang telah dipisahkan, dan yang kedua berisi node yang tersisa.

  • paths — Daftar string, masing-masing merupakan path lengkap ke sebuah simpul yang ingin Anda bagi menjadi sebuah DynamicFrame.

  • name1 — Sebuah string nama untuk DynamicFrame yang terbelah.

  • name2 — Sebuah string nama untuk DynamicFrame yang tersisa setelah simpul yang ditentukan telah dipecah.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan split_fields untuk membagi bidang yang dipilih menjadi terpisah DynamicFrame

Contoh kode ini menggunakan split_fields metode untuk membagi daftar bidang tertentu menjadi terpisahDynamicFrame.

Contoh dataset

Contoh menggunakan panggilan DynamicFrame l_root_contact_details yang berasal dari koleksi bernamalegislators_relationalized.

l_root_contact_detailsmemiliki skema dan entri berikut.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

Contoh kode

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Membagi satu atau beberapa baris dalam DynamicFrame ke sebuah DynamicFrame yang baru.

Metode mengembalikan baru DynamicFrameCollection yang berisi duaDynamicFrames. Yang pertama DynamicFrame berisi semua baris yang telah dipisahkan, dan yang kedua berisi baris yang tersisa.

  • comparison_dict— Kamus di mana kuncinya adalah jalur ke kolom, dan nilainya adalah kamus lain untuk pemetaan pembanding ke nilai yang dibandingkan dengan nilai kolom. Misalnya, {"age": {">": 10, "<": 20}} membagi semua baris yang nilainya di kolom usia lebih besar dari 10 dan kurang dari 20.

  • name1 — Sebuah string nama untuk DynamicFrame yang terbelah.

  • name2 — Sebuah string nama untuk DynamicFrame yang tersisa setelah simpul yang ditentukan telah dipecah.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan split_rows untuk membagi baris dalam DynamicFrame

Contoh kode ini menggunakan split_rows metode untuk membagi baris DynamicFrame berdasarkan nilai id bidang.

Contoh dataset

Contoh menggunakan panggilan DynamicFrame l_root_contact_details yang dipilih dari koleksi bernamalegislators_relationalized.

l_root_contact_detailsmemiliki skema dan entri berikut.

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

Contoh kode

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

Membuka kotak (memformat ulang) bidang string di a DynamicFrame dan mengembalikan yang baru yang berisi kotak yang DynamicFrame tidak dikotaknya. DynamicRecords

Sebuah DynamicRecord mewakili catatan logis dalam sebuah DynamicFrame. Ini mirip dengan baris di Apache SparkDataFrame, kecuali bahwa itu menggambarkan diri sendiri dan dapat digunakan untuk data yang tidak sesuai dengan skema tetap.

  • path — Sebuah path lengkap ke simpul string ingin Anda buka kotaknya.

  • format — Format spesifikasi (opsional). Anda menggunakan ini untuk Amazon S3 atau AWS Glue koneksi yang mendukung berbagai format. Untuk format yang didukung, lihatOpsi format data untuk input dan output untuk Spark AWS Glue.

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • options — Satu atau beberapa hal berikut:

    • separator— String yang berisi karakter pemisah.

    • escaper— Sebuah string yang berisi karakter escape.

    • skipFirst— Nilai Boolean yang menunjukkan apakah akan melewatkan contoh pertama.

    • withSchema— Sebuah string yang berisi JSON representasi dari skema node. Format JSON representasi skema didefinisikan oleh output dari. StructType.json()

    • withHeader— Nilai Boolean yang menunjukkan apakah header disertakan.

Contoh: Gunakan unbox untuk membuka kotak bidang string ke dalam struct

Contoh kode ini menggunakan unbox metode untuk membuka kotak, atau memformat ulang, bidang string dalam bidang DynamicFrame ke dalam bidang tipe struct.

Contoh dataset

Contoh menggunakan DynamicFrame dipanggil mapped_with_string dengan skema dan entri berikut.

Perhatikan bidang bernamaAddressString. Ini adalah bidang yang membuka kotak contoh menjadi struct.

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

Contoh kode

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

serikat

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

Serikat dua DynamicFrames. Pengembalian DynamicFrame yang berisi semua catatan dari kedua input DynamicFrames. Transformasi ini dapat mengembalikan hasil yang berbeda dari penyatuan dua DataFrames dengan data yang setara. Jika Anda membutuhkan perilaku DataFrame serikat Spark, pertimbangkan untuk menggunakantoDF.

  • frame1Pertama DynamicFrame ke serikat pekerja.

  • frame2Kedua setelah DynamicFrame serikat pekerja.

  • transformation_ctx— (opsional) String unik yang digunakan untuk mengidentifikasi statistik/informasi negara

  • info— (opsional) String apa pun yang akan dikaitkan dengan kesalahan dalam transformasi

  • stageThreshold— (opsional) Jumlah maksimum kesalahan dalam transformasi sampai pemrosesan akan error

  • totalThreshold— (opsional) Jumlah maksimum kesalahan total sampai pemrosesan akan error.

unnest

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Menyingkirkan objek bersarang di aDynamicFrame, yang menjadikannya objek tingkat atas, dan mengembalikan unnested baru. DynamicFrame

  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold— Jumlah kesalahan yang ditemui selama transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

  • totalThreshold— Jumlah kesalahan yang ditemui hingga dan termasuk transformasi ini di mana proses harus error out (opsional). Defaultnya adalah nol, yang menunjukkan bahwa proses tidak boleh error.

Contoh: Gunakan unnest untuk mengubah bidang bersarang menjadi bidang tingkat atas

Contoh kode ini menggunakan unnest metode untuk meratakan semua bidang bersarang di dalam bidang tingkat atas. DynamicFrame

Contoh dataset

Contoh menggunakan DynamicFrame dipanggil mapped_medicare dengan skema berikut. Perhatikan bahwa Address bidang adalah satu-satunya bidang yang berisi data bersarang.

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

Contoh kode

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

Unnests kolom bersarang di a DynamicFrame yang secara khusus dalam JSON struktur DynamoDB, dan mengembalikan unnested baru. DynamicFrame Kolom yang terdiri dari array tipe struct tidak akan di-unnested. Perhatikan bahwa ini adalah jenis transformasi unnesting tertentu yang berperilaku berbeda dari unnest transformasi biasa dan mengharuskan data sudah berada dalam struktur DynamoDB. JSON Untuk informasi selengkapnya, lihat DynamoDB JSON.

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx — Sebuah string unik yang digunakan untuk mengidentifikasi informasi status (opsional).

  • info — Sebuah string yang akan dikaitkan dengan pelaporan kesalahan untuk transformasi ini (opsional).

  • stageThreshold — Jumlah kesalahan yang dihadapi selama transformasi ini di mana proses mengeluarkan kesalahan (opsional: nol secara default, menunjukkan bahwa proses tidak mengeluarkan kesalahan).

  • totalThreshold — Jumlah kesalahan yang dihadapi hingga dan termasuk transformasi ini di mana proses mengeluarkan kesalahan (opsional: nol secara default, menunjukkan bahwa proses tidak mengeluarkan kesalahan).

Misalnya, skema pembacaan ekspor dengan struktur JSON DynamoDB mungkin terlihat seperti berikut:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnest_ddb_json()Transformasi akan mengubah ini menjadi:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

Contoh kode berikut menunjukkan cara menggunakan konektor ekspor AWS Glue DynamoDB, memanggil DynamoDB unnest, dan mencetak JSON jumlah partisi:

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

tulis

write(connection_type, connection_options, format, format_options, accumulator_size)

Mendapatkan DataSink(objek) dari jenis koneksi yang ditentukan dari GlueContext kelas dari DynamicFrame ini, dan menggunakannya untuk memformat dan menulis isi dari DynamicFrame ini. Mengembalikan DynamicFrame baru yang sudah diformat dan ditulis sebagaimana ditentukan.

  • connection_type — Jenis koneksi yang akan digunakan. Nilai yang valid termasuk s3, mysql, postgresql, redshift, sqlserver, dan oracle.

  • connection_options — Opsi koneksi yang akan digunakan (opsional). Untuk connection_type dari s3, path Amazon S3 didefinisikan.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Untuk JDBC koneksi, beberapa properti harus didefinisikan. Perhatikan bahwa nama database harus menjadi bagian dariURL. Secara opsional dapat disertakan dalam opsi koneksi.

    Awas

    Menyimpan kata sandi dalam skrip Anda tidak disarankan. Pertimbangkan boto3 untuk menggunakan untuk mengambilnya dari AWS Secrets Manager atau Katalog Data AWS Glue.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format — Format spesifikasi (opsional). Ini digunakan untuk Amazon Simple Storage Service (Amazon S3) atau AWS Glue koneksi yang mendukung berbagai format. Lihat Opsi format data untuk input dan output untuk Spark AWS Glue untuk format yang didukung.

  • format_options — Pilihan format untuk format yang ditentukan. Lihat Opsi format data untuk input dan output untuk Spark AWS Glue untuk format yang didukung.

  • accumulator_size— Ukuran akumulasi untuk digunakan, dalam byte (opsional).

 — kesalahan —

assertErrorThreshold

assertErrorThreshold( ) — Sebuah pernyataan untuk kesalahan dalam transformasi yang menciptakan DynamicFrame ini. Mengembalikan sebuah Exception dari DataFrame yang mendasari.

errorsAsDynamicBingkai

errorsAsDynamicFrame( ) — Mengembalikan DynamicFrame yang memiliki catatan kesalahan yang di-nest di dalamnya.

Contoh: Gunakan errorsAsDynamic Frame untuk melihat catatan kesalahan

Contoh kode berikut menunjukkan cara menggunakan errorsAsDynamicFrame metode untuk melihat catatan kesalahan untuk fileDynamicFrame.

Contoh dataset

Contoh menggunakan kumpulan data berikut yang dapat Anda unggah ke Amazon JSON S3 sebagai. Perhatikan bahwa catatan kedua cacat. Data yang salah bentuk biasanya merusak penguraian file saat Anda menggunakan Spark. SQL Namun, DynamicFrame mengenali masalah malformasi dan mengubah garis cacat menjadi catatan kesalahan yang dapat Anda tangani satu per satu.

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

Contoh kode

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) — Mengembalikan jumlah kesalahan dalam sebuah DynamicFrame.

stageErrorsCount

stageErrorsCount — Mengembalikan jumlah kesalahan yang terjadi dalam proses membuat DynamicFrame ini.