Kafka 接続
Kafka 接続を使用すると、Data Catalog テーブルに格納されている情報を使用するか、データストリームに直接アクセスするための情報を指定することにより、Kafka データストリームへ読み込むまたは書き込むことができます。接続は、Kafka クラスターまたは Amazon Managed Streaming for Apache Kafka クラスターをサポートします。Kafka から Spark DataFrame に情報を読み込み、AWS Glue DynamicFrame に変換できます。DynamicFrames を JSON 形式で Kafka に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。
getCatalogSource
または create_data_frame_from_catalog
を使用して Kafka ストリーミングソースからレコードを消費するか、getCatalogSink
または write_dynamic_frame_from_catalog
を使用して Kafka にレコードを書き込み、ジョブに Data Catalog データベースおよびテーブル名の情報があり、それを使用して Kafka ストリーミングソースから読み込むためのいくつかの基本パラメータを取得できる場合。getSource
、getCatalogSink
、getSourceWithFormat
、getSinkWithFormat
、createDataFrameFromOptions
、create_data_frame_from_options
、write_dynamic_frame_from_catalog
を使用する場合、ここで説明する接続オプションを使用し、これらの基本パラメータを指定する必要があります。
GlueContext
クラスで指定されたメソッドの次の引数を使用し、Kafka の接続オプションを指定できます。
-
Scala
-
connectionOptions
:getSource
、createDataFrameFromOptions
、getSink
で使用 -
additionalOptions
:getCatalogSource
、getCatalogSink
で使用 -
options
:getSourceWithFormat
、getSinkWithFormat
で使用
-
-
Python
-
connection_options
:create_data_frame_from_options
、write_dynamic_frame_from_options
で使用 -
additional_options
:create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
で使用 -
options
:getSource
、getSink
で使用
-
ストリーミング ETL ジョブに関する注意事項と制限事項については、「ストリーミング ETL に関する注意と制限」を参照してください。
トピック
Kafka の設定
インターネット経由で利用可能な Kafka ストリームに接続するための AWS の前提条件はありません。
AWS Glue Kafka 接続を作成して、接続認証情報を管理できます。詳細については、「Apache Kafka データストリームの AWS Glue 接続の作成」を参照してください。AWS Glue ジョブ設定で、connectionName
を追加ネットワーク接続として指定し、メソッド呼び出しで connectionName
パラメータに connectionName
を指定します。
場合によっては、追加の前提条件を設定する必要があります。
-
IAM 認証で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な IAM 設定が必要になります。
-
Amazon VPC で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な Amazon VPC 設定が必要になります。Amazon VPC 接続情報を提供する AWS Glue 接続を作成する必要があります。AWS Glue 接続を追加ネットワーク接続として含めるには、ジョブ設定が必要です。
ストリーミング ETL ジョブの前提条件の詳細については、「AWS Glue でのストリーミング ETL ジョブ」を参照してください。
例: Kafka ストリームからの読み込み
forEachBatch と組み合わせて使用します。
Kafka ストリーミングソースの例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
例: Kafka ストリームへの書き込み
Kafka への書き込み例:
getSink
メソッドを使った例:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
write_dynamic_frame.from_options
メソッドを使った例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Kafka 接続オプションのリファレンス
読み込むとき、"connectionType": "kafka"
に次の接続オプションを使用します。
-
"bootstrap.servers"
(必須) ブートストラップサーバーの URL のリスト (例:b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
)。このオプションは API 呼び出しで指定するか、データカタログ内のテーブルメタデータで定義する必要があります。 -
"security.protocol"
(必須) ブローカーと通信するために使用されるプロトコル。使用できる値は、"SSL"
または"PLAINTEXT"
です。 -
"topicName"
(必須) サブスクライブするトピックのカンマ区切りリスト。"topicName"
、"assign"
、または"subscribePattern"
の中から、いずれか 1 つのみを指定する必要があります。 -
"assign"
: (必須) 消費する特定のTopicPartitions
を指定する JSON 文字列。"topicName"
、"assign"
、または"subscribePattern"
の中から、いずれか 1 つのみを指定する必要があります。例: '{"topicA":[0,1],"topicB":[2,4]}'
-
"subscribePattern"
: (必須) サブスクライブする先のトピックリストを識別する Java の正規表現文字列。"topicName"
、"assign"
、または"subscribePattern"
の中から、いずれか 1 つのみを指定する必要があります。例: 'topic.*'
-
"classification"
(必須) レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。 -
"delimiter"
(オプション)classification
が CSV の場合に使用される値の区切り文字。デフォルトは「,
」です。 -
"startingOffsets"
: (オプション) Kafka トピック内で、データの読み取りを開始する位置 使用できる値は、"earliest"
または"latest"
です。デフォルト値は"latest"
です。 -
"startingTimestamp"
: (オプション、AWS Glue バージョン 4.0 以降でのみサポート) Kafka トピック内で、データの読み込みを開始するレコードのタイムスタンプ。指定できる値は UTC 形式 (yyyy-mm-ddTHH:MM:SSZ
のパターン) のタイムスタンプ文字列です (Z
は UTC タイムゾーンのオフセットを +/- で表します。例: 「2023-04-04T08:00:00-04:00」)。注: AWS Glue ストリーミングスクリプトの接続オプションリストには、「startingOffsets」または「startingTimestamp」のいずれかしか表示できません。これらのプロパティの両方を含めると、ジョブが失敗します。
-
"endingOffsets"
: (オプション) バッチクエリの終了位置。設定が可能な値は、"latest"
または、各TopicPartition
の終了オフセットを指定する JSON 文字列のいずれかです。JSON 文字列の場合、
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
の形式を使用します。オフセットとして値-1
を設定する場合、"latest"
の意味になります。 -
"pollTimeoutMs"
: (オプション) Spark ジョブエグゼキュータで、Kafka からデータをポーリングする際のタイムアウト値 (ミリ秒単位)。デフォルト値は512
です。 -
"numRetries"
: (オプション) Kafka オフセットのフェッチが失敗したと判断される前の再試行回数。デフォルト値は3
です。 -
"retryIntervalMs"
: (オプション) Kafka オフセットのフェッチを開始するまでの待機時間 (ミリ秒)。デフォルト値は10
です。 -
"maxOffsetsPerTrigger"
: (オプション) 処理されるオフセットの最大数を、トリガー間隔ごとのレート上限で指定する値。指定されたオフセットの合計数は、異なるボリュームのtopicPartitions
間で均等に分割されます。デフォルト値は「null」です。この場合、コンシューマーは既知の最新のオフセットまで、すべてのオフセットを読み取ります。 -
"minPartitions"
: (オプション) Kafka から読み取ることを想定する、最小のパーティション数。デフォルト値は「null」です。これは、Spark パーティションの数が Kafka パーティションの数に等しいことを意味します。 -
"includeHeaders"
: (オプション) Kafka ヘッダーを含めるかどうかを決定します。このオプションが「true」に設定されている場合、データ出力には、「glue_streaming_kafka_headers」という名前でArray[Struct(key: String, value: String)]
型の列が追加されます。デフォルト値は「false」です。このオプションは、AWS Glue バージョン 3.0 以降でのみ使用可能です。 -
"schema"
: (inferSchema に false を設定した場合は必須) ペイロードの処理に使用するスキーマ。分類がavro
である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類がavro
以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。以下に、スキーマの例を示します。
-
"inferSchema"
: (オプション) デフォルト値は「false」です。「true」に設定すると、実行時に、スキーマがforeachbatch
内のペイロードから検出されます。 -
"avroSchema"
: (非推奨) Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema
パラメータを使用します。 -
"addRecordTimestamp"
: (オプション) このオプションを「true」に設定すると、トピックが対応するレコードを受信した時刻を表示する「__src_timestamp」という列が、データ出力に追加で表示されます。デフォルト値は、「false」です。このオプションは AWS Glue のバージョン 4.0 以降でサポートされています。 -
"emitConsumerLagMetrics"
: (オプション) このオプションを「true」に設定すると、バッチごとに、トピックが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との、間隔のメトリクスが出力されます。このメトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。
書き込むとき、"connectionType": "kafka"
に次の接続オプションを使用します。
-
"connectionName"
(必須) Kafka クラスターへの接続に使用される AWS Glue 接続の名前 (Kafka ソースと同様)。 -
"topic"
(必須) トピック列が存在する場合、トピック設定オプションが設定されていない限り、指定された行を Kafka に書き込む際にトピック列の値がトピックとして使用されます。つまり、topic
設定オプションはトピック列をオーバーライドします。 -
"partition"
(オプション) 有効なパーティション番号が指定された場合、レコードの送信時にそのpartition
が使用されます。パーティションが指定されずに
key
が存在する場合、キーのハッシュを使用してパーティションが選択されます。key
とpartition
のどちらも存在しない場合、そのパーティションに少なくとも batch.size バイトが生成された際に変更内容のスティッキーパーティション分割に基づいてパーティションが選択されます。 -
"key"
(オプション)partition
が null の場合、パーティション分割に使用されます。 -
"classification"
(オプション) レコードのデータに使用されるファイル形式。JSON、CSV、Avro のみをサポートしています。Avro 形式を使用すると、シリアル化するためにカスタムの AvroSchema を提供できますが、シリアル化解除する場合にもソースに提供する必要があることに注意してください。それ以外の場合、デフォルトで Apache AvroSchema を使用してシリアル化します。
さらに、Kafka プロデューサーの設定パラメーター
ただし、有効にならないオプションの小さな拒否リストがあります。詳細については、「Apache 固有の設定