Kinesis 接続 - AWS Glue

Kinesis 接続

Kinesis 接続を使用すると、データカタログ テーブルに保存されている情報を使用して、またはデータ ストリームに直接アクセスするための情報を提供することで、Amazon Kinesis データ ストリームの読み取りと書き込みを行うことができます。Kinesis から Spark DataFrame に情報を読み取り、それを AWS Glue DynamicFrame に変換できます。DynamicFrames は JSON 形式で Kinesis に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。

getCatalogSource または create_data_frame_from_catalog を使用して Kinesis ストリーミングソースからレコードを消費する場合、ジョブはデータカタログデータベースとテーブル名の情報を持っており、それを使用して Kinesis ストリーミングソースから読み込むためのいくつかの基本パラメータを取得することができます。getSourcegetSourceWithFormatcreateDataFrameFromOptions、または create_data_frame_from_options を使用している場合、ここで説明する接続オプションを使用して、これらの基本パラメータを指定する必要があります。

Kinesis の接続オプションは、GlueContext クラス内の指定されたメソッドに対して以下の引数で指定することができます。

  • Scala

    • connectionOptions: getSourcecreateDataFrameFromOptionsgetSink で使用

    • additionalOptions: getCatalogSourcegetCatalogSink で使用

    • options: getSourceWithFormatgetSinkWithFormat で使用

  • Python

    • connection_options: create_data_frame_from_optionswrite_dynamic_frame_from_options で使用

    • additional_options: create_data_frame_from_catalogwrite_dynamic_frame_from_catalog で使用

    • options: getSourcegetSink で使用

ストリーミング ETL ジョブに関する注意事項と制限事項については、「ストリーミング ETL に関する注意と制限」を参照してください。

Kinesis の設定

AWS Glue Spark ジョブで Kinesis データストリームに接続するには、いくつかの前提条件が必要です。

  • 読み取る場合、AWS Glue ジョブには、Kinesis データストリームへの読み取りアクセスレベルの IAM 権限が必要です。

  • 書き込みの場合、AWS Glue ジョブには、Kinesis データストリームへの書き込みアクセスレベルの IAM 権限が必要です。

場合によっては、追加の前提条件を設定する必要があります。

  • AWS Glue ジョブが (通常は他のデータセットに接続するための) 追加のネットワーク接続で設定されていて、その接続のいずれかが Amazon VPC ネットワークオプションを提供している場合、ジョブは Amazon VPC 経由で通信するように指示されます。この場合、Amazon VPC を介して通信するように Kinesis データストリームを設定する必要もあります。そのためには、Amazon VPC と Kinesis データストリームの間にインターフェイス VPC エンドポイントを作成します。詳細については、「インターフェイス VPC エンドポイントと Amazon Kinesis Data Streams の使用」を参照してください。

  • 別のアカウントで Amazon Kinesis Data Streams を指定する場合は、クロスアカウントアクセスを許可するようにロールとポリシーを設定する必要があります。詳細については、「Example: Read From a Kinesis Stream in a Different Account」を参照してください。

ストリーミング ETL ジョブの前提条件の詳細については、「AWS Glue でのストリーミング ETL ジョブ」を参照してください。

例: Kinesis ストリームからの読み込み

例: Kinesis ストリームからの読み込み

forEachBatch と組み合わせて使用します。

Amazon Kinesis ストリーミングソースの例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

例: Kinesis ストリームへの書き込み

例: Kinesis ストリームからの読み込み

forEachBatch と組み合わせて使用します。

Amazon Kinesis ストリーミングソースの例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kinesis 接続オプションのリファレンス

Amazon Kinesis Data Streams への接続を指定します。

