AWS GlueKelas scala DynamicFrame - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

AWS GlueKelas scala DynamicFrame

Package: com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

Sebuah DynamicFrame adalah koleksi terdistribusi dari objek DynamicRecord yang mendeskripsi secara mandiri.

DynamicFrame dirancang untuk menyediakan model data yang fleksibel untuk operasi ETL (ekstrak, transformasi, dan muat). Mereka tidak mengharuskan membuat sebuah skema, dan Anda dapat menggunakannya untuk membaca dan melakukan transformasi pada data yang berisi nilai dan jenis yang berantakan atau tidak konsisten. Sebuah skema dapat dihitung berdasarkan permintaan untuk operasi-operasi yang membutuhkannya.

DynamicFrame menyediakan berbagai transformasi untuk pembersihan data dan ETL. Mereka juga mendukung konversi ke dan dari DataFrames SparkSQL untuk diintegrasikan dengan kode yang ada dan banyak operasi analitik yang menyediakan. DataFrames

Parameter berikut dibagi di banyak AWS Glue transformasi yang membangun sDynamicFrame:

  • transformationContext — Pengidentifikasi untuk DynamicFrame ini. transformationContext digunakan sebagai kunci untuk status bookmark tugas yang tetap ada di seluruh eksekusi.

  • callSite — Menyediakan informasi konteks untuk pelaporan kesalahan. Nilai-nilai ini secara otomatis ditetapkan ketika memanggil dari Python.

  • stageThreshold — Jumlah maksimum catatan kesalahan yang diizinkan dari komputasi DynamicFrame ini sebelum melemparkan pengecualian, tidak termasuk catatan yang ada dalam DynamicFrame sebelumnya.

  • totalThreshold — Jumlah maksimum total catatan kesalahan sebelum pengecualian dilemparkan, termasuk yang dari bingkai sebelumnya.

ErrorsCount Val

val errorsCount

Jumlah catatan kesalahan dalam DynamicFrame ini. Ini termasuk kesalahan dari operasi sebelumnya.

Def ApplyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings — Urutan pemetaan untuk membangun DynamicFrame baru.

  • caseSensitive — Apakah akan memperlakukan kolom sumber sebagai kolom yang peka huruf besar dan kecil. Mengatur ini ke false mungkin membantu ketika mengintegrasikan dengan penyimpanan yang peka huruf besar dan kecil seperti Katalog Data Glue AWS.

Memilih, memproyeksikan, dan melemparkan kolom berdasarkan urutan pemetaan.

Setiap pemetaan terdiri dari kolom sumber dan jenis serta kolom target dan jenis. Pemetaan dapat ditentukan sebagai empat tupel (source_path, source_type, target_path, target_type) atau objek MappingSpec yang berisi informasi yang sama.

Selain menggunakan pemetaan untuk proyeksi dan transmisi sederhana, Anda dapat menggunakannya untuk melakukan nest atau membuka nest pada bidang dengan memisahkan komponen dari path dengan tanda '.' (titik).

Sebagai contoh, anggaplah Anda memiliki DynamicFrame dengan skema berikut.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

Anda dapat membuat panggilan berikut untuk membuka sarang dari bidang state dan zip.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

Skema yang dihasilkan adalah sebagai berikut.

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

Anda juga dapat menggunakan applyMapping untuk melakukan nest kembali pada kolom. Sebagai contoh, berikut ini membalikkan transformasi sebelumnya dan membuat sebuah struct bernama address dalam target.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

Nama kolom yang berisi karakter '.' (titik) dapat dikutip dengan menggunakan karakter backtick (``).

catatan

Saat ini, Anda tidak dapat menggunakan metode applyMapping untuk memetakan kolom yang bersarang di bawah array.

Def assertErrorThreshold

def assertErrorThreshold : Unit

Tindakan yang memaksa perhitungan dan memverifikasi bahwa jumlah catatan kesalahan berada di bawah stageThreshold dan totalThreshold. Melempar pengecualian jika salah satu kondisi gagal.

Hitungan def

lazy def count

Mengembalikan jumlah elemen dalam DynamicFrame ini.

DropField Def

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Mengembalikan DynamicFrame baru dengan kolom tertentu dihapus.

DropFields Def

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Mengembalikan DynamicFrame baru dengan kolom-kolom tertentu dihapus.

Anda dapat menggunakan metode ini untuk menghapus kolom bersarang, termasuk yang ada dalam array, tetapi tidak untuk membuang elemen array tertentu.

DropNulls Def

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

