AWS Glue ストリーミング接続
以下のセクションでは、AWS Glue Streaming の接続の使用方法についての情報を提供します。
Kafka 接続の操作
Kafka 接続を使用すると、Data Catalog テーブルに格納されている情報を使用するか、データストリームに直接アクセスするための情報を指定することにより、Kafka データストリームへ読み込むまたは書き込むことができます。接続は、Kafka クラスターまたは Amazon Managed Streaming for Apache Kafka クラスターをサポートします。Kafka から Spark DataFrame に情報を読み込み、AWS Glue DynamicFrame に変換できます。DynamicFrames を JSON 形式で Kafka に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。
getCatalogSource
または create_data_frame_from_catalog
を使用して Kafka ストリーミングソースからレコードを消費するか、getCatalogSink
または write_dynamic_frame_from_catalog
を使用して Kafka にレコードを書き込み、ジョブに Data Catalog データベースおよびテーブル名の情報があり、それを使用して Kafka ストリーミングソースから読み込むためのいくつかの基本パラメータを取得できる場合。getSource
、getCatalogSink
、getSourceWithFormat
、getSinkWithFormat
、createDataFrameFromOptions
、create_data_frame_from_options
、write_dynamic_frame_from_catalog
を使用する場合、ここで説明する接続オプションを使用し、これらの基本パラメータを指定する必要があります。
GlueContext
クラスで指定されたメソッドの次の引数を使用し、Kafka の接続オプションを指定できます。
-
Scala
-
connectionOptions
:getSource
、createDataFrameFromOptions
、getSink
で使用 -
additionalOptions
:getCatalogSource
、getCatalogSink
で使用 -
options
:getSourceWithFormat
、getSinkWithFormat
で使用
-
-
Python
-
connection_options
:create_data_frame_from_options
、write_dynamic_frame_from_options
で使用 -
additional_options
:create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
で使用 -
options
:getSource
、getSink
で使用
-
ストリーミング ETL ジョブに関する注意事項と制限事項については、「ストリーミング ETL に関する注意と制限」を参照してください。
トピック
Kafka の設定
インターネット経由で利用可能な Kafka ストリームに接続するための AWS の前提条件はありません。
AWS Glue Kafka 接続を作成して、接続認証情報を管理できます。詳細については、「Apache Kafka データストリームの AWS Glue 接続の作成」を参照してください。AWS Glue ジョブ設定で、connectionName
を追加ネットワーク接続として指定し、メソッド呼び出しで connectionName
パラメータに connectionName
を指定します。
場合によっては、追加の前提条件を設定する必要があります。
-
IAM 認証で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な IAM 設定が必要になります。
-
Amazon VPC で Amazon Managed Streaming for Apache Kafka を使用する場合は、適切な Amazon VPC 設定が必要になります。Amazon VPC 接続情報を提供する AWS Glue 接続を作成する必要があります。AWS Glue 接続を追加ネットワーク接続として含めるには、ジョブ設定が必要です。
ストリーミング ETL ジョブの前提条件の詳細については、「AWS Glue でのストリーミング ETL ジョブ」を参照してください。
例: Kafka ストリームからの読み込み
forEachBatch と組み合わせて使用します。
Kafka ストリーミングソースの例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
例: Kafka ストリームへの書き込み
Kafka への書き込み例:
getSink
メソッドを使った例:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
write_dynamic_frame.from_options
メソッドを使った例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Kafka 接続オプションのリファレンス
読み込むとき、"connectionType": "kafka"
に次の接続オプションを使用します。
-
"bootstrap.servers"
(必須) ブートストラップサーバーの URL のリスト (例:b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
)。このオプションは API 呼び出しで指定するか、データカタログ内のテーブルメタデータで定義する必要があります。 -
"security.protocol"
(必須) ブローカーと通信するために使用されるプロトコル。使用できる値は、"SSL"
または"PLAINTEXT"
です。 -
"topicName"
(必須) サブスクライブするトピックのカンマ区切りリスト。"topicName"
、"assign"
、または"subscribePattern"
の中から、いずれか 1 つのみを指定する必要があります。 -
"assign"
: (必須) 消費する特定のTopicPartitions
を指定する JSON 文字列。"topicName"
、"assign"
、または"subscribePattern"
の中から、いずれか 1 つのみを指定する必要があります。例: '{"topicA":[0,1],"topicB":[2,4]}'
-
"subscribePattern"
: (必須) サブスクライブする先のトピックリストを識別する Java の正規表現文字列。"topicName"
、"assign"
、または"subscribePattern"
の中から、いずれか 1 つのみを指定する必要があります。例: 'topic.*'
-
"classification"
(必須) レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。 -
"delimiter"
(オプション)classification
が CSV の場合に使用される値の区切り文字。デフォルトは「,
」です。 -
"startingOffsets"
: (オプション) Kafka トピック内で、データの読み取りを開始する位置 使用できる値は、"earliest"
または"latest"
です。デフォルト値は"latest"
です。 -
"startingTimestamp"
: (オプション、AWS Glue バージョン 4.0 以降でのみサポート) Kafka トピック内で、データの読み込みを開始するレコードのタイムスタンプ。指定できる値は UTC 形式 (yyyy-mm-ddTHH:MM:SSZ
のパターン) のタイムスタンプ文字列です (Z
は UTC タイムゾーンのオフセットを +/- で表します。例: 「2023-04-04T08:00:00-04:00」)。注: AWS Glue ストリーミングスクリプトの接続オプションリストには、「startingOffsets」または「startingTimestamp」のいずれかしか表示できません。これらのプロパティの両方を含めると、ジョブが失敗します。
-
"endingOffsets"
: (オプション) バッチクエリの終了位置。設定が可能な値は、"latest"
または、各TopicPartition
の終了オフセットを指定する JSON 文字列のいずれかです。JSON 文字列の場合、
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
の形式を使用します。オフセットとして値-1
を設定する場合、"latest"
の意味になります。 -
"pollTimeoutMs"
: (オプション) Spark ジョブエグゼキュータで、Kafka からデータをポーリングする際のタイムアウト値 (ミリ秒単位)。デフォルト値は512
です。 -
"numRetries"
: (オプション) Kafka オフセットのフェッチが失敗したと判断される前の再試行回数。デフォルト値は3
です。 -
"retryIntervalMs"
: (オプション) Kafka オフセットのフェッチを開始するまでの待機時間 (ミリ秒)。デフォルト値は10
です。 -
"maxOffsetsPerTrigger"
: (オプション) 処理されるオフセットの最大数を、トリガー間隔ごとのレート上限で指定する値。指定されたオフセットの合計数は、異なるボリュームのtopicPartitions
間で均等に分割されます。デフォルト値は「null」です。この場合、コンシューマーは既知の最新のオフセットまで、すべてのオフセットを読み取ります。 -
"minPartitions"
: (オプション) Kafka から読み取ることを想定する、最小のパーティション数。デフォルト値は「null」です。これは、Spark パーティションの数が Kafka パーティションの数に等しいことを意味します。 -
"includeHeaders"
: (オプション) Kafka ヘッダーを含めるかどうかを決定します。このオプションが「true」に設定されている場合、データ出力には、「glue_streaming_kafka_headers」という名前でArray[Struct(key: String, value: String)]
型の列が追加されます。デフォルト値は「false」です。このオプションは、AWS Glue バージョン 3.0 以降でのみ使用可能です。 -
"schema"
: (inferSchema に false を設定した場合は必須) ペイロードの処理に使用するスキーマ。分類がavro
である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類がavro
以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。以下に、スキーマの例を示します。
-
"inferSchema"
: (オプション) デフォルト値は「false」です。「true」に設定すると、実行時に、スキーマがforeachbatch
内のペイロードから検出されます。 -
"avroSchema"
: (非推奨) Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema
パラメータを使用します。 -
"addRecordTimestamp"
: (オプション) このオプションを「true」に設定すると、トピックが対応するレコードを受信した時刻を表示する「__src_timestamp」という列が、データ出力に追加で表示されます。デフォルト値は、「false」です。このオプションは AWS Glue のバージョン 4.0 以降でサポートされています。 -
"emitConsumerLagMetrics"
: (オプション) このオプションを「true」に設定すると、バッチごとに、トピックが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との、間隔のメトリクスが出力されます。このメトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。
書き込むとき、"connectionType": "kafka"
に次の接続オプションを使用します。
-
"connectionName"
(必須) Kafka クラスターへの接続に使用される AWS Glue 接続の名前 (Kafka ソースと同様)。 -
"topic"
(必須) トピック列が存在する場合、トピック設定オプションが設定されていない限り、指定された行を Kafka に書き込む際にトピック列の値がトピックとして使用されます。つまり、topic
設定オプションはトピック列をオーバーライドします。 -
"partition"
(オプション) 有効なパーティション番号が指定された場合、レコードの送信時にそのpartition
が使用されます。パーティションが指定されずに
key
が存在する場合、キーのハッシュを使用してパーティションが選択されます。key
とpartition
のどちらも存在しない場合、そのパーティションに少なくとも batch.size バイトが生成された際に変更内容のスティッキーパーティション分割に基づいてパーティションが選択されます。 -
"key"
(オプション)partition
が null の場合、パーティション分割に使用されます。 -
"classification"
(オプション) レコードのデータに使用されるファイル形式。JSON、CSV、Avro のみをサポートしています。Avro 形式を使用すると、シリアル化するためにカスタムの AvroSchema を提供できますが、シリアル化解除する場合にもソースに提供する必要があることに注意してください。それ以外の場合、デフォルトで Apache AvroSchema を使用してシリアル化します。
さらに、Kafka プロデューサーの設定パラメーター
ただし、有効にならないオプションの小さな拒否リストがあります。詳細については、「Apache 固有の設定
Kinesis 接続の操作
Kinesis 接続を使用すると、データカタログ テーブルに保存されている情報を使用して、またはデータ ストリームに直接アクセスするための情報を提供することで、Amazon Kinesis データ ストリームの読み取りと書き込みを行うことができます。Kinesis から Spark DataFrame に情報を読み取り、それを AWS Glue DynamicFrame に変換できます。DynamicFrames は JSON 形式で Kinesis に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。
getCatalogSource
または create_data_frame_from_catalog
を使用して Kinesis ストリーミングソースからレコードを消費する場合、ジョブはデータカタログデータベースとテーブル名の情報を持っており、それを使用して Kinesis ストリーミングソースから読み込むためのいくつかの基本パラメータを取得することができます。getSource
、getSourceWithFormat
、createDataFrameFromOptions
、または create_data_frame_from_options
を使用している場合、ここで説明する接続オプションを使用して、これらの基本パラメータを指定する必要があります。
Kinesis の接続オプションは、GlueContext
クラス内の指定されたメソッドに対して以下の引数で指定することができます。
-
Scala
-
connectionOptions
:getSource
、createDataFrameFromOptions
、getSink
で使用 -
additionalOptions
:getCatalogSource
、getCatalogSink
で使用 -
options
:getSourceWithFormat
、getSinkWithFormat
で使用
-
-
Python
-
connection_options
:create_data_frame_from_options
、write_dynamic_frame_from_options
で使用 -
additional_options
:create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
で使用 -
options
:getSource
、getSink
で使用
-
ストリーミング ETL ジョブに関する注意事項と制限事項については、「ストリーミング ETL に関する注意と制限」を参照してください。
Kinesis の設定
AWS Glue Spark ジョブで Kinesis データストリームに接続するには、いくつかの前提条件が必要です。
読み取る場合、AWS Glue ジョブには、Kinesis データストリームへの読み取りアクセスレベルの IAM 権限が必要です。
書き込みの場合、AWS Glue ジョブには、Kinesis データストリームへの書き込みアクセスレベルの IAM 権限が必要です。
場合によっては、追加の前提条件を設定する必要があります。
-
AWS Glue ジョブが (通常は他のデータセットに接続するための) 追加のネットワーク接続で設定されていて、その接続のいずれかが Amazon VPC ネットワークオプションを提供している場合、ジョブは Amazon VPC 経由で通信するように指示されます。この場合、Amazon VPC を介して通信するように Kinesis データストリームを設定する必要もあります。そのためには、Amazon VPC と Kinesis データストリームの間にインターフェイス VPC エンドポイントを作成します。詳細については、「インターフェイス VPC エンドポイントと Amazon Kinesis Data Streams の使用」を参照してください。
-
別のアカウントで Amazon Kinesis Data Streams を指定する場合は、クロスアカウントアクセスを許可するようにロールとポリシーを設定する必要があります。詳細については、「Example: Read From a Kinesis Stream in a Different Account」を参照してください。
ストリーミング ETL ジョブの前提条件の詳細については、「AWS Glue でのストリーミング ETL ジョブ」を参照してください。
Kinesis からの読み込み
例: Kinesis ストリームからの読み込み
forEachBatch と組み合わせて使用します。
Amazon Kinesis ストリーミングソースの例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Kinesis への書き込み
例: Kinesis ストリームへの書き込み
forEachBatch と組み合わせて使用します。DynamicFrame は JSON 形式でストリームに書き込まれます。何度か再試行してもジョブの書き込みが実行できないと、ジョブは失敗します。デフォルトでは、各 DynamicFrame レコードは Kinesis ストリームに個別に送信されます。この動作は、aggregationEnabled
と関連するパラメータを使用して設定できます。
ストリーミングジョブから Amazon Kinesis への書き込みの例:
Kinesis 接続パラメータ
Amazon Kinesis Data Streams への接続を指定します。
Kinesis ストリーミングデータソースには、次の接続オプションを使用します。
-
"streamARN"
(必須) 読み取り/書き込みに使用されます。Kinesis データストリームの ARN。 -
"classification"
(読み取りに必須) 読み取りで使用。レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。 -
"streamName"
- (オプション) 読み取りで使用。読み取り対象/読み取り元の Kinesis データストリームの名前。endpointUrl
で使用。 -
"endpointUrl"
- (オプション) 読み取りで使用。デフォルト: "https://kinesis.us-east-1.amazonaws.com"。Kinesis エンドポイントの AWS エンドポイント。特別なリージョンに接続する場合を除き、これを変更する必要はありません。 -
"partitionKey"
- (オプション) 書き込みに使用。レコードを作成する際に使用される Kinesis パーティションキー。 -
"delimiter"
(オプション) 読み取りに使用。classification
が CSV の場合に使用される値の区切り文字。デフォルトは「,
」です。 -
"startingPosition"
: (オプション) 読み込みに使用。Kinesis データストリーム内の、データの読み取り開始位置。指定できる値は"latest"
、"trim_horizon"
、"earliest"
、または UTC 形式のタイムスタンプ文字列であり、この文字列のパターンはyyyy-mm-ddTHH:MM:SSZ
です (Z
は UTC タイムゾーンのオフセットを +/- で表します。例:「2023-04-04T08:00:00-04:00」)。デフォルト値は"latest"
です。注意:"startingPosition"
の UTC 形式のタイムスタンプ文字列は AWS Glue バージョン 4.0 以降のみでサポートされます。 -
"failOnDataLoss"
: (オプション) アクティブなシャードがないか、有効期限が切れている場合、ジョブは失敗します。デフォルト値は"false"
です。 -
"awsSTSRoleARN"
: (オプション) 読み取り/書き込みに使用。AWS Security Token Service (AWS STS) を使用して担うロールの Amazon リソースネーム (ARN)。このロールには、Kinesis データストリームのレコードの説明操作または読み取り操作の権限が必要です。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSSessionName"
と組み合わせて使用します。 -
"awsSTSSessionName"
: (オプション) 読み取り/書き込みに使用。AWS STS を使って、ロールを担うセッションの識別子。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSRoleARN"
と組み合わせて使用します。 -
"awsSTSEndpoint"
: (オプション) 引き受けたロールで Kinesis に接続するときに使用する AWS STS エンドポイント。これにより、デフォルトのグローバルエンドポイントでは不可能な VPC 内のリージョナル AWS STS エンドポイントを使用できます。 -
"maxFetchTimeInMs"
: (オプション) 読み込みに使用。ジョブエグゼキューターが Kinesis データストリームから現在のバッチのレコードを読み取るために費やした最大時間は、ミリ秒 (ms) 単位で指定されます。この時間内に複数のGetRecords
API コールを行うことができます。デフォルト値は1000
です。 -
"maxFetchRecordsPerShard"
: (オプション) 読み込みに使用。1 マイクロバッチ当たりに Kinesis データストリームでシャードごとにフェッチするレコードの最大数。メモ: ストリーミングジョブが既に Kinesis (同じ get-records 呼び出しで) から余分なレコードを読み取っている場合、クライアントはこの制限を超えることができます。maxFetchRecordsPerShard
が厳密である必要がある場合、maxRecordPerRead
の倍数にする必要があります。デフォルト値は100000
です。 -
"maxRecordPerRead"
: (オプション) 読み込みに使用。getRecords
オペレーションごとに、Kinesis データストリームからフェッチするレコードの最大数。デフォルト値は10000
です。 -
"addIdleTimeBetweenReads"
: (オプション) 読み込みに使用。2 つの連続するgetRecords
オペレーション間の遅延時間を追加します。デフォルト値は"False"
です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。 -
"idleTimeBetweenReadsInMs"
: (オプション) 読み込みに使用。2 つの連続するgetRecords
オペレーション間での、最短の遅延時間 (ミリ秒単位で指定)。デフォルト値は1000
です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。 -
"describeShardInterval"
: (オプション) 読み込みに使用。スクリプトが呼び出す 2 つのListShards
API 間での、再シャーディングを考慮すべき最小時間。詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「リシャーディングのための戦略」を参照してください。デフォルト値は1s
です。 -
"numRetries"
: (オプション) 読み込みに使用。Kinesis Data Streams API リクエストを再試行する最大の回数。デフォルト値は3
です。 -
"retryIntervalMs"
: (オプション) 読み込みに使用。Kinesis Data Streams API 呼び出しを再試行するまでのクールオフ期間 (ミリ秒単位で指定)。デフォルト値は1000
です。 -
"maxRetryIntervalMs"
: (オプション) 読み込みに使用。再試行で 2 つの Kinesis Data Streams API を呼び出す間の最大クールオフ期間 (ミリ秒単位で指定)。デフォルト値は10000
です。 -
"avoidEmptyBatches"
: (オプション) 読み込みに使用。バッチ処理を開始する前に、Kinesis データストリームで未読のデータをチェックすることで、空のマイクロバッチジョブを作成しないようにします。デフォルト値は"False"
です。 -
"schema"
: (inferSchema に false を設定した場合は必須) 読み取りに使用。ペイロードの処理に使用するスキーマ。分類がavro
である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類がavro
以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。以下に、スキーマの例を示します。
-
"inferSchema"
: (オプション) 読み込みに使用。デフォルト値は、「false」です。「true」に設定すると、実行時に、スキーマがforeachbatch
内のペイロードから検出されます。 -
"avroSchema"
: (非推奨) 読み取りに使用。Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema
パラメータを使用します。 -
"addRecordTimestamp"
: (オプション) 読み込みに使用。このオプションが「true」に設定されている場合、データ出力には、対応するレコードがストリームによって受信された時刻を表示する「__src_timestamp」という名前が付けられた追加の列が含まれます。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。 -
"emitConsumerLagMetrics"
: (オプション) 読み込みに使用。このオプションを「true」に設定すると、バッチごとに、ストリームが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との間隔のメトリクスが出力されます。メトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。 -
"fanoutConsumerARN"
: (オプション) 読み込みに使用。streamARN
で指定されたストリームの Kinesis ストリームコンシューマの ARN。Kinesis 接続の拡張ファンアウトモードを有効にするために使用されます。拡張ファンアウトが使用された Kinesis ストリームの使用に関する詳細については、「Kinesis ストリーミングジョブでの拡張ファンアウトの使用」を参照してください。 -
"recordMaxBufferedTime"
- (オプション) 書き込みに使用。デフォルト: 1000 (ミリ秒)。レコードが書き込まれるのを待っている間にバッファリングされる最大時間。 -
"aggregationEnabled"
- (オプション) 書き込みに使用。デフォルト: true。Kinesis に送信する前にレコードを集約するかどうかを指定します。 -
"aggregationMaxSize"
- (オプション) 書き込みに使用。デフォルト: 51200 (バイト) レコードがこの制限よりも大きい場合、そのレコードはアグリゲータをバイパスします。注: Kinesis では、レコードサイズに 50 KB の制限が適用されます。これを 50 KB を超えて設定すると、サイズ超過のレコードは Kinesis によって拒否されます。 -
"aggregationMaxCount"
- (オプション) 書き込みに使用。デフォルト: 4294967295。集計されたレコードにパックされる項目の最大数。 -
"producerRateLimit"
- (オプション) 書き込みに使用。デフォルト: 150 (%)。1 つのプロデューサー (ジョブなど) からの送信されるシャード単位のスループットをバックエンド制限のパーセンテージとして制限できます。 -
"collectionMaxCount"
- (オプション) 書き込みに使用。デフォルト: 500。PutRecords レコードにパックされる項目の最大数。 -
"collectionMaxSize"
- (オプション) 書き込みに使用。デフォルト: 5242880 (バイト)。PutRecords リクエストで送信するデータの最大量。