Kafka 接続 - AWS Glue

Kafka 接続

Kafka 接続を使用すると、Data Catalog テーブルに格納されている情報を使用するか、データストリームに直接アクセスするための情報を指定することにより、Kafka データストリームへ読み込むまたは書き込むことができます。接続は、Kafka クラスターまたは Amazon Managed Streaming for Apache Kafka クラスターをサポートします。Kafka から Spark DataFrame に情報を読み込み、AWS Glue DynamicFrame に変換できます。DynamicFrames を JSON 形式で Kafka に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。

getCatalogSource または create_data_frame_from_catalog を使用して Kafka ストリーミングソースからレコードを消費するか、getCatalogSink または write_dynamic_frame_from_catalog を使用して Kafka にレコードを書き込み、ジョブに Data Catalog データベースおよびテーブル名の情報があり、それを使用して Kafka ストリーミングソースから読み込むためのいくつかの基本パラメータを取得できる場合。getSourcegetCatalogSinkgetSourceWithFormatgetSinkWithFormatcreateDataFrameFromOptionscreate_data_frame_from_optionswrite_dynamic_frame_from_catalog を使用する場合、ここで説明する接続オプションを使用し、これらの基本パラメータを指定する必要があります。

GlueContext クラスで指定されたメソッドの次の引数を使用し、Kafka の接続オプションを指定できます。

  • Scala

    • connectionOptions: getSourcecreateDataFrameFromOptionsgetSink で使用

    • additionalOptions: getCatalogSourcegetCatalogSink で使用

    • options: getSourceWithFormatgetSinkWithFormat で使用

  • Python

    • connection_options: create_data_frame_from_optionswrite_dynamic_frame_from_options で使用

    • additional_options: create_data_frame_from_catalogwrite_dynamic_frame_from_catalog で使用

    • options: getSourcegetSink で使用

ストリーミング ETL ジョブに関する注意事項と制限事項については、「ストリーミング ETL に関する注意と制限」を参照してください。

