Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
AWS GlueAPI Scala GlueContext
Package: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
adalah titik masuk untuk membaca dan menulis DynamicFrame dari dan ke Amazon Simple Storage Service (Amazon S3), Katalog Data Glue AWS , JDBC, dan sebagainya. Kelas ini menyediakan fungsi utilitas untuk membuat objek DataSource sifat dan DataSink yang pada gilirannya dapat digunakan untuk membaca dan menulis DynamicFrame
.
Anda juga dapat menggunakan GlueContext
untuk menetapkan target jumlah partisi (default 20) di DynamicFrame
jika jumlah partisi yang dibuat dari sumber kurang dari ambang batas minimum untuk partisi (default 10).
Kolom def addIngestionTime
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Menambahkan kolom waktu penyerapan seperti ingest_year
, ingest_month
, ingest_day
, ingest_hour
, ingest_minute
ke input DataFrame
. Fungsi ini secara otomatis dihasilkan dalam skrip yang dihasilkan oleh AWS Glue saat Anda menentukan tabel Katalog Data dengan Amazon S3 sebagai target. Fungsi ini secara otomatis memperbarui partisi dengan kolom waktu penyerapan pada tabel output. Hal ini memungkinkan data output dipartisi secara otomatis pada waktu penyerapan tanpa memerlukan kolom waktu penyerapan eksplisit dalam data input.
-
dataFrame
—dataFrame
yang akan ditambahi dengan kolom waktu penyerapan. -
timeGranularity
— Kedetailan dari kolom waktu. Nilai yang benar adalah "day
", "hour
" dan "minute
". Misalnya, jika "hour
" diberikan dalam fungsi, makadataFrame
asli akan memiliki kolom waktu "ingest_year
", "ingest_month
", "ingest_day
", dan "ingest_hour
" yang ditambahkan.
Mengembalikan bingkai data setelah menambahkan kolom kedetailan waktu.
Contoh:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrame FromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Mengembalikan sebuah DataFrame
dibuat dengan koneksi dan format yang ditentukan. Gunakan fungsi ini hanya dengan sumber streaming AWS Glue.
connectionType
— Jenis koneksi streaming. Nilai yang valid mencakupkinesis
dankafka
.-
connectionOptions
— Opsi koneksi, yang berbeda untuk Kinesis dan Kafka. Anda dapat menemukan daftar semua opsi koneksi untuk setiap sumber data streaming di Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark. Perhatikan perbedaan berikut dalam pilihan koneksi streaming:-
Sumber streaming Kinesis memerlukan
streamARN
,startingPosition
,inferSchema
, danclassification
. -
Sumber streaming Kafka membutuhkan
connectionName
,topicName
,startingOffsets
,inferSchema
, danclassification
.
-
transformationContext
— Konteks transformasi yang akan digunakan (opsional).format
- Spesifikasi format (opsional). Ini digunakan untuk Amazon S3 atau AWS Glue koneksi yang mendukung berbagai format. Untuk informasi tentang format yang didukung, lihat Opsi format data untuk input dan output untuk Spark AWS GlueformatOptions
— Opsi format untuk format yang ditentukan. Untuk informasi tentang pilihan format yang didukung, lihat Opsi format data.
Contoh untuk sumber streaming Amazon Kinesis:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Contoh untuk sumber streaming Kafka:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
Menerapkan batch_function
yang diberikan ke setiap batch mikro yang dibaca dari sumber Streaming.
-
frame
— Yang DataFrame berisi batch mikro saat ini. -
batch_function
— Sebuah fungsi yang akan diterapkan untuk setiap batch mikro. -
options
— Kumpulan pasangan kunci-nilai yang menyimpan informasi tentang cara memproses batch mikro. Opsi-opsi berikut diperlukan:-
windowSize
— Jumlah waktu yang diperlukan untuk pemrosesan setiap batch. -
checkpointLocation
— Lokasi di mana pos pemeriksaan disimpan untuk tugas ETL streaming. -
batchMaxRetries
— Jumlah waktu maksimum untuk mengulang mencoba batch sekali lagi jika gagal. Nilai default-nya adalah 3. Opsi ini hanya dapat dikonfigurasi untuk Glue versi 2.0 dan di atasnya.
-
Contoh:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Membuat sebuah DataSink yang menulis ke lokasi yang ditentukan dalam tabel yang didefinisikan dalam Katalog Data.
database
— Nama basis data dalam Katalog Data.tableName
— Nama tabel dalam Katalog Data.redshiftTmpDir
— Direktori pentahapan sementara yang akan digunakan dengan data sink tertentu. Diatur ke kosong secara default.transformationContext
— Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.additionalOptions
— Opsi tambahan yang disediakan untukAWS Glue.catalogId
— ID katalog (ID akun) dari Katalog Data yang sedang diakses. Bila nol, maka ID akun default pemanggil yang akan digunakan.
Mengembalikan DataSink
.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Membuat sebuah DataSource sifat yang membaca data dari tabel definisi dalam Katalog Data.
database
— Nama basis data dalam Katalog Data.tableName
— Nama tabel dalam Katalog Data.redshiftTmpDir
— Direktori pentahapan sementara yang akan digunakan dengan data sink tertentu. Diatur ke kosong secara default.transformationContext
— Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.pushDownPredicate
— Memfilter partisi tanpa harus mencantumkan dan membaca semua file dalam set data Anda. Untuk informasi selengkapnya, lihat Pra-penyaringan menggunakan predikat pushdown.additionalOptions
— Kumpulan pasangan nama-nilai opsional. Opsi yang mungkin adalah opsi-opsi yang tercantum dalam Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark kecualiendpointUrl
,streamName
,bootstrap.servers
,security.protocol
,topicName
,classification
, dandelimiter
. Opsi lain yang didukung adalahcatalogPartitionPredicate
:catalogPartitionPredicate
— Anda dapat meneruskan ekspresi katalog untuk memfilter berdasarkan kolom indeks. Ini mendorong penyaringan ke sisi server. Untuk informasi selengkapnya, lihat Indeks AWS Glue Partisi. Perhatikan itupush_down_predicate
dancatalogPartitionPredicate
gunakan sintaks yang berbeda. Yang pertama menggunakan sintaks standar Spark SQL dan yang kemudian menggunakan parser JSQL.catalogId
— ID katalog (ID akun) dari Katalog Data yang sedang diakses. Bila nol, maka ID akun default pemanggil yang akan digunakan.
Mengembalikan DataSource
.
Contoh untuk sumber streaming
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Membuat sebuah DataSink yang menulis ke basis data JDBC yang ditentukan dalam objek Connection
dalam Katalog Data. Objek Connection
memiliki informasi untuk terhubung ke sbuah sink JDBC, termasuk URL, nama pengguna, kata sandi, VPC, subnet, dan grup keamanan.
catalogConnection
— Nama koneksi dalam Katalog Data yang berisi URL JDBC yang akan ditulisi.options
— Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan yang diperlukan untuk menulis ke penyimpanan data JDBC. Hal ini mencakup:dbtable (wajib) — Nama tabel JDBC. Untuk penyimpanan data JDBC yang mendukung skema dalam basis data, tentukan
schema.table-name
. Jika skema tidak disediakan, maka skema "publik" default digunakan. Contoh berikut menunjukkan parameter pilihan yang mengarahkan ke skema bernamatest
dan sebuah tabel bernamatest_table
dalam basis datatest_db
.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (wajib) — Nama basis data JDBC.
Setiap pilihan tambahan diberikan langsung ke penulis JDBC SparkSQL. Untuk informasi selengkapnya, lihat Sumber data Redshift untuk Spark
.
redshiftTmpDir
— Sebuah direktori pentahapan sementara yang akan digunakan dengan data sink tertentu. Diatur ke kosong secara default.transformationContext
— Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.catalogId
— ID katalog (ID akun) dari Katalog Data yang sedang diakses. Bila nol, maka ID akun default pemanggil yang akan digunakan.
Kode contoh:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Mengembalikan DataSink
.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Membuat file DataSink yang menulis data ke tujuan seperti Amazon Simple Storage Service (Amazon S3), JDBC, atau Glue Data Catalog, atau AWS aliran data Apache Kafka atau Amazon Kinesis.
-
connectionType
— Jenis koneksi. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark. -
connectionOptions
— Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk membangun koneksi dengan data sink. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark. -
transformationContext
— Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.
Mengembalikan DataSink
.
Format def getSinkWith
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Membuat DataSink yang menulis data ke tujuan seperti Amazon S3, JDBC, atau Katalog Data, atau aliran data Apache Kafka atau Amazon Kinesis. Juga menetapkan format untuk data yang akan ditulis ke tujuan.
connectionType
— Jenis koneksi. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.-
options
— Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk membangun sebuah koneksi dengan data sink. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark. transformationContext
— Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.format
— Format data yang akan ditulis ke tujuan.formatOptions
— Sebuah string pasangan nama-nilai JSON yang menyediakan opsi tambahan untuk memformat data di tujuan. Lihat Opsi format data.
Mengembalikan DataSink
.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Membuat DataSource sifat yang membaca data dari sumber seperti Amazon S3, JDBC, atau Glue AWS Data Catalog. Juga mendukung sumber data streaming Kafka dan Kinesis.
connectionType
— Jenis sumber data. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.-
connectionOptions
— Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk membangun sebuah koneksi dengan sumber data. Untuk informasi selengkapnya, lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.Sumber streaming Kinesis memerlukan opsi koneksi berikut:
streamARN
,startingPosition
,inferSchema
, danclassification
.Sumber streaming Kafka membutuhkan pilihan koneksi berikut:
connectionName
,topicName
,startingOffsets
,inferSchema
, danclassification
. transformationContext
— Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.pushDownPredicate
— Predikat pada kolom partisi.
Mengembalikan DataSource
.
Contoh untuk sumber streaming Amazon Kinesis:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Contoh untuk sumber streaming Kafka:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Format def getSourceWith
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Membuat DataSource sifat yang membaca data dari sumber seperti Amazon S3, JDBC, atau AWS Glue Data Catalog, dan juga menetapkan format data yang disimpan dalam sumber.
connectionType
— Jenis sumber data. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark.-
options
— Sebuah string pasangan nama-nilai JSON yang memberikan informasi tambahan untuk membangun sebuah koneksi dengan sumber data. Lihat Jenis dan opsi koneksi untuk ETL di AWS Glue untuk Spark. transformationContext
— Konteks transformasi yang dikaitkan dengan sink yang akan digunakan oleh bookmark tugas. Diatur ke kosong secara default.format
— Format data yang disimpan pada sumber. SaatconnectionType
adalah "s3", Anda juga dapat menentukanformat
. Bisa berupa “avro”, “csv”, “groklog”, “ion”, “json”, “xml”, “parquet”, atau “orc”, salah satunya.formatOptions
— Sebuah string pasangan nama-nilai JSON yang menyediakan opsi tambahan untuk mengurai data di sumber. Lihat Opsi format data.
Mengembalikan DataSource
.
Contoh
Buat DynamicFrame dari sumber data yang merupakan file nilai yang dipisahkan koma (CSV) di Amazon S3:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Buat DynamicFrame dari sumber data yang merupakan PostgreSQL menggunakan koneksi JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
Buat DynamicFrame dari sumber data yang merupakan MySQL menggunakan koneksi JDBC:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Mendapat SparkSession
objek yang terkait dengan ini GlueContext. Gunakan SparkSession objek ini untuk mendaftarkan tabel dan UDF untuk digunakan dengan DataFrame
dibuat dari DynamicFrames.
Mengembalikan SparkSession.
def StartTransaksi
def startTransaction(readOnly: Boolean):String
Mulai transaksi baru. Secara internal memanggil Lake Formation StartTransaction API.
readOnly
— (Boolean) Menunjukkan apakah transaksi ini harus dibaca saja atau dibaca dan ditulis. Penulisan yang dibuat menggunakan ID transaksi hanya-baca akan ditolak. Transaksi read-only tidak perlu dilakukan.
Mengembalikan ID transaksi.
def CommitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Upaya untuk melakukan transaksi yang ditentukan. commitTransaction
dapat kembali sebelum transaksi selesai dilakukan. Secara internal memanggil Lake Formation CommitTransaction API.
transactionId
— (String) Transaksi untuk melakukan.waitForCommit
— (Boolean) Menentukan apakahcommitTransaction
pengembalian segera. Nilai default-nya adalah betul. Jika salah,commitTransaction
polling dan menunggu sampai transaksi dilakukan. Jumlah waktu tunggu dibatasi hingga 1 menit menggunakan backoff eksponensial dengan maksimal 6 upaya coba lagi.
Mengembalikan Boolean untuk menunjukkan apakah komit dilakukan atau tidak.
def batalkan Transaksi
def cancelTransaction(transactionId: String): Unit
Upaya untuk membatalkan transaksi yang ditentukan. Secara internal memanggil Lake Formation CancelTransactionAPI.
transactionId
— (String) Transaksi untuk membatalkan.
Mengembalikan TransactionCommittedException
pengecualian jika transaksi sebelumnya dilakukan.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Membuat sebuah objek GlueContext
menggunakan SparkContext
yang ditentukan, partisi minimum, dan partisi target.
sc
—SparkContext
.minPartitions
— Jumlah partisi minimum.targetPartitions
— Jumlah partisi target.
Mengembalikan GlueContext
.
def this
def this( sc : SparkContext )
Membuat sebuah objek GlueContext
dengan SparkContext
yang disediakan. Menetapkan partisi minimum ke 10 dan partisi target ke 20.
sc
—SparkContext
.
Mengembalikan GlueContext
.
def this
def this( sparkContext : JavaSparkContext )
Membuat sebuah objek GlueContext
dengan JavaSparkContext
yang disediakan. Menetapkan partisi minimum ke 10 dan partisi target ke 20.
sparkContext
—JavaSparkContext
.
Mengembalikan GlueContext
.