Mengembalikan DynamicFrame baru dengan semua kolom nol dihapus.

catatan

Ini hanya menghapus kolom tipe NullType. Nilai-nilai nol individu dalam kolom lain tidak dihapus atau dimodifikasi.

Bingkai Def errorsAsDynamic

def errorsAsDynamicFrame

Mengembalikan DynamicFrame baru yang berisi catatan kesalahan dari DynamicFrame.

Filter Def

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Membangun DynamicFrame baru yang hanya berisi catatan-catatan yang untuknya fungsi 'f' mengembalikan true. Fungsi filter 'f' seharusnya tidak mengubah catatan masukan.

Def getName

def getName : String

Mengembalikan nama dari DynamicFrame ini.

Def getNumPartitions

def getNumPartitions

Mengembalikan jumlah partisi dalam DynamicFrame ini.

Def Dihitung getSchemaIf

def getSchemaIfComputed : Option[Schema]

Mengembalikan skema jika ia sudah dikomputasi. Tidak memindai data jika skema belum dikomputasi.

Def isSchemaComputed

def isSchemaComputed : Boolean

Mengembalikan true jika skema telah dikomputasi untuk DynamicFrame ini, atau false jika tidak. Jika metode ini mengembalikan false, maka memanggil metode schema akan mengharuskan pemberian lain atas catatan dalam DynamicFrame ini.

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def bergabung

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 — Kolom dalam DynamicFrame ini yang akan digunakan untuk penggabungan.

  • keys2 — Kolom dalam frame2 yang akan digunakan untuk penggabungan. Harus memiliki panjang yang sama seperti keys1.

  • frame2DynamicFrame yang akan digabungkan padanya.

Mengembalikan hasil dari pelaksanaan equijoin dengan frame2 menggunakan kunci yang ditentukan.

Def peta

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Mengembalikan DynamicFrame baru yang dibangun dengan menerapkan fungsi 'f' tertentu untuk setiap catatan dalam DynamicFrame.

Metode ini menyalin setiap catatan sebelum menerapkan fungsi yang ditentukan, sehingga aman untuk mengubah catatan. Jika fungsi pemetaan melempar pengecualian pada catatan tertentu, maka catatan yang ditandai sebagai kesalahan, dan jejak tumpukan disimpan sebagai sebuah kolom dalam catatan kesalahan.

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame — Pentahapan DynamicFrame yang akan digabungkan.

  • primaryKeys — Daftar bidang kunci primer untuk mencocokkan catatan dari sumber dan pentahapan DynamicFrame.

  • transformationContext — Sebuah string unik yang digunakan untuk mengambil metadata tentang transformasi saat ini (opsional).

  • options — Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk transformasi ini.

  • callSite — Digunakan untuk menyediakan informasi konteks untuk pelaporan kesalahan.

  • stageThreshold — Sebuah Long. Jumlah kesalahan dalam transformasi yang ditentukan yang memerlukan pengolahan untuk membersihkan kesalahan.

  • totalThreshold — Sebuah Long. Jumlah kesalahan hingga dan termasuk dalam transformasi yang memerlukan pengolahan untuk membersihkan kesalahan.

Menggabungkan DynamicFrame ini dengan pentahapan DynamicFrame berdasarkan kunci primer yang ditentukan untuk mengidentifikasi catatan. Catatan duplikat (catatan dengan kunci primer yang sama) tidak di-deduplikasi. Jika tidak ada catatan yang cocok dalam bingkai pentahapan, semua catatan (termasuk duplikat) akan dipertahankan dari sumber. Jika bingkai pementasan memiliki catatan yang cocok, catatan dari bingkai pementasan menimpa catatan di sumber. AWS Glue

DynamicFrame yang dikembalikan berisi catatan A dalam kasus berikut:

  1. Jika A ada di bingkai sumber dan bingkai pentahapan, maka A dalam bingkai pentahapan akan dikembalikan.

  2. Jika A ada dalam tabel sumber dan A.primaryKeys tidak ada di stagingDynamicFrame (berarti A tidak diperbarui dalam tabel pentahapan).

Bingkai sumber dan bingkai pentahapan tidak perlu memiliki skema yang sama.

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

PrintSkema Cetak Def

def printSchema : Unit

Mencetak skema DynamicFrame ini ke stdout dalam format yang dapat dibaca manusia.

ReComputesChema Def

def recomputeSchema : Schema

Memaksakan komputasi ulang skema. Hal ini memerlukan pemindaian data, tapi mungkin akan "mengencangkan" skema jika ada beberapa bidang dalam skema saat ini yang tidak ada dalam data.

