Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
AWS GlueKelas scala DynamicFrame
Package: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
Sebuah DynamicFrame
adalah koleksi terdistribusi dari objek DynamicRecord yang mendeskripsi secara mandiri.
DynamicFrame
dirancang untuk menyediakan model data yang fleksibel untuk operasi ETL (ekstrak, transformasi, dan muat). Mereka tidak mengharuskan membuat sebuah skema, dan Anda dapat menggunakannya untuk membaca dan melakukan transformasi pada data yang berisi nilai dan jenis yang berantakan atau tidak konsisten. Sebuah skema dapat dihitung berdasarkan permintaan untuk operasi-operasi yang membutuhkannya.
DynamicFrame
menyediakan berbagai transformasi untuk pembersihan data dan ETL. Mereka juga mendukung konversi ke dan dari DataFrames SparkSQL untuk diintegrasikan dengan kode yang ada dan banyak operasi analitik yang menyediakan. DataFrames
Parameter berikut dibagi di banyak AWS Glue transformasi yang membangun sDynamicFrame
:
transformationContext
— Pengidentifikasi untukDynamicFrame
ini.transformationContext
digunakan sebagai kunci untuk status bookmark tugas yang tetap ada di seluruh eksekusi.callSite
— Menyediakan informasi konteks untuk pelaporan kesalahan. Nilai-nilai ini secara otomatis ditetapkan ketika memanggil dari Python.stageThreshold
— Jumlah maksimum catatan kesalahan yang diizinkan dari komputasiDynamicFrame
ini sebelum melemparkan pengecualian, tidak termasuk catatan yang ada dalamDynamicFrame
sebelumnya.totalThreshold
— Jumlah maksimum total catatan kesalahan sebelum pengecualian dilemparkan, termasuk yang dari bingkai sebelumnya.
ErrorsCount Val
val errorsCount
Jumlah catatan kesalahan dalam DynamicFrame
ini. Ini termasuk kesalahan dari operasi sebelumnya.
Def ApplyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
— Urutan pemetaan untuk membangunDynamicFrame
baru.caseSensitive
— Apakah akan memperlakukan kolom sumber sebagai kolom yang peka huruf besar dan kecil. Mengatur ini ke false mungkin membantu ketika mengintegrasikan dengan penyimpanan yang peka huruf besar dan kecil seperti Katalog Data Glue AWS.
Memilih, memproyeksikan, dan melemparkan kolom berdasarkan urutan pemetaan.
Setiap pemetaan terdiri dari kolom sumber dan jenis serta kolom target dan jenis. Pemetaan dapat ditentukan sebagai empat tupel (source_path
, source_type
, target_path
, target_type
) atau objek MappingSpec yang berisi informasi yang sama.
Selain menggunakan pemetaan untuk proyeksi dan transmisi sederhana, Anda dapat menggunakannya untuk melakukan nest atau membuka nest pada bidang dengan memisahkan komponen dari path dengan tanda '.
' (titik).
Sebagai contoh, anggaplah Anda memiliki DynamicFrame
dengan skema berikut.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
Anda dapat membuat panggilan berikut untuk membuka sarang dari bidang state
dan zip
.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
Skema yang dihasilkan adalah sebagai berikut.
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
Anda juga dapat menggunakan applyMapping
untuk melakukan nest kembali pada kolom. Sebagai contoh, berikut ini membalikkan transformasi sebelumnya dan membuat sebuah struct bernama address
dalam target.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Nama kolom yang berisi karakter '.
' (titik) dapat dikutip dengan menggunakan karakter backtick (``
).
catatan
Saat ini, Anda tidak dapat menggunakan metode applyMapping
untuk memetakan kolom yang bersarang di bawah array.
Def assertErrorThreshold
def assertErrorThreshold : Unit
Tindakan yang memaksa perhitungan dan memverifikasi bahwa jumlah catatan kesalahan berada di bawah stageThreshold
dan totalThreshold
. Melempar pengecualian jika salah satu kondisi gagal.
Hitungan def
lazy
def count
Mengembalikan jumlah elemen dalam DynamicFrame
ini.
DropField Def
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Mengembalikan DynamicFrame
baru dengan kolom tertentu dihapus.
DropFields Def
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Mengembalikan DynamicFrame
baru dengan kolom-kolom tertentu dihapus.
Anda dapat menggunakan metode ini untuk menghapus kolom bersarang, termasuk yang ada dalam array, tetapi tidak untuk membuang elemen array tertentu.
DropNulls Def
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Mengembalikan DynamicFrame
baru dengan semua kolom nol dihapus.
catatan
Ini hanya menghapus kolom tipe NullType
. Nilai-nilai nol individu dalam kolom lain tidak dihapus atau dimodifikasi.
Bingkai Def errorsAsDynamic
def errorsAsDynamicFrame
Mengembalikan DynamicFrame
baru yang berisi catatan kesalahan dari DynamicFrame
.
Filter Def
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Membangun DynamicFrame
baru yang hanya berisi catatan-catatan yang untuknya fungsi 'f
' mengembalikan true
. Fungsi filter 'f
' seharusnya tidak mengubah catatan masukan.
Def getName
def getName : String
Mengembalikan nama dari DynamicFrame
ini.
Def getNumPartitions
def getNumPartitions
Mengembalikan jumlah partisi dalam DynamicFrame
ini.
Def Dihitung getSchemaIf
def getSchemaIfComputed : Option[Schema]
Mengembalikan skema jika ia sudah dikomputasi. Tidak memindai data jika skema belum dikomputasi.
Def isSchemaComputed
def isSchemaComputed : Boolean
Mengembalikan true
jika skema telah dikomputasi untuk DynamicFrame
ini, atau false
jika tidak. Jika metode ini mengembalikan false, maka memanggil metode schema
akan mengharuskan pemberian lain atas catatan dalam DynamicFrame
ini.
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def bergabung
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
— Kolom dalamDynamicFrame
ini yang akan digunakan untuk penggabungan.keys2
— Kolom dalamframe2
yang akan digunakan untuk penggabungan. Harus memiliki panjang yang sama sepertikeys1
.frame2
—DynamicFrame
yang akan digabungkan padanya.
Mengembalikan hasil dari pelaksanaan equijoin dengan frame2
menggunakan kunci yang ditentukan.
Def peta
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Mengembalikan DynamicFrame
baru yang dibangun dengan menerapkan fungsi 'f
' tertentu untuk setiap catatan dalam DynamicFrame
.
Metode ini menyalin setiap catatan sebelum menerapkan fungsi yang ditentukan, sehingga aman untuk mengubah catatan. Jika fungsi pemetaan melempar pengecualian pada catatan tertentu, maka catatan yang ditandai sebagai kesalahan, dan jejak tumpukan disimpan sebagai sebuah kolom dalam catatan kesalahan.
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
— PentahapanDynamicFrame
yang akan digabungkan.primaryKeys
— Daftar bidang kunci primer untuk mencocokkan catatan dari sumber dan pentahapanDynamicFrame
.transformationContext
— Sebuah string unik yang digunakan untuk mengambil metadata tentang transformasi saat ini (opsional).options
— Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk transformasi ini.callSite
— Digunakan untuk menyediakan informasi konteks untuk pelaporan kesalahan.stageThreshold
— SebuahLong
. Jumlah kesalahan dalam transformasi yang ditentukan yang memerlukan pengolahan untuk membersihkan kesalahan.totalThreshold
— SebuahLong
. Jumlah kesalahan hingga dan termasuk dalam transformasi yang memerlukan pengolahan untuk membersihkan kesalahan.
Menggabungkan DynamicFrame
ini dengan pentahapan DynamicFrame
berdasarkan kunci primer yang ditentukan untuk mengidentifikasi catatan. Catatan duplikat (catatan dengan kunci primer yang sama) tidak di-deduplikasi. Jika tidak ada catatan yang cocok dalam bingkai pentahapan, semua catatan (termasuk duplikat) akan dipertahankan dari sumber. Jika bingkai pementasan memiliki catatan yang cocok, catatan dari bingkai pementasan menimpa catatan di sumber. AWS Glue
DynamicFrame
yang dikembalikan berisi catatan A dalam kasus berikut:
Jika
A
ada di bingkai sumber dan bingkai pentahapan, makaA
dalam bingkai pentahapan akan dikembalikan.Jika
A
ada dalam tabel sumber danA.primaryKeys
tidak ada distagingDynamicFrame
(berartiA
tidak diperbarui dalam tabel pentahapan).
Bingkai sumber dan bingkai pentahapan tidak perlu memiliki skema yang sama.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
PrintSkema Cetak Def
def printSchema : Unit
Mencetak skema DynamicFrame
ini ke stdout
dalam format yang dapat dibaca manusia.
ReComputesChema Def
def recomputeSchema : Schema
Memaksakan komputasi ulang skema. Hal ini memerlukan pemindaian data, tapi mungkin akan "mengencangkan" skema jika ada beberapa bidang dalam skema saat ini yang tidak ada dalam data.
Mengembalikan skema yang telah dikomputasi ulang.
Def relasialisasi
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
— Nama yang digunakan untukDynamicFrame
dasar dalam output.DynamicFrame
yang dibuat oleh array berputar dimulai dengan ini sebagai prefiks.stagingPath
— Path Amazon Simple Storage Service (Amazon S3) untuk menulis data menengah.options
— Menghubungkan opsi dan konfigurasi. Saat ini tidak digunakan.
Meratakan semua struktur bersarang dan memutar array ke dalam tabel terpisah.
Anda dapat menggunakan operasi ini untuk mempersiapkan data yang sangat bersarang untuk penyerapan ke dalam basis data relasional. Struct bersarang diratakan dengan cara yang sama seperti transformasi Tidak bersarang. Selain itu, array yang diputar ke dalam tabel terpisah dengan masing-masing elemen array yang menjadi sebuah baris. Sebagai contoh, anggaplah Anda memiliki DynamicFrame
dengan data berikut.
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Jalankan kode berikut.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
Hal ini menghasilkan dua tabel. Tabel pertama bernama "orang" dan isinya adalah sebagai berikut.
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
Di sini, array teman-teman telah diganti dengan kunci penggabungan yang dihasilkan secara otomatis. Sebuah tabel terpisah bernama people.friends
dibuat dengan isi sebagai berikut.
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
Dalam tabel ini, 'id
' adalah sebuah kunci penggabungan yang mengidentifikasi yang mencatat asal elemen array, 'index
' mengacu pada posisi dalam array asli, dan 'val
' adalah entri array yang sebenarnya.
Metode relationalize
mengembalikan deret DynamicFrame
yang dibuat dengan menerapkan proses ini secara rekursif pada semua array.
catatan
AWS GluePustaka secara otomatis menghasilkan kunci gabungan untuk tabel baru. Untuk memastikan bahwa kunci penggabungan bersifat unik di seluruh eksekusi tugas, Anda harus mengaktifkan bookmark tugas.
Def RenameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
— Nama asli kolom.newName
— Nama baru kolom.
Mengembalikan sebuah DynamicFrame
baru dengan kolom tertentu yang diganti namanya.
Anda dapat menggunakan metode ini untuk mengganti nama bidang bersarang. Misalnya, kode berikut akan mengubah nama state
menjadi state_code
dalam struct alamat.
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartisi
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Mengembalikan sebuah DynamicFrame
baru dengan partisi numPartitions
.
Def ResolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
— Sebuah tindakan yang akan diterapkan ke semua kolomChoiceType
yang tidak tercantum dalam urutan spesifikasi.database
— Basis data Katalog Data yang akan digunakan dengan tindakanmatch_catalog
.tableName
— Tabel Katalog Data yang akan digunakan dengan tindakanmatch_catalog
.
Mengembalikan sebuah DynamicFrame
baru dengan mengganti satu atau beberapa ChoiceType
dengan tipe yang lebih spesifik.
Ada dua cara untuk menggunakan resolveChoice
. Yang pertama adalah menentukan urutan kolom tertentu dan cara mengubahnya. Hal ini ditentukan sebagai tupel yang terdiri dari pasangan (kolom, tindakan).
Berikut ini adalah tindakan yang mungkin:
cast:type
— Upaya untuk mengubah semua nilai ke jenis tertentu.make_cols
— Mengkonversi setiap jenis yang berbeda menjadi kolom dengan namacolumnName_type
.make_struct
— Mengkonversi kolom menjadi struct dengan kunci untuk setiap jenis yang berbeda.project:type
— Mempertahankan hanya nilai-nilai dari jenis tertentu saja.
Mode lain untuk resolveChoice
adalah untuk menentukan resolusi tunggal untuk semua ChoiceType
. Anda dapat menggunakan ini dalam kasus di mana daftar ChoiceType
lengkap tidak diketahui sebelum eksekusi. Selain tindakan-tindakan yang tercantum sebelumnya, mode ini juga mendukung tindakan berikut:
match_catalog
— Upaya untuk mengubah setiapChoiceType
menjadi jenis yang sesuai dalam tabel katalog yang ditentukan.
Contoh:
Ubah kolom user.id
dengan mengubahnya menjadi int, dan membuat bidang address
hanya mempertahankan struct saja.
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
Ubah semua ChoiceType
dengan mengkonversi setiap pilihan menjadi kolom terpisah.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
Ubah semua ChoiceType
dengan mengubahnya menjadi jenis dalam tabel katalog yang ditentukan.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Skema Def
def schema : Schema
Mengembalikan skema DynamicFrame
ini.
Skema yang dikembalikan dijamin mengandung setiap bidang yang ada dalam catatan di DynamicFrame
ini. Namun dalam sejumlah kecil kasus, di mana skema mungkin juga berisi bidang tambahan. Anda dapat menggunakan metode Tidak bersarang untuk "mengencangkan" skema berdasarkan catatan dalam DynamicFrame
.
SelectField Def
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Mengembalikan satu bidang sebagai sebuah DynamicFrame
.
SelectFields Def
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
— Urutan nama kolom yang akan dipilih.
Mengembalikan sebuah DynamicFrame
baru yang berisi kolom yang ditentukan.
catatan
Anda hanya dapat menggunakan metode selectFields
untuk memilih kolom tingkat atas. Anda dapat menggunakan metode applyMapping untuk memilih kolom bersarang.
Pertunjukan def
def show( numRows : Int = 20 ) : Unit
numRows
— Jumlah baris yang akan dicetak.
Mencetak baris dari DynamicFrame
ini dalam format JSON.
Def SederhanaDDBJSON
Ekspor DynamoDB dengan AWS Glue konektor ekspor DynamoDB menghasilkan file JSON dari struktur bersarang tertentu. Untuk informasi selengkapnya, lihat Objek data. simplifyDDBJson
Menyederhanakan kolom bersarang dalam jenis data ini, dan mengembalikan yang baru disederhanakan. DynamicFrame DynamicFrame Jika ada beberapa jenis atau tipe Peta yang terdapat dalam tipe Daftar, elemen dalam Daftar tidak akan disederhanakan. Metode ini hanya mendukung data dalam format JSON ekspor DynamoDB. Pertimbangkan unnest
untuk melakukan perubahan serupa pada jenis data lainnya.
def simplifyDDBJson() : DynamicFrame
Metode ini tidak mengambil parameter apa pun.
Contoh masukan
Pertimbangkan skema berikut yang dihasilkan oleh ekspor DynamoDB:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Contoh kode
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
simplifyDDBJson
Transformasi akan menyederhanakan ini menjadi:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Keran Def
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Transformasi passthrough yang mengembalikan catatan yang sama tetapi menulis subset dari catatan sebagai efek samping.
path
— Path di Amazon S3 untuk menulis output, dalam bentuks3://bucket//path
.options
— PetaJsonOptions
opsional yang menjelaskan perilaku pengambilan sampel.
Mengembalikan sebuah DynamicFrame
yang berisi catatan yang sama seperti yang satu ini.
Secara default, menulis 100 catatan yang berubah-ubah ke lokasi yang ditentukan oleh path
. Anda dapat menyesuaikan perilaku ini dengan menggunakan peta options
. Kunci yang valid meliputi yang berikut ini:
topk
— Menentukan jumlah total catatan yang ditulis. Secara default, nilainya adalah 100.prob
— Menentukan probabilitas (dalam desimal) bahwa catatan individu sudah disertakan. Default-nya adalah 1.
Misalnya, panggilan berikut akan mengambil sampel dari set data dengan memilih setiap catatan dengan probabilitas 20 persen dan berhenti setelah 200 catatan telah ditulis.
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def SplitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
— Path yang akan disertakan dalamDynamicFrame
pertama.
Mengembalikan deret dua DynamicFrame
. DynamicFrame
yang pertama berisi path yang ditentukan, dan yang kedua berisi semua kolom lainnya.
Contoh
Contoh ini mengambil DynamicFrame dibuat dari persons
tabel dalam legislators
database di Katalog Data AWS Glue dan membagi DynamicFrame menjadi dua, dengan bidang yang ditentukan masuk ke bidang pertama DynamicFrame dan yang tersisa menjadi yang kedua DynamicFrame. Contoh kemudian memilih yang pertama DynamicFrame dari hasilnya.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def SplitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Membagi baris berdasarkan predikat yang membandingkan kolom dengan konstanta.
paths
— Kolom yang digunakan untuk perbandingan.values
— Nilai konstanta yang digunakan untuk perbandingan.operators
— Operator yang digunakan untuk perbandingan.
Mengembalikan deret dua DynamicFrame
. Yang pertama berisi baris dengan predikat true yang kedua berisi baris dengan predikat false.
Predikat ditentukan dengan menggunakan tiga urutan: 'paths
' berisi nama kolom (mungkin bersarang), 'values
' berisi nilai-nilai konstanta yang akan dibandingkan, dan 'operators
' berisi operator yang akan digunakan untuk perbandingan. Ketiga urutan harus sama panjangnya: operator ke-n
digunakan untuk membandingkan kolom ke-n
dengan nilai ke-n
.
Setiap operator harus berupa salah satu dari "!=
", "=
", "<=
", "<
", ">=
", atau ">
".
Sebagai contoh, panggilan berikut akan membagi DynamicFrame
sehingga bingkai output pertama akan berisi catatan dari orang-orang yang berusia di atas 65 dari Amerika Serikat, dan yang kedua akan berisi semua catatan lainnya.
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
Mengembalikan jumlah catatan kesalahan yang dibuat saat melakukan komputasi pada DynamicFrame
ini. Ini tidak termasuk kesalahan dari operasi sebelumnya yang dilewatkan ke DynamicFrame
ini sebagai masukan.
Def ToDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Mengkonversi DynamicFrame
ini ke DataFrame
Apache Spark SQL dengan skema dan catatan yang sama.
catatan
Karena DataFrame
tidak mendukung ChoiceType
, jadi metode ini secara otomatis mengkonversi kolom ChoiceType
menjadi StructType
. Untuk informasi selengkapnya dan opsi pengubahan pilihan, lihat resolveChoice.
Def membuka kotak
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
— Kolom yang akan diurai. Harus berupa string atau biner.format
— Format yang akan digunakan untuk penguraian.optionString
— Pilihan untuk memberikan ke format, seperti pemisah CSV.
Mengurai string atau kolom biner yang tertanam sesuai dengan format yang ditentukan. Kolom yang diurai disarangkan di bawah struct dengan nama kolom asli.
Misalnya, anggaplah Anda memiliki file CSV dengan kolom JSON yang tertanam.
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
Setelah penguraian awal, Anda akan mendapatkan DynamicFrame
dengan skema berikut.
{{{ root |-- name: string |-- age: int |-- address: string }}}
Anda dapat memanggil unbox
pada kolom alamat untuk mengurai komponen tertentu.
{{{ df.unbox("address", "json") }}}
hal ini akan memberi kita sebuah DynamicFrame
dengan skema berikut.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def tidak bersarang
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Mengembalikan sebuah DynamicFrame
baru dengan semua struktur bersarang yang sudah diratakan. Nama dibangun menggunakan karakter '.
' (titik).
Sebagai contoh, anggaplah Anda memiliki DynamicFrame
dengan skema berikut.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Panggilan berikut membuka sarang struct alamat.
{{{ df.unnest() }}}
Skema yang dihasilkan adalah sebagai berikut.
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
Metode ini juga akan membuka struct bersarang dalam array. Tetapi karena alasan riwayat, nama-nama bidang tersebut didahului dengan nama array yang dilampirkan dan ".val
".
Def Unnestddbjson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Unnests kolom bersarang di a DynamicFrame
yang secara khusus dalam struktur DynamoDB JSON, dan mengembalikan unnested baru. DynamicFrame
Kolom yang terdiri dari array tipe struct tidak akan di-unnested. Perhatikan bahwa ini adalah jenis transformasi unnesting tertentu yang berperilaku berbeda dari unnest
transformasi biasa dan mengharuskan data sudah berada dalam struktur DynamoDB JSON. Untuk informasi selengkapnya, lihat DynamoDB JSON.
Misalnya, skema pembacaan ekspor dengan struktur DynamoDB JSON mungkin terlihat seperti berikut:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnestDDBJson()
Transformasi akan mengubah ini menjadi:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
Contoh kode berikut menunjukkan cara menggunakan konektor ekspor AWS Glue DynamoDB, memanggil DynamoDB JSON unnest, dan mencetak jumlah partisi:
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
— Sebuah fungsi yang mengembalikan skema yang akan digunakan. Ditentukan sebagai fungsi nol-parameter untuk menunda komputasi yang berpotensi mahal.
Menetapkan skema dari DynamicFrame
ini dengan nilai yang ditentukan. Hal ini terutama digunakan secara internal untuk menghindari komputasi ulang yang mahal pada skema. Skema yang dimasukkan harus berisi semua kolom yang ada dalam data.
Def withName
def withName( name : String ) : DynamicFrame
name
— Nama baru yang akan digunakan.
Mengembalikan salinan dari DynamicFrame
ini dengan nama baru.
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Mengembalikan salinan dari DynamicFrame
ini dengan konteks transformasi yang ditentukan.