AWS Glue Koneksi streaming - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

AWS Glue Koneksi streaming

Bagian berikut memberikan informasi tentang cara menggunakan koneksi di AWS Glue Streaming.

Bekerja dengan koneksi Kafka

Anda dapat menggunakan koneksi Kafka untuk membaca dan menulis ke aliran data Kafka menggunakan informasi yang disimpan dalam tabel Katalog Data, atau dengan memberikan informasi untuk langsung mengakses aliran data. Koneksi mendukung cluster Kafka atau Amazon Managed Streaming untuk Apache Kafka Kafka cluster. Anda dapat membaca informasi dari Kafka menjadi Spark DataFrame, lalu mengubahnya menjadi Glue AWS . DynamicFrame Anda dapat menulis DynamicFrames ke Kafka dalam JSON format. Jika Anda langsung mengakses aliran data, gunakan opsi ini untuk memberikan informasi tentang cara mengakses aliran data.

Jika Anda menggunakan getCatalogSource atau create_data_frame_from_catalog menggunakan catatan dari sumber streaming Kafka, getCatalogSink atau write_dynamic_frame_from_catalog untuk menulis catatan ke Kafka, dan pekerjaan tersebut memiliki database Katalog Data dan informasi nama tabel, dan dapat menggunakannya untuk mendapatkan beberapa parameter dasar untuk membaca dari sumber streaming Kafka. Jika Anda menggunakangetSource,getCatalogSink,getSourceWithFormat,getSinkWithFormat, createDataFrameFromOptions ataucreate_data_frame_from_options, atauwrite_dynamic_frame_from_catalog, Anda harus menentukan parameter dasar ini menggunakan opsi koneksi yang dijelaskan di sini.

Anda dapat menentukan opsi koneksi untuk Kafka menggunakan argumen berikut untuk metode yang ditentukan di GlueContext kelas.

  • Skala

    • connectionOptions: Gunakan dengangetSource,createDataFrameFromOptions, getSink

    • additionalOptions: Gunakan dengangetCatalogSource, getCatalogSink

    • options: Gunakan dengangetSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options: Gunakan dengancreate_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options: Gunakan dengancreate_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options: Gunakan dengangetSource, getSink

Untuk catatan dan batasan tentang ETL pekerjaan streaming, konsultasikanETLCatatan dan batasan streaming.