Mengembalikan skema yang telah dikomputasi ulang.

Def relasialisasi

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName — Nama yang digunakan untuk DynamicFrame dasar dalam output. DynamicFrame yang dibuat oleh array berputar dimulai dengan ini sebagai prefiks.

  • stagingPath — Path Amazon Simple Storage Service (Amazon S3) untuk menulis data menengah.

  • options — Menghubungkan opsi dan konfigurasi. Saat ini tidak digunakan.

Meratakan semua struktur bersarang dan memutar array ke dalam tabel terpisah.

Anda dapat menggunakan operasi ini untuk mempersiapkan data yang sangat bersarang untuk penyerapan ke dalam basis data relasional. Struct bersarang diratakan dengan cara yang sama seperti transformasi Tidak bersarang. Selain itu, array yang diputar ke dalam tabel terpisah dengan masing-masing elemen array yang menjadi sebuah baris. Sebagai contoh, anggaplah Anda memiliki DynamicFrame dengan data berikut.

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

Jalankan kode berikut.

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

Hal ini menghasilkan dua tabel. Tabel pertama bernama "orang" dan isinya adalah sebagai berikut.

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

Di sini, array teman-teman telah diganti dengan kunci penggabungan yang dihasilkan secara otomatis. Sebuah tabel terpisah bernama people.friends dibuat dengan isi sebagai berikut.

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

Dalam tabel ini, 'id' adalah sebuah kunci penggabungan yang mengidentifikasi yang mencatat asal elemen array, 'index' mengacu pada posisi dalam array asli, dan 'val' adalah entri array yang sebenarnya.

Metode relationalize mengembalikan deret DynamicFrame yang dibuat dengan menerapkan proses ini secara rekursif pada semua array.

catatan

AWS GluePustaka secara otomatis menghasilkan kunci gabungan untuk tabel baru. Untuk memastikan bahwa kunci penggabungan bersifat unik di seluruh eksekusi tugas, Anda harus mengaktifkan bookmark tugas.

Def RenameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName — Nama asli kolom.

  • newName — Nama baru kolom.

Mengembalikan sebuah DynamicFrame baru dengan kolom tertentu yang diganti namanya.

Anda dapat menggunakan metode ini untuk mengganti nama bidang bersarang. Misalnya, kode berikut akan mengubah nama state menjadi state_code dalam struct alamat.

{{{ df.renameField("address.state", "address.state_code") }}}

Def repartisi

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Mengembalikan sebuah DynamicFrame baru dengan partisi numPartitions.

Def ResolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption — Sebuah tindakan yang akan diterapkan ke semua kolom ChoiceType yang tidak tercantum dalam urutan spesifikasi.

  • database — Basis data Katalog Data yang akan digunakan dengan tindakan match_catalog.

  • tableName — Tabel Katalog Data yang akan digunakan dengan tindakan match_catalog.

Mengembalikan sebuah DynamicFrame baru dengan mengganti satu atau beberapa ChoiceType dengan tipe yang lebih spesifik.

Ada dua cara untuk menggunakan resolveChoice. Yang pertama adalah menentukan urutan kolom tertentu dan cara mengubahnya. Hal ini ditentukan sebagai tupel yang terdiri dari pasangan (kolom, tindakan).

Berikut ini adalah tindakan yang mungkin:

  • cast:type — Upaya untuk mengubah semua nilai ke jenis tertentu.

  • make_cols — Mengkonversi setiap jenis yang berbeda menjadi kolom dengan nama columnName_type.

  • make_struct — Mengkonversi kolom menjadi struct dengan kunci untuk setiap jenis yang berbeda.

  • project:type — Mempertahankan hanya nilai-nilai dari jenis tertentu saja.

Mode lain untuk resolveChoice adalah untuk menentukan resolusi tunggal untuk semua ChoiceType. Anda dapat menggunakan ini dalam kasus di mana daftar ChoiceType lengkap tidak diketahui sebelum eksekusi. Selain tindakan-tindakan yang tercantum sebelumnya, mode ini juga mendukung tindakan berikut:

  • match_catalog — Upaya untuk mengubah setiap ChoiceType menjadi jenis yang sesuai dalam tabel katalog yang ditentukan.

Contoh:

Ubah kolom user.id dengan mengubahnya menjadi int, dan membuat bidang address hanya mempertahankan struct saja.

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

Ubah semua ChoiceType dengan mengkonversi setiap pilihan menjadi kolom terpisah.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