Kinesis ストリーミングデータソースには、次の接続オプションを使用します。

  • "streamARN" (必須) 読み取り/書き込みに使用されます。Kinesis データストリームの ARN。

  • "classification" (読み取りに必須) 読み取りで使用。レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。

  • "streamName" - (オプション) 読み取りで使用。読み取り対象/読み取り元の Kinesis データストリームの名前。endpointUrl で使用。

  • "endpointUrl" - (オプション) 読み取りで使用。デフォルト: "https://kinesis.us-east-1.amazonaws.com"。Kinesis エンドポイントの AWS エンドポイント。特別なリージョンに接続する場合を除き、これを変更する必要はありません。

  • "partitionKey" - (オプション) 書き込みに使用。レコードを作成する際に使用される Kinesis パーティションキー。

  • "delimiter" (オプション) 読み取りに使用。classification が CSV の場合に使用される値の区切り文字。デフォルトは「,」です。

  • "startingPosition": (オプション) 読み込みに使用。Kinesis データストリーム内の、データの読み取り開始位置。指定できる値は "latest""trim_horizon""earliest"、または UTC 形式のタイムスタンプ文字列であり、この文字列のパターンは yyyy-mm-ddTHH:MM:SSZ です (Z は UTC タイムゾーンのオフセットを +/- で表します。例:「2023-04-04T08:00:00-04:00」)。デフォルト値は "latest" です。注意: "startingPosition" の UTC 形式のタイムスタンプ文字列は AWS Glue バージョン 4.0 以降のみでサポートされます。

  • "failOnDataLoss": (オプション) アクティブなシャードがないか、有効期限が切れている場合、ジョブは失敗します。デフォルト値は "false" です。

  • "awsSTSRoleARN": (オプション) 読み取り/書き込みに使用。AWS Security Token Service (AWS STS) を使用して担うロールの Amazon リソースネーム (ARN)。このロールには、Kinesis データストリームのレコードの説明操作または読み取り操作の権限が必要です。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSSessionName" と組み合わせて使用します。

  • "awsSTSSessionName": (オプション) 読み取り/書き込みに使用。AWS STS を使って、ロールを担うセッションの識別子。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSRoleARN" と組み合わせて使用します。

  • "awsSTSEndpoint": (オプション) 引き受けたロールで Kinesis に接続するときに使用する AWS STS エンドポイント。これにより、デフォルトのグローバルエンドポイントでは不可能な VPC 内のリージョナル AWS STS エンドポイントを使用できます。

  • "maxFetchTimeInMs": (オプション) 読み込みに使用。ジョブエグゼキューターが Kinesis データストリームから現在のバッチのレコードを読み取るために費やした最大時間は、ミリ秒 (ms) 単位で指定されます。この時間内に複数の GetRecords API コールを行うことができます。デフォルト値は 1000 です。

  • "maxFetchRecordsPerShard": (オプション) 読み込みに使用。1 マイクロバッチ当たりに Kinesis データストリームでシャードごとにフェッチするレコードの最大数。メモ: ストリーミングジョブが既に Kinesis (同じ get-records 呼び出しで) から余分なレコードを読み取っている場合、クライアントはこの制限を超えることができます。maxFetchRecordsPerShard が厳密である必要がある場合、maxRecordPerRead の倍数にする必要があります。デフォルト値は 100000 です。

  • "maxRecordPerRead": (オプション) 読み込みに使用。getRecords オペレーションごとに、Kinesis データストリームからフェッチするレコードの最大数。デフォルト値は 10000 です。

  • "addIdleTimeBetweenReads": (オプション) 読み込みに使用。2 つの連続する getRecords オペレーション間の遅延時間を追加します。デフォルト値は "False" です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "idleTimeBetweenReadsInMs": (オプション) 読み込みに使用。2 つの連続する getRecords オペレーション間での、最短の遅延時間 (ミリ秒単位で指定)。デフォルト値は 1000 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "describeShardInterval": (オプション) 読み込みに使用。スクリプトが呼び出す 2 つの ListShards API 間での、再シャーディングを考慮すべき最小時間。詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「リシャーディングのための戦略」を参照してください。デフォルト値は 1s です。

  • "numRetries": (オプション) 読み込みに使用。Kinesis Data Streams API リクエストを再試行する最大の回数。デフォルト値は 3 です。

  • "retryIntervalMs": (オプション) 読み込みに使用。Kinesis Data Streams API 呼び出しを再試行するまでのクールオフ期間 (ミリ秒単位で指定)。デフォルト値は 1000 です。

  • "maxRetryIntervalMs": (オプション) 読み込みに使用。再試行で 2 つの Kinesis Data Streams API を呼び出す間の最大クールオフ期間 (ミリ秒単位で指定)。デフォルト値は 10000 です。

  • "avoidEmptyBatches": (オプション) 読み込みに使用。バッチ処理を開始する前に、Kinesis データストリームで未読のデータをチェックすることで、空のマイクロバッチジョブを作成しないようにします。デフォルト値は "False" です。

  • "schema": (inferSchema に false を設定した場合は必須) 読み取りに使用。ペイロードの処理に使用するスキーマ。分類が avro である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が avro 以外の場合、提供されるスキーマは DDL スキーマ形式である必要があります。

    以下に、スキーマの例を示します。

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (オプション) 読み込みに使用。デフォルト値は、「false」です。「true」に設定すると、実行時に、スキーマが foreachbatch 内のペイロードから検出されます。

  • "avroSchema": (非推奨) 読み取りに使用。Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema パラメータを使用します。

  • "addRecordTimestamp": (オプション) 読み込みに使用。このオプションが「true」に設定されている場合、データ出力には、対応するレコードがストリームによって受信された時刻を表示する「__src_timestamp」という名前が付けられた追加の列が含まれます。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。

  • "emitConsumerLagMetrics": (オプション) 読み込みに使用。このオプションを「true」に設定すると、バッチごとに、ストリームが受信した最も古いレコードと、それが AWS Glue で CloudWatch に到着した時間との間隔のメトリクスが出力されます。メトリクスの名前は「glue.driver.streaming.maxConsumerLagInMs」です。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。

  • "fanoutConsumerARN": (オプション) 読み込みに使用。streamARN で指定されたストリームの Kinesis ストリームコンシューマの ARN。Kinesis 接続の拡張ファンアウトモードを有効にするために使用されます。拡張ファンアウトが使用された Kinesis ストリームの使用に関する詳細については、「Kinesis ストリーミングジョブでの拡張ファンアウトの使用」を参照してください。

  • "recordMaxBufferedTime" - (オプション) 書き込みに使用。デフォルト: 1000 (ミリ秒)。レコードが書き込まれるのを待っている間にバッファリングされる最大時間。

  • "aggregationEnabled" - (オプション) 書き込みに使用。デフォルト: true。Kinesis に送信する前にレコードを集約するかどうかを指定します。

  • "aggregationMaxSize" - (オプション) 書き込みに使用。デフォルト: 51200 (バイト) レコードがこの制限よりも大きい場合、そのレコードはアグリゲータをバイパスします。注: Kinesis では、レコードサイズに 50 KB の制限が適用されます。これを 50 KB を超えて設定すると、サイズ超過のレコードは Kinesis によって拒否されます。

  • "aggregationMaxCount" - (オプション) 書き込みに使用。デフォルト: 4294967295。集計されたレコードにパックされる項目の最大数。

  • "producerRateLimit" - (オプション) 書き込みに使用。デフォルト: 150 (%)。1 つのプロデューサー (ジョブなど) からの送信されるシャード単位のスループットをバックエンド制限のパーセンテージとして制限できます。

  • "collectionMaxCount" - (オプション) 書き込みに使用。デフォルト: 500。PutRecords レコードにパックされる項目の最大数。

  • "collectionMaxSize" - (オプション) 書き込みに使用。デフォルト: 5242880 (バイト)。PutRecords リクエストで送信するデータの最大量。