Topik

    Konfigurasikan Kafka

    Tidak ada AWS prasyarat untuk menghubungkan ke aliran Kafka yang tersedia melalui internet.

    Anda dapat membuat koneksi AWS Glue Kafka untuk mengelola kredenal koneksi Anda. Untuk informasi selengkapnya, lihat Membuat AWS Glue koneksi untuk aliran data Apache Kafka. Dalam konfigurasi pekerjaan AWS Glue Anda, berikan connectionName sebagai koneksi jaringan tambahan, maka, dalam panggilan metode Anda, berikan connectionName ke connectionName parameter.

    Dalam kasus tertentu, Anda perlu mengkonfigurasi prasyarat tambahan:

    • Jika menggunakan Amazon Managed Streaming untuk Apache Kafka Kafka IAM dengan otentikasi, Anda akan memerlukan konfigurasi yang sesuai. IAM

    • Jika menggunakan Amazon Managed Streaming for Apache Kafka dalam Amazon, Anda akan memerlukan konfigurasi VPC Amazon yang sesuai. VPC Anda perlu membuat koneksi AWS Glue yang menyediakan informasi VPC koneksi Amazon. Anda akan memerlukan konfigurasi pekerjaan Anda untuk menyertakan koneksi AWS Glue sebagai koneksi jaringan Tambahan.

    Untuk informasi lebih lanjut tentang prasyarat ETL pekerjaan Streaming, lihat. ETLLowongan kerja streaming di AWS Glue

    Contoh: Membaca dari aliran Kafka

    Digunakan bersama denganforEachBatch.

    Contoh untuk sumber streaming Kafka:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Contoh: Menulis ke aliran Kafka

    Contoh untuk menulis ke Kafka:

    Contoh dengan getSink metode:

    data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

    Contoh dengan write_dynamic_frame.from_options metode:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Referensi opsi koneksi Kafka

    Saat membaca, gunakan opsi koneksi berikut dengan"connectionType": "kafka":

    • "bootstrap.servers"(Wajib) Daftar server bootstrapURLs, misalnya, sebagaib-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094. Opsi ini harus ditentukan dalam API panggilan atau didefinisikan dalam metadata tabel di Katalog Data.

    • "security.protocol"(Wajib) Protokol yang digunakan untuk berkomunikasi dengan broker. Nilai yang mungkin adalah "SSL" atau "PLAINTEXT".

    • "topicName"(Wajib) Daftar topik yang dipisahkan koma untuk berlangganan. Anda harus menentukan satu dan hanya satu dari"topicName", "assign" atau"subscribePattern".

    • "assign": (Wajib) JSON String yang menentukan spesifik TopicPartitions untuk dikonsumsi. Anda harus menentukan satu dan hanya satu dari"topicName", "assign" atau"subscribePattern".

      Contoh: '{"topicA”: [0,1], "topiCB”: [2,4]}'

    • "subscribePattern": (Wajib) Sebuah string regex Java yang mengidentifikasi daftar topik untuk berlangganan. Anda harus menentukan satu dan hanya satu dari"topicName", "assign" atau"subscribePattern".

      Contoh: 'topik. *'

    • "classification"(Wajib) Format file yang digunakan oleh data dalam catatan. Diperlukan kecuali disediakan melalui Katalog Data.

    • "delimiter"(Opsional) Pemisah nilai yang classification digunakan CSV saat. Defaultnya adalah ",.”

    • "startingOffsets": (Opsional) Posisi awal dalam topik Kafka tempat untuk membaca data. Nilai yang mungkin adalah "earliest" atau "latest". Nilai default-nya adalah "latest".

    • "startingTimestamp": (Opsional, hanya didukung untuk AWS Glue versi 4.0 atau yang lebih baru) Stempel waktu catatan dalam topik Kafka untuk membaca data dari. Nilai yang mungkin adalah string Timestamp dalam UTC format dalam pola yyyy-mm-ddTHH:MM:SSZ (di mana Z mewakili UTC zona waktu offset dengan +/-. Misalnya: “2023-04-04T 08:00:00-04:00 “).

      Catatan: Hanya satu dari 'startingOffsets' atau 'startingTimestamp' yang dapat hadir dalam daftar Opsi Koneksi dari skrip streaming AWS Glue, termasuk kedua properti ini akan mengakibatkan kegagalan pekerjaan.

    • "endingOffsets": (Opsional) Titik akhir ketika kueri batch berakhir. Nilai yang mungkin adalah salah satu "latest" atau JSON string yang menentukan offset akhir untuk masing-masing. TopicPartition

      Untuk JSON string, formatnya adalah{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}. Nilai -1 sebagai offset mewakili "latest".

    • "pollTimeoutMs": (Opsional) Waktu habis dalam milidetik untuk memeriksa status data dari Kafka di pelaksana tugas Spark. Nilai default-nya adalah 512.

    • "numRetries": (Opsional) Jumlah percobaan sebelum gagal untuk mengambil offset Kafka. Nilai default-nya adalah 3.

    • "retryIntervalMs": (Opsional) Waktu dalam milidetik yang digunakan untuk menunggu sebelum mencoba kembali untuk mengambil offset Kafka. Nilai default-nya adalah 10.

    • "maxOffsetsPerTrigger": (Opsional) Batas tingkat pada jumlah maksimum offset yang diproses untuk setiap interval pemicu. Jumlah total offset yang ditentukan dibagi secara proporsional di seluruh topicPartitions dengan volume yang berbeda. Nilai default-nya adalah nol, yang berarti bahwa konsumen membaca semua offset sampai diketahui offset terbaru.

    • "minPartitions": (Opsional) Jumlah minimum partisi yang diinginkan yang akan dibaca dari Kafka. Nilai default-nya adalah nol, yang berarti bahwa jumlah partisi spark sama dengan jumlah partisi Kafka.

    • "includeHeaders": (Opsional) Apakah akan menyertakan header Kafka. Ketika opsi diatur ke “true”, output data akan berisi kolom tambahan bernama “glue_streaming_kafka_headers” dengan tipe. Array[Struct(key: String, value: String)] Nilai defaultnya adalah “false”. Opsi ini tersedia dalam AWS Glue versi 3.0 atau yang lebih baru.

    • "schema": (Diperlukan saat inferSchema disetel ke false) Skema yang digunakan untuk memproses muatan. Jika klasifikasi adalah avro skema yang disediakan harus dalam format skema Avro. Jika klasifikasi tidakavro, skema yang disediakan harus dalam format DDL skema.

      Berikut ini adalah contoh skema.

      Example in DDL schema format
      'column1' INT, 'column2' STRING , 'column3' FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema": (Opsional) Nilai default adalah 'salah'. Jika disetel ke 'true', skema akan terdeteksi saat runtime dari payload di dalamnya. foreachbatch

    • "avroSchema": (Usang) Parameter yang digunakan untuk menentukan skema data Avro saat format Avro digunakan. Parameter ini sekarang tidak digunakan lagi. Gunakan parameter schema.

    • "addRecordTimestamp": (Opsional) Ketika opsi ini diatur ke 'true', output data akan berisi kolom tambahan bernama “__src_timestamp” yang menunjukkan waktu ketika catatan terkait diterima oleh topik. Nilai defaultnya adalah 'salah'. Opsi ini didukung di AWS Glue versi 4.0 atau yang lebih baru.

    • "emitConsumerLagMetrics": (Opsional) Ketika opsi disetel ke 'true', untuk setiap batch, itu akan memancarkan metrik untuk durasi antara catatan tertua yang diterima oleh topik dan waktu tiba. AWS Glue CloudWatch Nama metriknya adalah “glue.driver.streaming. maxConsumerLagInMs”. Nilai defaultnya adalah 'salah'. Opsi ini didukung di AWS Glue versi 4.0 atau yang lebih baru.

    Saat menulis, gunakan opsi koneksi berikut dengan"connectionType": "kafka":

    • "connectionName"(Wajib) Nama sambungan AWS Glue yang digunakan untuk menghubungkan ke cluster Kafka (mirip dengan sumber Kafka).

    • "topic"(Wajib) Jika kolom topik ada maka nilainya digunakan sebagai topik saat menulis baris yang diberikan ke Kafka, kecuali jika opsi konfigurasi topik disetel. Artinya, opsi topic konfigurasi mengesampingkan kolom topik.

    • "partition"(Opsional) Jika nomor partisi yang valid ditentukan, itu partition akan digunakan saat mengirim catatan.

      Jika tidak ada partisi yang ditentukan tetapi a key hadir, partisi akan dipilih menggunakan hash dari kunci.

      Jika tidak partition ada key atau tidak ada, partisi akan dipilih berdasarkan partisi lengket perubahan tersebut ketika setidaknya batch.size byte diproduksi ke partisi.

    • "key"(Opsional) Digunakan untuk partisi jika nolpartition.

    • "classification"(Opsional) Format file yang digunakan oleh data dalam catatan. Kami hanya mendukungJSON, CSV dan Avro.

      Dengan format Avro, kami dapat menyediakan kustom avroSchema untuk membuat serial, tetapi perhatikan bahwa ini perlu disediakan pada sumber untuk deserialisasi juga. Lain, secara default menggunakan Apache AvroSchema untuk serialisasi.

    Selain itu, Anda dapat menyempurnakan wastafel Kafka sesuai kebutuhan dengan memperbarui parameter konfigurasi produsen Kafka. Perhatikan bahwa tidak ada daftar izin pada opsi koneksi, semua pasangan nilai kunci tetap ada di wastafel apa adanya.

    Namun, ada daftar kecil opsi penolakan yang tidak akan berlaku. Untuk informasi selengkapnya, lihat konfigurasi khusus Kafka.

    Bekerja dengan koneksi Kinesis

    Anda dapat menggunakan koneksi Kinesis untuk membaca dan menulis ke aliran data Amazon Kinesis menggunakan informasi yang disimpan dalam tabel Katalog Data, atau dengan memberikan informasi untuk langsung mengakses aliran data. Anda dapat membaca informasi dari Kinesis menjadi Spark DataFrame, lalu mengubahnya menjadi Glue. AWS DynamicFrame Anda dapat menulis DynamicFrames ke Kinesis dalam format. JSON Jika Anda langsung mengakses aliran data, gunakan opsi ini untuk memberikan informasi tentang cara mengakses aliran data.

    Jika Anda menggunakan getCatalogSource atau create_data_frame_from_catalog menggunakan catatan dari sumber streaming Kinesis, pekerjaan tersebut memiliki database Katalog Data dan informasi nama tabel, dan dapat menggunakannya untuk mendapatkan beberapa parameter dasar untuk membaca dari sumber streaming Kinesis. Jika Anda menggunakangetSource,getSourceWithFormat, createDataFrameFromOptions ataucreate_data_frame_from_options, Anda harus menentukan parameter dasar ini menggunakan opsi koneksi yang dijelaskan di sini.

    Anda dapat menentukan opsi koneksi untuk Kinesis menggunakan argumen berikut untuk metode yang ditentukan di GlueContext kelas.

    • Skala

      • connectionOptions: Gunakan dengangetSource,createDataFrameFromOptions, getSink

      • additionalOptions: Gunakan dengangetCatalogSource, getCatalogSink

      • options: Gunakan dengangetSourceWithFormat, getSinkWithFormat

    • Python

      • connection_options: Gunakan dengancreate_data_frame_from_options, write_dynamic_frame_from_options

      • additional_options: Gunakan dengancreate_data_frame_from_catalog, write_dynamic_frame_from_catalog

      • options: Gunakan dengangetSource, getSink

    Untuk catatan dan batasan tentang ETL pekerjaan Streaming, konsultasikanETLCatatan dan batasan streaming.

    Konfigurasikan Kinesis

    Untuk terhubung ke aliran data Kinesis dalam pekerjaan AWS Glue Spark, Anda memerlukan beberapa prasyarat:

    • Jika membaca, tugas AWS Glue harus memiliki IAM izin tingkat akses Baca ke aliran data Kinesis.

    • Jika menulis, pekerjaan AWS Glue harus memiliki IAM izin tingkat akses Tulis ke aliran data Kinesis.

    Dalam kasus tertentu, Anda perlu mengkonfigurasi prasyarat tambahan:

    • Jika pekerjaan AWS Glue Anda dikonfigurasi dengan koneksi jaringan Tambahan (biasanya untuk terhubung ke kumpulan data lain) dan salah satu koneksi tersebut menyediakan opsi VPC Jaringan Amazon, ini akan mengarahkan pekerjaan Anda untuk berkomunikasi melalui Amazon. VPC Dalam hal ini Anda juga perlu mengonfigurasi aliran data Kinesis Anda untuk berkomunikasi melalui Amazon. VPC Anda dapat melakukan ini dengan membuat VPC titik akhir antarmuka antara Amazon VPC dan aliran data Kinesis Anda. Untuk informasi selengkapnya, lihat Menggunakan Kinesis Data Streams VPC dengan Titik Akhir Antarmuka.

    • Saat menentukan Amazon Kinesis Data Streams di akun lain, Anda harus menyiapkan peran dan kebijakan untuk mengizinkan akses lintas akun. Untuk informasi selengkapnya, lihat Contoh: Baca Dari Pengaliran Kinesis di Akun Berbeda.

    Untuk informasi lebih lanjut tentang prasyarat ETL pekerjaan Streaming, lihat. ETLLowongan kerja streaming di AWS Glue

    Baca dari Kinesis

    Contoh: Membaca dari aliran Kinesis

    Digunakan bersama denganforEachBatch.

    Contoh untuk sumber streaming Amazon Kinesis:

    kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

    Menulis ke Kinesis

    Contoh: Menulis ke aliran Kinesis

    Digunakan bersama denganforEachBatch. Anda DynamicFrame akan ditulis ke aliran dalam JSON format. Jika pekerjaan tidak dapat menulis setelah beberapa kali percobaan ulang, itu akan gagal. Secara default, setiap DynamicFrame catatan akan dikirim ke aliran Kinesis satu per satu. Anda dapat mengonfigurasi perilaku ini menggunakan aggregationEnabled dan parameter terkait.

    Contoh menulis ke Amazon Kinesis dari pekerjaan streaming:

    Python
    glueContext.write_dynamic_frame.from_options( frame=frameToWrite connection_type="kinesis", connection_options={ "partitionKey": "part1", "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName", } )
    Scala
    glueContext.getSinkWithFormat( connectionType="kinesis", options=JsonOptions("""{ "streamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/streamName", "partitionKey": "part1" }"""), ) .writeDynamicFrame(frameToWrite)

    Parameter koneksi Kinesis

    Menetapkan opsi koneksi untuk Amazon Kinesis Data Streams.

    Gunakan opsi koneksi berikut untuk sumber data streaming Kinesis:

    • "streamARN"(Wajib) Digunakan untuk Baca/Tulis. ARNAliran data Kinesis.

    • "classification"(Diperlukan untuk dibaca) Digunakan untuk Baca. Format file yang digunakan oleh data dalam catatan. Diperlukan kecuali disediakan melalui Katalog Data.

    • "streamName"— (Opsional) Digunakan untuk Baca. Nama aliran data Kinesis untuk dibaca. Digunakan denganendpointUrl.

    • "endpointUrl"— (Opsional) Digunakan untuk Baca. Default: "https://kinesis.us-east-1.amazonaws.com”. AWS Titik akhir dari aliran Kinesis. Anda tidak perlu mengubah ini kecuali Anda terhubung ke wilayah khusus.

    • "partitionKey"— (Opsional) Digunakan untuk Menulis. Kunci partisi Kinesis digunakan saat memproduksi catatan.

    • "delimiter"(Opsional) Digunakan untuk Baca. Pemisah nilai yang digunakan saatclassification. CSV Defaultnya adalah ",.”

    • "startingPosition": (Opsional) Digunakan untuk Baca. Posisi awal dalam aliran data Kinesis untuk membaca data dari. Nilai yang mungkin adalah"latest","trim_horizon","earliest", atau string Timestamp dalam UTC format dalam pola yyyy-mm-ddTHH:MM:SSZ (di mana Z mewakili UTC zona waktu offset dengan +/-. Misalnya “2023-04-04T 08:00:00-04:00 “). Nilai default-nya adalah "latest". Catatan: string Timestamp dalam UTC Format for hanya didukung untuk "startingPosition" AWS Glue versi 4.0 atau yang lebih baru.

    • "failOnDataLoss": (Opsional) Gagal pekerjaan jika ada pecahan aktif yang hilang atau kedaluwarsa. Nilai default-nya adalah "false".

    • "awsSTSRoleARN": (Opsional) Digunakan untuk Baca/Tulis. Amazon Resource Name (ARN) dari peran yang akan diasumsikan menggunakan AWS Security Token Service (AWS STS). Peran ini harus memiliki izin untuk mendeskripsikan atau membaca operasi rekaman untuk aliran data Kinesis. Anda harus menggunakan parameter ini saat mengakses aliran data di akun yang berbeda. Digunakan bersama dengan"awsSTSSessionName".

    • "awsSTSSessionName": (Opsional) Digunakan untuk Baca/Tulis. Pengidentifikasi untuk sesi dengan asumsi peran menggunakan. AWS STS Anda harus menggunakan parameter ini saat mengakses aliran data di akun yang berbeda. Digunakan bersama dengan"awsSTSRoleARN".

    • "awsSTSEndpoint": (Opsional) AWS STS Titik akhir yang digunakan saat menghubungkan ke Kinesis dengan peran yang diasumsikan. Ini memungkinkan penggunaan AWS STS titik akhir regional di aVPC, yang tidak dimungkinkan dengan titik akhir global default.

    • "maxFetchTimeInMs": (Opsional) Digunakan untuk Baca. Waktu maksimum yang dihabiskan untuk pelaksana pekerjaan untuk membaca catatan untuk batch saat ini dari aliran data Kinesis, ditentukan dalam milidetik (ms). Beberapa GetRecords API panggilan dapat dilakukan dalam waktu ini. Nilai default-nya adalah 1000.

    • "maxFetchRecordsPerShard": (Opsional) Digunakan untuk Baca. Jumlah maksimum catatan yang diambil per pecahan dalam aliran data Kinesis per mikrobatch. Catatan: Klien dapat melampaui batas ini jika pekerjaan streaming telah membaca catatan tambahan dari Kinesis (dalam panggilan get-records yang sama). Jika maxFetchRecordsPerShard perlu ketat maka itu harus kelipatanmaxRecordPerRead. Nilai default-nya adalah 100000.

    • "maxRecordPerRead": (Opsional) Digunakan untuk Baca. Jumlah maksimum catatan untuk diambil dari aliran data Kinesis dalam getRecords setiap operasi. Nilai default-nya adalah 10000.

    • "addIdleTimeBetweenReads": (Opsional) Digunakan untuk Baca. Menambahkan penundaan waktu antara dua operasi berturut-turutgetRecords. Nilai default-nya adalah "False". Opsi ini hanya dapat dikonfigurasi untuk Glue versi 2.0 dan di atasnya.

    • "idleTimeBetweenReadsInMs": (Opsional) Digunakan untuk Baca. Waktu tunda minimum antara dua getRecords operasi berturut-turut, ditentukan dalam ms. Nilai default-nya adalah 1000. Opsi ini hanya dapat dikonfigurasi untuk Glue versi 2.0 dan di atasnya.

    • "describeShardInterval": (Opsional) Digunakan untuk Baca. Interval waktu minimum antara dua ListShards API panggilan untuk skrip Anda untuk mempertimbangkan resharding. Untuk informasi selengkapnya, lihat Strategi Penyerpihan Kembali di Panduan Developer Amazon Kinesis Data Streams. Nilai default-nya adalah 1s.

    • "numRetries": (Opsional) Digunakan untuk Baca. Jumlah maksimum percobaan ulang untuk permintaan Kinesis Data Streams. API Nilai default-nya adalah 3.

    • "retryIntervalMs": (Opsional) Digunakan untuk Baca. Periode waktu pendinginan (ditentukan dalam ms) sebelum mencoba kembali panggilan Kinesis Data Streams. API Nilai default-nya adalah 1000.

    • "maxRetryIntervalMs": (Opsional) Digunakan untuk Baca. Periode waktu pendinginan maksimum (ditentukan dalam ms) antara dua percobaan ulang panggilan Kinesis Data Streams. API Nilai default-nya adalah 10000.

    • "avoidEmptyBatches": (Opsional) Digunakan untuk Baca. Hindari membuat pekerjaan microbatch kosong dengan memeriksa data yang belum dibaca di aliran data Kinesis sebelum batch dimulai. Nilai default-nya adalah "False".

    • "schema": (Diperlukan saat inferSchema disetel ke false) Digunakan untuk Baca. Skema yang digunakan untuk memproses muatan. Jika klasifikasi adalah avro skema yang disediakan harus dalam format skema Avro. Jika klasifikasi tidakavro, skema yang disediakan harus dalam format DDL skema.

      Berikut ini adalah contoh skema.

      Example in DDL schema format
      `column1` INT, `column2` STRING , `column3` FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema": (Opsional) Digunakan untuk Baca. Nilai defaultnya adalah 'salah'. Jika disetel ke 'true', skema akan terdeteksi saat runtime dari payload di dalamnya. foreachbatch

    • "avroSchema": (Usang) Digunakan untuk Baca. Parameter yang digunakan untuk menentukan skema data Avro saat format Avro digunakan. Parameter ini sekarang tidak digunakan lagi. Gunakan parameter schema.

    • "addRecordTimestamp": (Opsional) Digunakan untuk Baca. Ketika opsi ini diatur ke 'true', output data akan berisi kolom tambahan bernama “__src_timestamp” yang menunjukkan waktu ketika catatan terkait diterima oleh aliran. Nilai defaultnya adalah 'salah'. Opsi ini didukung di AWS Glue versi 4.0 atau yang lebih baru.

    • "emitConsumerLagMetrics": (Opsional) Digunakan untuk Baca. Ketika opsi disetel ke 'true', untuk setiap batch, itu akan memancarkan metrik untuk durasi antara rekaman tertua yang diterima oleh aliran dan waktu pemasangannya. AWS Glue CloudWatch Nama metriknya adalah “glue.driver.streaming. maxConsumerLagInMs”. Nilai defaultnya adalah 'salah'. Opsi ini didukung di AWS Glue versi 4.0 atau yang lebih baru.

    • "fanoutConsumerARN": (Opsional) Digunakan untuk Baca. Konsumen aliran Kinesis untuk aliran yang ditentukan dalam. ARN streamARN Digunakan untuk mengaktifkan mode fan-out yang disempurnakan untuk koneksi Kinesis Anda. Untuk informasi lebih lanjut tentang mengonsumsi aliran Kinesis dengan fan-out yang ditingkatkan, lihat. Menggunakan fan-out yang disempurnakan dalam pekerjaan streaming Kinesis

    • "recordMaxBufferedTime"— (Opsional) Digunakan untuk Menulis. Default: 1000 (ms). Waktu maksimum sebuah rekaman di-buffer sambil menunggu untuk ditulis.

    • "aggregationEnabled"— (Opsional) Digunakan untuk Menulis. Default: benar. Menentukan apakah catatan harus dikumpulkan sebelum mengirim mereka ke Kinesis.

    • "aggregationMaxSize"— (Opsional) Digunakan untuk Menulis. Default: 51200 (byte). Jika catatan lebih besar dari batas ini, itu akan melewati agregator. Catatan Kinesis memberlakukan batas 50KB pada ukuran rekaman. Jika Anda menetapkan ini di luar 50KB, catatan kebesaran akan ditolak oleh Kinesis.

    • "aggregationMaxCount"— (Opsional) Digunakan untuk Menulis. Standar: 4294967295. Jumlah maksimum item untuk dikemas ke dalam catatan agregat.

    • "producerRateLimit"— (Opsional) Digunakan untuk Menulis. Default: 150 (%). Membatasi throughput per shard yang dikirim dari satu produsen (seperti pekerjaan Anda), sebagai persentase dari batas backend.

    • "collectionMaxCount"— (Opsional) Digunakan untuk Menulis. Default: 500. Jumlah maksimum item untuk dikemas ke dalam PutRecords permintaan.

    • "collectionMaxSize"— (Opsional) Digunakan untuk Menulis. Default: 5242880 (byte). Jumlah maksimum data untuk dikirim dengan PutRecords permintaan.