Ubah semua ChoiceType dengan mengubahnya menjadi jenis dalam tabel katalog yang ditentukan.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Skema Def

def schema : Schema

Mengembalikan skema DynamicFrame ini.

Skema yang dikembalikan dijamin mengandung setiap bidang yang ada dalam catatan di DynamicFrame ini. Namun dalam sejumlah kecil kasus, di mana skema mungkin juga berisi bidang tambahan. Anda dapat menggunakan metode Tidak bersarang untuk "mengencangkan" skema berdasarkan catatan dalam DynamicFrame.

SelectField Def

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Mengembalikan satu bidang sebagai sebuah DynamicFrame.

SelectFields Def

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths — Urutan nama kolom yang akan dipilih.

Mengembalikan sebuah DynamicFrame baru yang berisi kolom yang ditentukan.

catatan

Anda hanya dapat menggunakan metode selectFields untuk memilih kolom tingkat atas. Anda dapat menggunakan metode applyMapping untuk memilih kolom bersarang.

Pertunjukan def

def show( numRows : Int = 20 ) : Unit
  • numRows — Jumlah baris yang akan dicetak.

Mencetak baris dari DynamicFrame ini dalam format JSON.

Def SederhanaDDBJSON

Ekspor DynamoDB dengan AWS Glue konektor ekspor DynamoDB menghasilkan file JSON dari struktur bersarang tertentu. Untuk informasi selengkapnya, lihat Objek data. simplifyDDBJson Menyederhanakan kolom bersarang dalam jenis data ini, dan mengembalikan yang baru disederhanakan. DynamicFrame DynamicFrame Jika ada beberapa jenis atau tipe Peta yang terdapat dalam tipe Daftar, elemen dalam Daftar tidak akan disederhanakan. Metode ini hanya mendukung data dalam format JSON ekspor DynamoDB. Pertimbangkan unnest untuk melakukan perubahan serupa pada jenis data lainnya.

def simplifyDDBJson() : DynamicFrame

Metode ini tidak mengambil parameter apa pun.

Contoh masukan

Pertimbangkan skema berikut yang dihasilkan oleh ekspor DynamoDB:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

Contoh kode

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

simplifyDDBJsonTransformasi akan menyederhanakan ini menjadi:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Keran Def

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Transformasi passthrough yang mengembalikan catatan yang sama tetapi menulis subset dari catatan sebagai efek samping.

  • path — Path di Amazon S3 untuk menulis output, dalam bentuk s3://bucket//path.

  • options — Peta JsonOptions opsional yang menjelaskan perilaku pengambilan sampel.

Mengembalikan sebuah DynamicFrame yang berisi catatan yang sama seperti yang satu ini.

Secara default, menulis 100 catatan yang berubah-ubah ke lokasi yang ditentukan oleh path. Anda dapat menyesuaikan perilaku ini dengan menggunakan peta options. Kunci yang valid meliputi yang berikut ini:

  • topk — Menentukan jumlah total catatan yang ditulis. Secara default, nilainya adalah 100.

  • prob — Menentukan probabilitas (dalam desimal) bahwa catatan individu sudah disertakan. Default-nya adalah 1.

Misalnya, panggilan berikut akan mengambil sampel dari set data dengan memilih setiap catatan dengan probabilitas 20 persen dan berhenti setelah 200 catatan telah ditulis.

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def SplitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths — Path yang akan disertakan dalam DynamicFrame pertama.

Mengembalikan deret dua DynamicFrame. DynamicFrame yang pertama berisi path yang ditentukan, dan yang kedua berisi semua kolom lainnya.

Contoh

Contoh ini mengambil DynamicFrame dibuat dari persons tabel dalam legislators database di Katalog Data AWS Glue dan membagi DynamicFrame menjadi dua, dengan bidang yang ditentukan masuk ke bidang pertama DynamicFrame dan yang tersisa menjadi yang kedua DynamicFrame. Contoh kemudian memilih yang pertama DynamicFrame dari hasilnya.

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def SplitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

Membagi baris berdasarkan predikat yang membandingkan kolom dengan konstanta.

  • paths — Kolom yang digunakan untuk perbandingan.

  • values — Nilai konstanta yang digunakan untuk perbandingan.

  • operators — Operator yang digunakan untuk perbandingan.

Mengembalikan deret dua DynamicFrame. Yang pertama berisi baris dengan predikat true yang kedua berisi baris dengan predikat false.

