Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Anda dapat menggunakan koneksi Kafka untuk membaca dan menulis ke aliran data Kafka menggunakan informasi yang disimpan dalam tabel Katalog Data, atau dengan memberikan informasi untuk langsung mengakses aliran data. Koneksi mendukung cluster Kafka atau Amazon Managed Streaming untuk Apache Kafka Kafka cluster. Anda dapat membaca informasi dari Kafka menjadi Spark DataFrame, lalu mengubahnya menjadi Glue AWS . DynamicFrame Anda dapat menulis DynamicFrames ke Kafka dalam JSON format. Jika Anda langsung mengakses aliran data, gunakan opsi ini untuk memberikan informasi tentang cara mengakses aliran data.
Jika Anda menggunakan getCatalogSource
atau create_data_frame_from_catalog
menggunakan catatan dari sumber streaming Kafka, getCatalogSink
atau write_dynamic_frame_from_catalog
untuk menulis catatan ke Kafka, dan pekerjaan tersebut memiliki database Katalog Data dan informasi nama tabel, dan dapat menggunakannya untuk mendapatkan beberapa parameter dasar untuk membaca dari sumber streaming Kafka. Jika Anda menggunakangetSource
,getCatalogSink
,getSourceWithFormat
,getSinkWithFormat
, createDataFrameFromOptions
ataucreate_data_frame_from_options
, atauwrite_dynamic_frame_from_catalog
, Anda harus menentukan parameter dasar ini menggunakan opsi koneksi yang dijelaskan di sini.
Anda dapat menentukan opsi koneksi untuk Kafka menggunakan argumen berikut untuk metode yang ditentukan di GlueContext
kelas.
-
Skala
-
connectionOptions
: Gunakan dengangetSource
,createDataFrameFromOptions
,getSink
-
additionalOptions
: Gunakan dengangetCatalogSource
,getCatalogSink
-
options
: Gunakan dengangetSourceWithFormat
,getSinkWithFormat
-
-
Python
-
connection_options
: Gunakan dengancreate_data_frame_from_options
,write_dynamic_frame_from_options
-
additional_options
: Gunakan dengancreate_data_frame_from_catalog
,write_dynamic_frame_from_catalog
-
options
: Gunakan dengangetSource
,getSink
-
Untuk catatan dan batasan tentang ETL pekerjaan streaming, konsultasikanETLCatatan dan batasan streaming.
Topik
Konfigurasikan Kafka
Tidak ada AWS prasyarat untuk menghubungkan ke aliran Kafka yang tersedia melalui internet.
Anda dapat membuat koneksi AWS Glue Kafka untuk mengelola kredenal koneksi Anda. Untuk informasi selengkapnya, lihat Membuat AWS Glue koneksi untuk aliran data Apache Kafka. Dalam konfigurasi pekerjaan AWS Glue Anda, berikan connectionName
sebagai koneksi jaringan tambahan, maka, dalam panggilan metode Anda, berikan connectionName
ke connectionName
parameter.
Dalam kasus tertentu, Anda perlu mengkonfigurasi prasyarat tambahan:
-
Jika menggunakan Amazon Managed Streaming untuk Apache Kafka Kafka IAM dengan otentikasi, Anda akan memerlukan konfigurasi yang sesuai. IAM
-
Jika menggunakan Amazon Managed Streaming for Apache Kafka dalam Amazon, Anda akan memerlukan konfigurasi VPC Amazon yang sesuai. VPC Anda perlu membuat koneksi AWS Glue yang menyediakan informasi VPC koneksi Amazon. Anda akan memerlukan konfigurasi pekerjaan Anda untuk menyertakan koneksi AWS Glue sebagai koneksi jaringan Tambahan.
Untuk informasi lebih lanjut tentang prasyarat ETL pekerjaan Streaming, lihat. ETLLowongan kerja streaming di AWS Glue
Contoh: Membaca dari aliran Kafka
Digunakan bersama denganforEachBatch.
Contoh untuk sumber streaming Kafka:
kafka_options =
{ "connectionName": "ConfluentKafka",
"topicName": "kafka-auth-topic",
"startingOffsets": "earliest",
"inferSchema": "true",
"classification": "json"
}
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Contoh: Menulis ke aliran Kafka
Contoh untuk menulis ke Kafka:
Contoh dengan getSink
metode:
data_frame_datasource0 =
glueContext.getSink(
connectionType="kafka",
connectionOptions={
JsonOptions("""{
"connectionName": "ConfluentKafka",
"classification": "json",
"topic": "kafka-auth-topic",
"typeOfData": "kafka"}
""")},
transformationContext="dataframe_ApacheKafka_node1711729173428")
.getDataFrame()
Contoh dengan write_dynamic_frame.from_options
metode:
kafka_options =
{ "connectionName": "ConfluentKafka",
"topicName": "kafka-auth-topic",
"classification": "json"
}
data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Referensi opsi koneksi Kafka
Saat membaca, gunakan opsi koneksi berikut dengan"connectionType": "kafka"
:
-
"bootstrap.servers"
(Wajib) Daftar server bootstrapURLs, misalnya, sebagaib-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
. Opsi ini harus ditentukan dalam API panggilan atau didefinisikan dalam metadata tabel di Katalog Data. -
"security.protocol"
(Wajib) Protokol yang digunakan untuk berkomunikasi dengan broker. Nilai yang mungkin adalah"SSL"
atau"PLAINTEXT"
. -
"topicName"
(Wajib) Daftar topik yang dipisahkan koma untuk berlangganan. Anda harus menentukan satu dan hanya satu dari"topicName"
,"assign"
atau"subscribePattern"
. -
"assign"
: (Wajib) JSON String yang menentukan spesifikTopicPartitions
untuk dikonsumsi. Anda harus menentukan satu dan hanya satu dari"topicName"
,"assign"
atau"subscribePattern"
.Contoh: '{"topicA”: [0,1], "topiCB”: [2,4]}'
-
"subscribePattern"
: (Wajib) Sebuah string regex Java yang mengidentifikasi daftar topik untuk berlangganan. Anda harus menentukan satu dan hanya satu dari"topicName"
,"assign"
atau"subscribePattern"
.Contoh: 'topik. *'
-
"classification"
(Wajib) Format file yang digunakan oleh data dalam catatan. Diperlukan kecuali disediakan melalui Katalog Data. -
"delimiter"
(Opsional) Pemisah nilai yangclassification
digunakan CSV saat. Defaultnya adalah ",
.” -
"startingOffsets"
: (Opsional) Posisi awal dalam topik Kafka tempat untuk membaca data. Nilai yang mungkin adalah"earliest"
atau"latest"
. Nilai default-nya adalah"latest"
. -
"startingTimestamp"
: (Opsional, hanya didukung untuk AWS Glue versi 4.0 atau yang lebih baru) Stempel waktu catatan dalam topik Kafka untuk membaca data dari. Nilai yang mungkin adalah string Timestamp dalam UTC format dalam polayyyy-mm-ddTHH:MM:SSZ
(di manaZ
mewakili UTC zona waktu offset dengan +/-. Misalnya: “2023-04-04T 08:00:00-04:00 “).Catatan: Hanya satu dari 'startingOffsets' atau 'startingTimestamp' yang dapat hadir dalam daftar Opsi Koneksi dari skrip streaming AWS Glue, termasuk kedua properti ini akan mengakibatkan kegagalan pekerjaan.
-
"endingOffsets"
: (Opsional) Titik akhir ketika kueri batch berakhir. Nilai yang mungkin adalah salah satu"latest"
atau JSON string yang menentukan offset akhir untuk masing-masing.TopicPartition
Untuk JSON string, formatnya adalah
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
. Nilai-1
sebagai offset mewakili"latest"
. -
"pollTimeoutMs"
: (Opsional) Waktu habis dalam milidetik untuk memeriksa status data dari Kafka di pelaksana tugas Spark. Nilai default-nya adalah512
. -
"numRetries"
: (Opsional) Jumlah percobaan sebelum gagal untuk mengambil offset Kafka. Nilai default-nya adalah3
. -
"retryIntervalMs"
: (Opsional) Waktu dalam milidetik yang digunakan untuk menunggu sebelum mencoba kembali untuk mengambil offset Kafka. Nilai default-nya adalah10
. -
"maxOffsetsPerTrigger"
: (Opsional) Batas tingkat pada jumlah maksimum offset yang diproses untuk setiap interval pemicu. Jumlah total offset yang ditentukan dibagi secara proporsional di seluruhtopicPartitions
dengan volume yang berbeda. Nilai default-nya adalah nol, yang berarti bahwa konsumen membaca semua offset sampai diketahui offset terbaru. -
"minPartitions"
: (Opsional) Jumlah minimum partisi yang diinginkan yang akan dibaca dari Kafka. Nilai default-nya adalah nol, yang berarti bahwa jumlah partisi spark sama dengan jumlah partisi Kafka. -
"includeHeaders"
: (Opsional) Apakah akan menyertakan header Kafka. Ketika opsi diatur ke “true”, output data akan berisi kolom tambahan bernama “glue_streaming_kafka_headers” dengan tipe.Array[Struct(key: String, value: String)]
Nilai defaultnya adalah “false”. Opsi ini tersedia dalam AWS Glue versi 3.0 atau yang lebih baru. -
"schema"
: (Diperlukan saat inferSchema disetel ke false) Skema yang digunakan untuk memproses muatan. Jika klasifikasi adalahavro
skema yang disediakan harus dalam format skema Avro. Jika klasifikasi tidakavro
, skema yang disediakan harus dalam format DDL skema.Berikut ini adalah contoh skema.
'column1' INT, 'column2' STRING , 'column3' FLOAT
-
"inferSchema"
: (Opsional) Nilai default adalah 'salah'. Jika disetel ke 'true', skema akan terdeteksi saat runtime dari payload di dalamnya.foreachbatch
-
"avroSchema"
: (Usang) Parameter yang digunakan untuk menentukan skema data Avro saat format Avro digunakan. Parameter ini sekarang tidak digunakan lagi. Gunakan parameterschema
. -
"addRecordTimestamp"
: (Opsional) Ketika opsi ini diatur ke 'true', output data akan berisi kolom tambahan bernama “__src_timestamp” yang menunjukkan waktu ketika catatan terkait diterima oleh topik. Nilai defaultnya adalah 'salah'. Opsi ini didukung di AWS Glue versi 4.0 atau yang lebih baru. -
"emitConsumerLagMetrics"
: (Opsional) Ketika opsi disetel ke 'true', untuk setiap batch, itu akan memancarkan metrik untuk durasi antara catatan tertua yang diterima oleh topik dan waktu tiba. AWS Glue CloudWatch Nama metriknya adalah “glue.driver.streaming. maxConsumerLagInMs”. Nilai defaultnya adalah 'salah'. Opsi ini didukung di AWS Glue versi 4.0 atau yang lebih baru.
Saat menulis, gunakan opsi koneksi berikut dengan"connectionType": "kafka"
:
-
"connectionName"
(Wajib) Nama sambungan AWS Glue yang digunakan untuk menghubungkan ke cluster Kafka (mirip dengan sumber Kafka). -
"topic"
(Wajib) Jika kolom topik ada maka nilainya digunakan sebagai topik saat menulis baris yang diberikan ke Kafka, kecuali jika opsi konfigurasi topik disetel. Artinya, opsitopic
konfigurasi mengesampingkan kolom topik. -
"partition"
(Opsional) Jika nomor partisi yang valid ditentukan, itupartition
akan digunakan saat mengirim catatan.Jika tidak ada partisi yang ditentukan tetapi a
key
hadir, partisi akan dipilih menggunakan hash dari kunci.Jika tidak
partition
adakey
atau tidak ada, partisi akan dipilih berdasarkan partisi lengket perubahan tersebut ketika setidaknya batch.size byte diproduksi ke partisi. -
"key"
(Opsional) Digunakan untuk partisi jika nolpartition
. -
"classification"
(Opsional) Format file yang digunakan oleh data dalam catatan. Kami hanya mendukungJSON, CSV dan Avro.Dengan format Avro, kami dapat menyediakan kustom avroSchema untuk membuat serial, tetapi perhatikan bahwa ini perlu disediakan pada sumber untuk deserialisasi juga. Lain, secara default menggunakan Apache AvroSchema untuk serialisasi.
Selain itu, Anda dapat menyempurnakan wastafel Kafka sesuai kebutuhan dengan memperbarui parameter konfigurasi produsen Kafka
Namun, ada daftar kecil opsi penolakan yang tidak akan berlaku. Untuk informasi selengkapnya, lihat konfigurasi khusus Kafka