翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon Kinesis
の使用 Amazon Managed Service for Apache Flink
Managed Service for Apache Flink のサンプルデータコネクタを使用して、Kinesis Data Streams から Timestream for LiveAnalytics に Timestream データを送信できます。詳細については、「 for Apache FlinkAmazon Managed Service for Apache Flink」を参照してください。
EventBridge Pipes を使用して Kinesis データを に送信する Timestream
EventBridge Pipes を使用して、Kinesis ストリームから Amazon Timestream for LiveAnalytics テーブルにデータを送信できます。
パイプは、サポートされているソースとターゲット間のポイントツーポイント統合を目的としており、高度な変換とエンリッチメントをサポートしています。パイプを使用すると、イベント駆動型アーキテクチャを開発する際の専門知識や統合コードの必要性が軽減されます。パイプをセットアップするには、ソースを選択し、オプションのフィルタリングを追加し、オプションのエンリッチメントを定義し、イベントデータのターゲットを選択します。
この統合により、データインジェストパイプラインを簡素化しながら、 Timestream時系列データ分析機能の能力を活用できます。
で EventBridge パイプを使用すると、次の利点 Timestream があります。
リアルタイムのデータ取り込み: Kinesis から Timestream for LiveAnalytics にデータを直接ストリーミングし、リアルタイムの分析とモニタリングを可能にします。
シームレスな統合: EventBridge Pipes を使用して、複雑なカスタム統合を必要とせずにデータフローを管理します。
フィルタリングと変換の強化: 特定のデータ処理要件を満たす Timestream ために、 に保存される前に Kinesis レコードをフィルタリングまたは変換します。
スケーラビリティ: 高スループットのデータストリームを処理し、組み込みの並列処理とバッチ処理機能を使用して効率的なデータ処理を実現します。
設定
Kinesis から にデータをストリーミングするように EventBridge パイプを設定するには Timestream、次の手順に従います。
Kinesis Stream を作成する
データを取り込むアクティブな Kinesis データストリームがあることを確認します。
Timestream データベースとテーブルを作成する
データを保存する Timestream データベースとテーブルを設定します。
EventBridge パイプを設定します。
ソース: ソースとして Kinesis ストリームを選択します。
ターゲット: ターゲット Timestream として を選択します。
バッチ処理設定: バッチ処理ウィンドウとバッチサイズを定義してデータ処理を最適化し、レイテンシーを短縮します。
重要
パイプを設定するときは、いくつかのレコードを取り込んで、すべての設定の正確性をテストすることをお勧めします。パイプが正常に作成されても、パイプラインが正しいことは保証されず、データはエラーなしで流れることに注意してください。マッピングの適用後に、誤ったテーブル、誤った動的パスパラメータ、無効な Timestream レコードなどのランタイムエラーが発生し、実際のデータがパイプを通過するときに検出される可能性があります。
以下の設定により、データを取り込む速度が決まります。
BatchSize: LiveAnalytics の Timestream に送信されるバッチの最大サイズ。範囲: 0~100。最大スループットを得るには、この値を 100 のままにしておくことをお勧めします。
MaximumBatchingWindowInSeconds: バッチが LiveAnalytics ターゲットの Timestream に送信されるまでに batchSize がいっぱいになるまでの最大待機時間。受信イベントのレートに応じて、この設定によって取り込みの遅延が決まります。データをほぼリアルタイムで に送信し続けるために、この値を < 10 秒 Timestream に維持することをお勧めします。
ParallelizationFactor: 各シャードから同時に処理するバッチの数。最大スループットとほぼリアルタイムの取り込みを得るには、最大値の 10 を使用することをお勧めします。
ストリームが複数のターゲットによって読み取られる場合は、拡張ファンアウトを使用してパイプに専用のコンシューマーを提供し、高スループットを実現します。詳細については、「 ユーザーガイド」の Kinesis Data Streams 「 API を使用した拡張ファンアウトコンシューマーの開発」を参照してください。 Kinesis Data Streams
注記
達成できる最大スループットは、アカウントあたりの同時パイプ実行数によって制限されます。
次の設定により、データ損失を確実に防止できます。
DeadLetterConfig: ユーザーエラーが原因でイベントを Timestream for LiveAnalytics に取り込めなかった場合にデータが失われないように、常に DeadLetterConfig を設定することをお勧めします。
以下の設定でパイプのパフォーマンスを最適化し、レコードが速度低下やブロックを引き起こすのを防ぐことができます。
MaximumRecordAgeInSeconds: これより古いレコードは処理されず、DLQ に直接移動されます。この値は、ターゲット Timestream テーブルの設定済みメモリストアの保持期間を超えないように設定することをお勧めします。
MaximumRetryAttempts: レコードが DeadLetterQueue に送信されるまでのレコードの再試行回数。これを 10 に設定することをお勧めします。これにより、一時的な問題に対処し、永続的な問題については、レコードが DeadLetterQueue に移動され、ストリームの残りのブロックが解除されます。
OnPartialBatchItemFailure: 部分的なバッチ処理をサポートするソースの場合は、これを有効にして、DLQ へのドロップ/送信前に失敗したレコードをさらに再試行するように AUTOMATIC_BISECT として設定することをお勧めします。
設定例
Kinesis ストリームから Timestream テーブルにデータをストリーミングするように EventBridge パイプを設定する方法の例を次に示します。
例 IAM の ポリシーの更新 Timestream
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "timestream:WriteRecords" ], "Resource": [ "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table" ] }, { "Effect": "Allow", "Action": [ "timestream:DescribeEndpoints" ], "Resource": "*" } ] }
例 Kinesis ストリーム設定
{ "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream", "SourceParameters": { "KinesisStreamParameters": { "BatchSize": 100, "DeadLetterConfig": { "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" }, "MaximumBatchingWindowInSeconds": 5, "MaximumRecordAgeInSeconds": 1800, "MaximumRetryAttempts": 10, "StartingPosition": "LATEST", "OnPartialBatchItemFailure": "AUTOMATIC_BISECT" } } }
例 Timestream ターゲット設定
{ "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table", "TargetParameters": { "TimestreamParameters": { "DimensionMappings": [ { "DimensionName": "sensor_id", "DimensionValue": "$.data.device_id", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_type", "DimensionValue": "$.data.sensor_type", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_location", "DimensionValue": "$.data.sensor_loc", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": [ { "MultiMeasureName": "readings", "MultiMeasureAttributeMappings": [ { "MultiMeasureAttributeName": "temperature", "MeasureValue": "$.data.temperature", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "humidity", "MeasureValue": "$.data.humidity", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "pressure", "MeasureValue": "$.data.pressure", "MeasureValueType": "DOUBLE" } ] } ], "SingleMeasureMappings": [], "TimeFieldType": "TIMESTAMP_FORMAT", "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS", "TimeValue": "$.data.time", "VersionValue": "$.approximateArrivalTimestamp" } } }
イベント変換
EventBridge パイプを使用すると、到達する前にデータを変換できます Timestream。変換ルールを定義して、フィールド名の変更など、受信 Kinesis レコードを変更できます。
Kinesis ストリームに温度と湿度のデータが含まれているとします。 EventBridge 変換を使用して、挿入する前にこれらのフィールドの名前を変更できます Timestream。
ベストプラクティス
バッチ処理とバッファリング
書き込みレイテンシーと処理効率のバランスをとるようにバッチ処理ウィンドウとサイズを設定します。
バッチ処理ウィンドウを使用して処理前に十分なデータを蓄積し、頻繁な小さなバッチのオーバーヘッドを削減します。
並列処理
ParallelizationFactor 設定を使用して、特に高スループットストリームの同時実行数を増やします。これにより、各シャードの複数のバッチを同時に処理できます。
データ変換
EventBridge Pipes の変換機能を活用して、レコードを保存する前にレコードをフィルタリングして強化します Timestream。これは、データを分析要件に合わせるのに役立ちます。
セキュリティ
EventBridge Pipes に使用される IAM ロールに、読み Kinesis 書きに必要なアクセス許可があることを確認します Timestream。
暗号化とアクセスコントロールの対策を使用して、転送中および保管中のデータを保護します。
デバッグの失敗
-
パイプの自動無効化
ターゲットが存在しないか、アクセス許可の問題がある場合、パイプは約 2 時間で自動的に無効になります。
-
Throttles
パイプには、スロットルが減少するまで自動的にバックオフして再試行する機能があります。
-
ログの有効化
ERROR レベルでログを有効にし、実行データを含めて、失敗した に関するインサイトをさらに取得することをお勧めします。障害が発生した場合、これらのログには、送受信されたリクエスト/レスポンスが含まれます Timestream。これにより、関連するエラーを理解し、必要に応じて修正後にレコードを再処理できます。
モニタリング
データフローの問題を検出するために、以下のアラームを設定することをお勧めします。
ソースのレコードの最大期間
GetRecords.IteratorAgeMilliseconds
Pipes の障害メトリクス
ExecutionFailed
TargetStageFailed
Timestream API エラーの書き込み
UserErrors
その他のモニタリングメトリクスについては、「 EventBridge ユーザーガイド」の「モニタリング EventBridge」を参照してください。