Predikat ditentukan dengan menggunakan tiga urutan: 'paths' berisi nama kolom (mungkin bersarang), 'values' berisi nilai-nilai konstanta yang akan dibandingkan, dan 'operators' berisi operator yang akan digunakan untuk perbandingan. Ketiga urutan harus sama panjangnya: operator ke-n digunakan untuk membandingkan kolom ke-n dengan nilai ke-n.

Setiap operator harus berupa salah satu dari "!=", "=", "<=", "<", ">=", atau ">".

Sebagai contoh, panggilan berikut akan membagi DynamicFrame sehingga bingkai output pertama akan berisi catatan dari orang-orang yang berusia di atas 65 dari Amerika Serikat, dan yang kedua akan berisi semua catatan lainnya.

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

Mengembalikan jumlah catatan kesalahan yang dibuat saat melakukan komputasi pada DynamicFrame ini. Ini tidak termasuk kesalahan dari operasi sebelumnya yang dilewatkan ke DynamicFrame ini sebagai masukan.

Def ToDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

Mengkonversi DynamicFrame ini ke DataFrame Apache Spark SQL dengan skema dan catatan yang sama.

catatan

Karena DataFrame tidak mendukung ChoiceType, jadi metode ini secara otomatis mengkonversi kolom ChoiceType menjadi StructType. Untuk informasi selengkapnya dan opsi pengubahan pilihan, lihat resolveChoice.

Def membuka kotak

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path — Kolom yang akan diurai. Harus berupa string atau biner.

  • format — Format yang akan digunakan untuk penguraian.

  • optionString — Pilihan untuk memberikan ke format, seperti pemisah CSV.

Mengurai string atau kolom biner yang tertanam sesuai dengan format yang ditentukan. Kolom yang diurai disarangkan di bawah struct dengan nama kolom asli.

Misalnya, anggaplah Anda memiliki file CSV dengan kolom JSON yang tertanam.

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

Setelah penguraian awal, Anda akan mendapatkan DynamicFrame dengan skema berikut.

{{{ root |-- name: string |-- age: int |-- address: string }}}

Anda dapat memanggil unbox pada kolom alamat untuk mengurai komponen tertentu.

{{{ df.unbox("address", "json") }}}

hal ini akan memberi kita sebuah DynamicFrame dengan skema berikut.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def tidak bersarang

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

Mengembalikan sebuah DynamicFrame baru dengan semua struktur bersarang yang sudah diratakan. Nama dibangun menggunakan karakter '.' (titik).

Sebagai contoh, anggaplah Anda memiliki DynamicFrame dengan skema berikut.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Panggilan berikut membuka sarang struct alamat.

{{{ df.unnest() }}}

Skema yang dihasilkan adalah sebagai berikut.

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

Metode ini juga akan membuka struct bersarang dalam array. Tetapi karena alasan riwayat, nama-nama bidang tersebut didahului dengan nama array yang dilampirkan dan ".val".

Def Unnestddbjson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

Unnests kolom bersarang di a DynamicFrame yang secara khusus dalam struktur DynamoDB JSON, dan mengembalikan unnested baru. DynamicFrame Kolom yang terdiri dari array tipe struct tidak akan di-unnested. Perhatikan bahwa ini adalah jenis transformasi unnesting tertentu yang berperilaku berbeda dari unnest transformasi biasa dan mengharuskan data sudah berada dalam struktur DynamoDB JSON. Untuk informasi selengkapnya, lihat DynamoDB JSON.

Misalnya, skema pembacaan ekspor dengan struktur DynamoDB JSON mungkin terlihat seperti berikut:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnestDDBJson()Transformasi akan mengubah ini menjadi:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

Contoh kode berikut menunjukkan cara menggunakan konektor ekspor AWS Glue DynamoDB, memanggil DynamoDB JSON unnest, dan mencetak jumlah partisi:

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema — Sebuah fungsi yang mengembalikan skema yang akan digunakan. Ditentukan sebagai fungsi nol-parameter untuk menunda komputasi yang berpotensi mahal.

Menetapkan skema dari DynamicFrame ini dengan nilai yang ditentukan. Hal ini terutama digunakan secara internal untuk menghindari komputasi ulang yang mahal pada skema. Skema yang dimasukkan harus berisi semua kolom yang ada dalam data.

Def withName

def withName( name : String ) : DynamicFrame
  • name — Nama baru yang akan digunakan.

Mengembalikan salinan dari DynamicFrame ini dengan nama baru.

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

Mengembalikan salinan dari DynamicFrame ini dengan konteks transformasi yang ditentukan.