翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon DynamoDB OpenSearch での取り込みパイプラインの使用
DynamoDB で Ingestion OpenSearch パイプラインを使用して、DynamoDB テーブルイベント (作成、更新、削除など) を Amazon OpenSearch Service ドメインとコレクションにストリーミングできます。 OpenSearch Ingestion パイプラインには、変更データキャプチャ (CDC) インフラストラクチャが組み込まれており、DynamoDB テーブルからデータを継続的にストリーミングするための、高スケールで低レイテンシーの手段を提供します。
データを処理するソースとして DynamoDB を使用するには、完全な初期スナップショットを使用する方法と使用しない方法の 2 つがあります。
完全な初期スナップショットは、DynamoDB がpoint-in-time リカバリ (PITR) 機能を使用して作成するテーブルのバックアップです。DynamoDB はこのスナップショットを Amazon S3 にアップロードします。そこから、取り込みパイプラインはドメイン内の 1 OpenSearch つのインデックスに送信するか、ドメイン内の複数のインデックスにパーティション化します。DynamoDB 内のデータを OpenSearch 一貫性に保つために、パイプラインは DynamoDB テーブル内のすべての作成、更新、削除イベントを OpenSearch 、インデックスまたはインデックスに保存されたドキュメントと同期します。
完全な初期スナップショットを使用すると、 OpenSearch 取り込みパイプラインは最初にスナップショットを取り込んでから、DynamoDB Streams からのデータの読み取りを開始します。最終的には、DynamoDB と の間のほぼリアルタイムのデータ整合性が追いついて維持されます OpenSearch。このオプションを選択するときは、テーブルで PITRと DynamoDB ストリームの両方を有効にする必要があります。
DynamoDB OpenSearch との Ingestion 統合を使用して、スナップショットなしでイベントをストリーミングすることもできます。他のメカニズムからの完全なスナップショットが既にある場合、または DynamoDB Streams を使用して DynamoDB テーブルから現在のイベントをストリーミングするだけの場合は、このオプションを選択します。このオプションを選択した場合、必要なのは、テーブルで DynamoDB ストリームを有効にすることだけです。
この統合の詳細については、「 Amazon DynamoDB デベロッパーガイド」の「Amazon OpenSearch Service との ETLDynamoDB ゼロ統合」を参照してください。
前提条件
パイプラインを設定するには、DynamoDB Streams が有効になっている DynamoDB テーブルが必要です。ストリームでは NEW_IMAGE
ストリームビュータイプを使用する必要があります。ただし、このストリームビュータイプがユースケースに適合するNEW_AND_OLD_IMAGES
場合、 OpenSearch 取り込みパイプラインは でイベントをストリーミングすることもできます。
スナップショットを使用している場合は、テーブルで point-in-time 復旧を有効にする必要もあります。詳細については、「Amazon DynamoDB デベロッパーガイド」の「テーブルの作成」、 point-in-time 「復旧の有効化」、「ストリームの有効化」を参照してください。
ステップ 1: パイプラインロールを設定する
DynamoDB テーブルを設定したら、パイプライン設定で使用するパイプラインロールを設定し、そのロールに次の DynamoDB 許可を追加します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "allowRunExportJob", "Effect": "Allow", "Action": [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], "Resource": [ "arn:aws:dynamodb:
us-east-1
:{account-id}
:table/my-table
" ] }, { "Sid": "allowCheckExportjob", "Effect": "Allow", "Action": [ "dynamodb:DescribeExport" ], "Resource": [ "arn:aws:dynamodb:us-east-1
:{account-id}
:table/my-table
/export/*" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], "Resource": [ "arn:aws:dynamodb:us-east-1
:{account-id}
:table/my-table
/stream/*" ] }, { "Sid": "allowReadAndWriteToS3ForExport", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "arn:aws:s3:::my-bucket
/{exportPath}
/*" ] } ] }
AWS KMS カスタマーマネージドキーを使用して、エクスポートデータファイルを暗号化することもできます。エクスポートされたオブジェクトを復号するには、パイプラインのエクスポート設定において、次の形式でキー ID として s3_sse_kms_key_id
を指定します: arn:aws:kms:
。次のポリシーには、カスタマーマネージドキーを使用するために必要なアクセス許可が含まれています。us-west-2
:{account-id}
:key/my-key-id
{ "Sid": "allowUseOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ], "Resource":
arn:aws:kms:
}us-west-2
:{account-id}
:key/my-key-id
ステップ 2: パイプラインを作成する
その後、DynamoDB OpenSearch をソースとして指定する Ingestion パイプラインを次のように設定できます。このサンプルパイプラインは、PITRスナップショットtable-a
を使用して からデータを取り、その後に DynamoDB Streams からのイベントを取り込みます。開始位置が LATEST
であることは、パイプラインが DynamoDB Streams から最新のデータを読み取る必要があることを示します。
version: "2" cdc-pipeline: source: dynamodb: tables: - table_arn: "arn:aws:dynamodb:
us-west-2
:{account-id}
:table/table-a
" export: s3_bucket: "my-bucket
" s3_prefix: "export/" stream: start_position: "LATEST" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com
"] index: "${getMetadata(\"table_name
\")}" index_type: custom normalize_index: true document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external"
事前設定された DynamoDB ブループリントを使用して、このパイプラインを作成できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。
データ整合性
OpenSearch 取り込みでは、データの耐久性を確保するため end-to-end の承認がサポートされています。パイプラインがスナップショットまたはストリームを読み取る際に、並列処理用のパーティションが動的に作成されます。パイプラインは、 OpenSearch ドメインまたはコレクション内のすべてのレコードを取り込んだ後に確認を受け取ると、パーティションを完了としてマークします。
OpenSearch Serverless 検索コレクションに取り込む場合は、パイプラインでドキュメント ID を生成できます。 OpenSearch Serverless 時系列コレクションに取り込む場合は、パイプラインがドキュメント ID を生成しないことに注意してください。
Ingestion OpenSearch パイプラインは、受信イベントアクションを対応する一括インデックス作成アクションにマッピングして、ドキュメントの取り込みにも役立ちます。これによりデータの整合性が保たれるため、DynamoDB のすべてのデータ変更が、 の対応するドキュメントの変更と照合されます OpenSearch。
データ型のマッピング
OpenSearch サービスは、各受信ドキュメントのデータ型を DynamoDB の対応するデータ型に動的にマッピングします。次の表は、 OpenSearch サービスがさまざまなデータ型を自動的にマッピングする方法を示しています。
データ型 | OpenSearch | DynamoDB |
---|---|---|
数 |
OpenSearch は数値データを自動的にマッピングします。数値が整数の場合、 はそれを長い値として OpenSearch マッピングします。数値が小数の場合、 はそれを浮動小数点値として OpenSearch マッピングします。 OpenSearch は、最初に送信されたドキュメントに基づいて、さまざまな属性を動的にマッピングします。DynamoDB の同じ属性についてデータ型が混在している場合 (整数と小数の両方など)、マッピングは失敗する可能性があります。 例えば、最初のドキュメントに整数の属性があり、後のドキュメントに小数と同じ属性がある場合、 は 2 番目のドキュメントの取り込みに OpenSearch 失敗します。このような場合は、次のような明示的なマッピングテンプレートを提供する必要があります:
倍精度が必要な場合は、文字列型のフィールドマッピングを使用します。では、38 桁の精度をサポートする同等の数値型はありません OpenSearch。 |
DynamoDB は数値をサポートしています。 |
数値セット | OpenSearch は、数値セットを長い値または浮動小数点値の配列に自動的にマッピングします。スカラー数値の場合と同様、これは、取り込まれた最初の数値が整数または小数のいずれであるかによって異なります。スカラー文字列をマッピングするのと同じ方法で、数値セットのマッピングを提供できます。 |
DynamoDB は、数値のセットを表す型をサポートしています。 |
文字列 |
OpenSearch は文字列値をテキストとして自動的にマッピングします。状況によっては (列挙型の値など)、キーワード型にマッピングできます。 次の例は、 という名前の DynamoDB 属性を OpenSearch キーワード
|
DynamoDB は文字列をサポートします。 |
文字列セット |
OpenSearch は、文字列セットを文字列の配列に自動的にマッピングします。スカラー文字列をマッピングするのと同じ方法で、文字列セットのマッピングを提供できます。 |
DynamoDB は、文字列のセットを表す型をサポートしています。 |
バイナリ |
OpenSearch はバイナリデータをテキストとして自動的にマッピングします。マッピングを指定して、これらを のバイナリフィールドとして記述できます OpenSearch。 次の例は、 という名前の DynamoDB 属性
|
DynamoDB はバイナリ型の属性をサポートしています。 |
バイナリセット |
OpenSearch は、バイナリセットをテキストとしてバイナリデータの配列に自動的にマッピングします。スカラーバイナリをマッピングするのと同じ方法で、数値セットのマッピングを提供できます。 |
DynamoDB は、バイナリ値のセットを表す型をサポートしています。 |
ブール値 |
OpenSearch は、DynamoDB ブール型を OpenSearch ブール型にマッピングします。 |
DynamoDB は、ブール型属性をサポートします。 |
Null |
OpenSearch は、DynamoDB null タイプのドキュメントを取り込むことができます。値は null 値としてドキュメントに保存されます。この型にはマッピングがなく、このフィールドにはインデックスが付けられず、検索もできません。 NULL 型に同じ属性名が使用され、後で文字列などの異なる型に変更された場合、 は最初の NULL 以外の値の動的マッピング OpenSearch を作成します。後続の値は引き続き DynamoDB の null 値にすることができます。 |
DynamoDB は、null 型属性をサポートします。 |
マッピング |
OpenSearch は、DynamoDB マップ属性をネストされたフィールドにマッピングします。同じマッピングがネストされたフィールド内でも適用されます。 次の例では、ネストされたフィールドの文字列を のキーワードタイプにマッピングします OpenSearch。
|
DynamoDB は、マッピング型属性をサポートします。 |
リスト |
OpenSearch は、リストの内容に応じて、DynamoDB リストに異なる結果を提供します。 リストに同じタイプのスカラー型がすべて含まれている場合 (すべての文字列のリストなど)、 はそのタイプの配列としてリストを取り OpenSearch 込みます。これは、文字列、数値、ブール型、および null 型について機能します。これらの各型についての制限は、その型のスカラーについての制限と同じです。 また、マップに使用するのと同じマッピングを使用して、マップのリストのマッピングを提供することもできます。 混合型のリストを提供することはできません。 |
DynamoDB は、リスト型属性をサポートします。 |
設定 |
OpenSearch は、セットの内容に応じて、DynamoDB セットに異なる結果を提供します。 セットに同じタイプのスカラー型がすべて含まれている場合 (例えば、すべての文字列のセット)、 はそのセットをその型の配列として OpenSearch 取り込みます。これは、文字列、数値、ブール型、および null 型について機能します。これらの各型についての制限は、その型のスカラーについての制限と同じです。 また、マップに使用するのと同じマッピングを使用して、マップのセットのマッピングを提供することもできます。 混合型のセットを提供することはできません。 |
DynamoDB は、セットを表す型をサポートします。 |
Ingestion パイプラインでデッドレターキュー (DLQ) OpenSearch を設定することをお勧めします。キューを設定した場合、 OpenSearch 動的マッピングの失敗により取り込むことができないすべての失敗したドキュメントがサービスからキューに送信されます。
自動マッピングが失敗した場合は、パイプライン設定で template_type
と template_content
を使用して明示的なマッピングルールを定義できます。あるいは、パイプラインを開始する前に、検索ドメインまたはコレクションにマッピングテンプレートを直接作成することもできます。
制限事項
DynamoDB OpenSearch の取り込みパイプラインを設定するときは、次の制限事項を考慮してください。
-
DynamoDB との OpenSearch Ingestion 統合は現在、クロスリージョン取り込みをサポートしていません。DynamoDB テーブルと OpenSearch 取り込みパイプラインは同じ にある必要があります AWS リージョン。
-
DynamoDB テーブルと Ingestion OpenSearch パイプラインは同じ にある必要があります AWS アカウント。
-
取り込みパイプラインは、ソースとして OpenSearch 1 つの DynamoDB テーブルのみをサポートします。
-
DynamoDB Streams は、最大 24 時間ログにデータを格納します。大きなテーブルの最初のスナップショットからの取り込みに 24 時間以上かかる場合、初期データの一部が失われます。このデータ損失を軽減するには、テーブルのサイズを見積もり、適切なコンピューティング単位の OpenSearch 取り込みパイプラインを設定します。
DynamoDB の推奨 CloudWatch アラーム
取り込みパイプラインのパフォーマンスをモニタリングするには、次の CloudWatch メトリクスをお勧めします。これらのメトリクスは、エクスポートから処理されたデータの量、ストリームから処理されたイベントの量、エクスポートとストリームイベントの処理エラー、および宛先に書き込まれたドキュメントの数を特定するのに役立ちます。これらのメトリクスの 1 つが指定された時間、指定された値を超えたときにアクションを実行するように CloudWatch アラームを設定できます。
メトリクス | 説明 |
---|---|
dynamodb-pipeline.BlockingBuffer.bufferUsage.value |
使用されているバッファの量を示します。 |
dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value
|
エクスポートのために Amazon S3 オブジェクトをアクティブに処理OCUsしている の合計数を示します。 Amazon S3 |
dynamodb-pipeline.dynamodb.bytesProcessed.count
|
DynamoDB ソースから処理されたバイト数。 |
dynamodb-pipeline.dynamodb.changeEventsProcessed.count
|
DynamoDB ストリームから処理された変更イベントの数。 |
dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count
|
DynamoDB から処理された変更イベントからのエラーの数。 |
dynamodb-pipeline.dynamodb.exportJobFailure.count
|
失敗したエクスポートジョブの送信試行回数。 |
dynamodb-pipeline.dynamodb.exportJobSuccess.count
|
正常に送信されたエクスポートジョブの数。 |
dynamodb-pipeline.dynamodb.exportRecordsProcessed.count
|
エクスポートから処理されたレコードの合計数。 |
dynamodb-pipeline.dynamodb.exportRecordsTotal.count
|
DynamoDB からエクスポートされたレコードの総数。データエクスポートボリュームの追跡に不可欠です。 |
dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count
|
Amazon S3 から正常に処理されたエクスポートデータファイルの総数。 |
dynamodb-pipeline.opensearch.bulkBadRequestErrors.count
|
不正な形式のリクエストによる一括リクエスト中のエラーの数。 |
dynamodb-pipeline.opensearch.bulkRequestLatency.avg
|
に対して行われた一括書き込みリクエストの平均レイテンシー OpenSearch。 |
dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count
|
ターゲットデータが見つからなかったために失敗した一括リクエストの数。 |
dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count
|
クラスターを書き込む OpenSearch OpenSearchための取り込みパイプライン別の再試行回数。 |
dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum
|
に対して行われたすべての一括リクエストのバイト単位の合計サイズ OpenSearch。 |
dynamodb-pipeline.opensearch.documentErrors.count
|
にドキュメントを送信する際のエラーの数 OpenSearch。エラーの原因となったドキュメントは に送信されますDLQ。 |
dynamodb-pipeline.opensearch.documentsSuccess.count
|
OpenSearch クラスターまたはコレクションに正常に書き込まれたドキュメントの数。 |
dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count
|
最初の試行 OpenSearch で正常にインデックス作成されたドキュメントの数。 |
|
処理中のドキュメントのバージョン競合によるエラーの数。 |
|
ソースから書き込み先への読み取りにより、データを処理するための OpenSearch Ingestion パイプラインの平均レイテンシー。 |
dynamodb-pipeline.opensearch.PipelineLatency.max
|
ソースから読み取り、宛先を書き込むことでデータを処理するための OpenSearch 取り込みパイプラインの最大レイテンシー。 |
dynamodb-pipeline.opensearch.recordsIn.count
|
に正常に取り込まれたレコードの数 OpenSearch。このメトリクスは、処理および保存されるデータの量を追跡するために不可欠です。 |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count
|
への書き込みに失敗したレコードの数DLQ。 |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count
|
に書き込まれるレコードの数DLQ。 |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count
|
Amazon S3 デッドレターキューへのリクエストのレイテンシー測定値の数。 |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum
|
Amazon S3 デッドレターキューへのすべてのリクエストの合計レイテンシー |
dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum
|
Amazon S3 デッドレターキューに対して行われたすべてのリクエストの合計サイズ。 |
dynamodb-pipeline.recordsProcessed.count
|
パイプラインで処理されたレコードの総数。オーバーラルスループットのキーメトリクスです。 |
dynamodb.changeEventsProcessed.count
|
DynamoDB ストリームからレコードは収集されていません。これは、テーブルにアクティビティがないこと、エクスポートが進行中であること、または DynamoDB ストリームへのアクセスに問題があることが原因である可能性があります。 |
|
S3 へのエクスポートをトリガーできませんでした。 |
|
入力が無効 OpenSearch であるための の一括リクエストエラーの数。データ品質と運用上の問題をモニタリングするために重要です。 |
opensearch.EndToEndLatency.avg
|
エンドツーエンドのレイテンシーは、DynamoDB ストリームから読み込む場合の必要以上に高くなります。これは、 OpenSearch クラスターのスケーリングが不足しているか、最大パイプラインOCU容量が DynamoDB テーブルのWCUスループットに対して低すぎることが原因である可能性があります。このエンドツーエンドのレイテンシーはエクスポート後に高くなり、最新の DynamoDB ストリームに追いつくにつれて時間の経過とともに減少します。 |