トピック

    Kafka の設定

    インターネット経由で利用可能な Kafka ストリームに接続するための AWS の前提条件はありません。

    AWS Glue Kafka 接続を作成して、接続認証情報を管理できます。詳細については、「Apache Kafka データストリームの AWS Glue 接続の作成」を参照してください。AWS Glue ジョブ設定で、connectionName追加ネットワーク接続として指定し、メソッド呼び出しで connectionName パラメータに connectionName を指定します。

    場合によっては、追加の前提条件を設定する必要があります。

    • IAM 認証で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な IAM 設定が必要になります。

    • Amazon VPC で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な Amazon VPC 設定が必要になります。Amazon VPC 接続情報を提供する AWS Glue 接続を作成する必要があります。AWS Glue 接続を追加ネットワーク接続として含めるには、ジョブ設定が必要です。

    ストリーミング ETL ジョブの前提条件の詳細については、「AWS Glue でのストリーミング ETL ジョブ」を参照してください。

    例: Kafka ストリームからの読み込み

    forEachBatch と組み合わせて使用します。

    Kafka ストリーミングソースの例:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    例: Kafka ストリームへの書き込み

    Kafka への書き込み例:

    getSink メソッドを使った例:

    data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()

    write_dynamic_frame.from_options メソッドを使った例:

    kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)

    Kafka 接続オプションのリファレンス

    読み込むとき、"connectionType": "kafka" に次の接続オプションを使用します。

    • "bootstrap.servers" (必須) ブートストラップサーバーの URL のリスト (例: b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094)。このオプションは API 呼び出しで指定するか、データカタログ内のテーブルメタデータで定義する必要があります。

    • "security.protocol" (必須) ブローカーと通信するために使用されるプロトコル。使用できる値は、"SSL" または "PLAINTEXT" です。

    • "topicName" (必須) サブスクライブするトピックのカンマ区切りリスト。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

    • "assign": (必須) 消費する特定の TopicPartitions を指定する JSON 文字列。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

      例: '{"topicA":[0,1],"topicB":[2,4]}'

    • "subscribePattern": (必須) サブスクライブする先のトピックリストを識別する Java の正規表現文字列。"topicName""assign"、または "subscribePattern" の中から、いずれか 1 つのみを指定する必要があります。

      例: 'topic.*'

    • "classification" (必須) レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。

    • "delimiter" (オプション) classification が CSV の場合に使用される値の区切り文字。デフォルトは「,」です。

    • "startingOffsets": (オプション) Kafka トピック内で、データの読み取りを開始する位置 使用できる値は、"earliest" または "latest" です。デフォルト値は "latest" です。

    • "startingTimestamp": (オプション、AWS Glue バージョン 4.0 以降でのみサポート) Kafka トピック内で、データの読み込みを開始するレコードのタイムスタンプ。指定できる値は UTC 形式 (yyyy-mm-ddTHH:MM:SSZ のパターン) のタイムスタンプ文字列です (Z は UTC タイムゾーンのオフセットを +/- で表します。例: 「2023-04-04T08:00:00-04:00」)。

      注: AWS Glue ストリーミングスクリプトの接続オプションリストには、「startingOffsets」または「startingTimestamp」のいずれかしか表示できません。これらのプロパティの両方を含めると、ジョブが失敗します。

    • "endingOffsets": (オプション) バッチクエリの終了位置。設定が可能な値は、"latest" または、各 TopicPartition の終了オフセットを指定する JSON 文字列のいずれかです。

      JSON 文字列の場合、{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} の形式を使用します。オフセットとして値 -1 を設定する場合、"latest" の意味になります。

    • "pollTimeoutMs": (オプション) Spark ジョブエグゼキュータで、Kafka からデータをポーリングする際のタイムアウト値 (ミリ秒単位)。デフォルト値は 512 です。

    • "numRetries": (オプション) Kafka オフセットのフェッチが失敗したと判断される前の再試行回数。デフォルト値は 3 です。

    • "retryIntervalMs": (オプション) Kafka オフセットのフェッチを開始するまでの待機時間 (ミリ秒)。デフォルト値は 10 です。

    • "maxOffsetsPerTrigger": (オプション) 処理されるオフセットの最大数を、トリガー間隔ごとのレート上限で指定する値。指定されたオフセットの合計数は、異なるボリュームの topicPartitions 間で均等に分割されます。デフォルト値は「null」です。この場合、コンシューマーは既知の最新のオフセットまで、すべてのオフセットを読み取ります。

    • "minPartitions": (オプション) Kafka から読み取ることを想定する、最小のパーティション数。デフォルト値は「null」です。これは、Spark パーティションの数が Kafka パーティションの数に等しいことを意味します。

    • "includeHeaders": (オプション) Kafka ヘッダーを含めるかどうかを決定します。このオプションが「true」に設定されている場合、データ出力には、「glue_streaming_kafka_headers」という名前で Array[Struct(key: String, value: String)] 型の列が追加されます。デフォルト値は「false」です。このオプションは、AWS Glue バージョン 3.0 以降でのみ使用可能です。

    • "schema": (inferSchema に false を設定した場合は必須) ペイロードの処理に使用するスキーマ。分類が avro である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が avro 以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。

      以下に、スキーマの例を示します。

      Example in DDL schema format
      'column1' INT, 'column2' STRING , 'column3' FLOAT
      Example in Avro schema format
      { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    • "inferSchema": (オプション) デフォルト値は「false」です。「true」に設定すると、実行時に、スキーマが foreachbatch 内のペイロードから検出されます。

    • "avroSchema": (非推奨) Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema パラメータを使用します。

    • "addRecordTimestamp": (オプション) このオプションを「true」に設定すると、トピックが対応するレコードを受信した時刻を表示する「__src_timestamp」という列が、データ出力に追加で表示されます。デフォルト値は、「false」です。このオプションは AWS Glue のバージョン 4.0 以降でサポートされています。

    • "emitConsumerLagMetrics": (オプション) このオプションを「true」に設定すると、バッチごとに、トピックが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との、間隔のメトリクスが出力されます。このメトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。

    書き込むとき、"connectionType": "kafka" に次の接続オプションを使用します。

    • "connectionName" (必須) Kafka クラスターへの接続に使用される AWS Glue 接続の名前 (Kafka ソースと同様)。

    • "topic" (必須) トピック列が存在する場合、トピック設定オプションが設定されていない限り、指定された行を Kafka に書き込む際にトピック列の値がトピックとして使用されます。つまり、topic 設定オプションはトピック列をオーバーライドします。

    • "partition" (オプション) 有効なパーティション番号が指定された場合、レコードの送信時にその partition が使用されます。

      パーティションが指定されずに key が存在する場合、キーのハッシュを使用してパーティションが選択されます。

      keypartition のどちらも存在しない場合、そのパーティションに少なくとも batch.size バイトが生成された際に変更内容のスティッキーパーティション分割に基づいてパーティションが選択されます。

    • "key" (オプション) partition が null の場合、パーティション分割に使用されます。

    • "classification" (オプション) レコードのデータに使用されるファイル形式。JSON、CSV、Avro のみをサポートしています。

      Avro 形式を使用すると、シリアル化するためにカスタムの AvroSchema を提供できますが、シリアル化解除する場合にもソースに提供する必要があることに注意してください。それ以外の場合、デフォルトで Apache AvroSchema を使用してシリアル化します。

    さらに、Kafka プロデューサーの設定パラメーターを更新することにより、必要に応じて Kafka シンクを微調整できます。接続オプションには許可リストがないことに注意してください。すべてのキー値のペアはそのままシンクに保持されます。

    ただし、有効にならないオプションの小さな拒否リストがあります。詳細については、「Apache 固有の設定」を参照してください。