將 OpenSearch 擷取管道與 Amazon DynamoDB 使用 - Amazon OpenSearch 服務

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將 OpenSearch 擷取管道與 Amazon DynamoDB 使用

您可以將 OpenSearch 擷取管道與 DynamoDB 搭配使用,將 DynamoDB 表格事件 (例如建立、更新和刪除) 串流至 Amazon OpenSearch 服務網域和集合。 OpenSearch 擷取管道整合了變更資料擷取 (CDC) 基礎架構,以提供高規模、低延遲的方式來持續從 DynamoDB 表格串流資料。

您可以透過兩種方式使用 DynamoDB 做為處理資料的來源,無論是否有完整的初始快照。

完整初始快照是 DynamoDB 使用point-in-time 復原 (PITR) 功能所採用的資料表備份。DynamoDB 此快照上傳到 Amazon S3。從那裡, OpenSearch 擷取管線會將其傳送到網域中的一個索引,或將其分割為網域中的多個索引。為了保持 DynamoDB 中的資料 OpenSearch 一致性,管線會將 DynamoDB 表中的所有建立、更新和刪除事件與儲存在索引中的文件同步。 OpenSearch

使用完整初始快照時,擷取管道會先 OpenSearch 擷取快照,然後開始從 DynamoDB Streams 讀取資料。它最終可以追趕並維持 DynamoDB 和. 之間近乎即時的資料一致性。 OpenSearch選擇此選項時,您必須在表格上同時啟用PITR和 DynamoDB 串流。

您也可以使用與 DynamoDB 的 OpenSearch 擷取整合,在沒有快照的情況下串流事件。如果您已經擁有其他機制的完整快照,或者您只想從 DynamoDB 資料表中 DynamoDB Streams 目前的事件,請選擇此選項。選擇此選項時,您只需要在表格上啟用 DynamoDB 串流即可。

如需有關此整合的詳細資訊,請參閱Amazon DynamoDB 開發人員指南中的 DynamoDB 與 Amazon OpenSearch 服務零ETL整合

必要條件

若要設定管線,您必須啟用 DynamoDB 串流的 DynamoDB 表格。您的串流應該使用NEW_IMAGE串流檢視類型。不過,NEW_AND_OLD_IMAGES如果此串流檢視類型符合您的使用案例, OpenSearch 擷取管線也可以串流事件。

如果您使用的是快照,您也必須在資料表上啟用 point-in-time 復原功能。如需詳細資訊,請參閱 Amazon DynamoDB 開發人員指南中的建立表格啟用 point-in-time 復原和啟用串流。

步驟 1:設定管線角色

設定 DynamoDB 表之後,請設定要在管線組態中使用的管線角色,並在角色中新增下列 DynamoDB 權限:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "allowRunExportJob", "Effect": "Allow", "Action": [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], "Resource": [ "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table" ] }, { "Sid": "allowCheckExportjob", "Effect": "Allow", "Action": [ "dynamodb:DescribeExport" ], "Resource": [ "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/export/*" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], "Resource": [ "arn:aws:dynamodb:us-east-1:{account-id}:table/my-table/stream/*" ] }, { "Sid": "allowReadAndWriteToS3ForExport", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "arn:aws:s3:::my-bucket/{exportPath}/*" ] } ] }

您也可以使用 AWS KMS 客戶管理的金鑰來加密匯出資料檔案。若要解密匯出的物件,s3_sse_kms_key_id請以下列格式在管線的匯出組態中指定金鑰 ID:arn:aws:kms:us-west-2:{account-id}:key/my-key-id下列原則包含使用客戶管理金鑰的必要權限:

{ "Sid": "allowUseOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ], "Resource": arn:aws:kms:us-west-2:{account-id}:key/my-key-id }

步驟 2:建立管道

然後,您可以如下所示設定 OpenSearch 擷取管道,該管道會將 DynamoDB 指定為來源。此範例管道會從table-aPITR快照擷取資料,接著是 DynamoDB Streams 中的事件。的開始位置LATEST表示管道應該從 DynamoDB Streams 讀取最新資料。

version: "2" cdc-pipeline: source: dynamodb: tables: - table_arn: "arn:aws:dynamodb:us-west-2:{account-id}:table/table-a" export: s3_bucket: "my-bucket" s3_prefix: "export/" stream: start_position: "LATEST" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"] index: "${getMetadata(\"table_name\")}" index_type: custom normalize_index: true document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external"

您可以使用預先設定的 DynamoDB 藍圖來建立此管道。如需詳細資訊,請參閱使用藍圖建立管道

資料一致性

OpenSearch 擷取支援確 end-to-end 認,以確保資料耐久性。管道讀取快照或串流時,會動態建立分割區以進行 parallel 處理。管道會在擷取 OpenSearch 網域或集合中的所有記錄後收到確認後,將分割區標示為完整。

如果您想要內嵌至 OpenSearch 無伺服器搜尋集合,可以在管道中產生文件 ID。如果您想要導入 OpenSearch 無伺服器時間序列集合,請注意,管線不會產生文件 ID。

OpenSearch 擷取管線也會將傳入的事件動作對應至對應的大量索引動作,以協助擷取文件。這樣可以保持資料一致,以便 DynamoDB 中的每個資料變更都會與中的對應文件變更協調。 OpenSearch

對映資料類型

OpenSearch 服務會動態地將每個傳入文件中的資料類型對應至 DynamoDB 中的對應資料類型。下表顯示了 OpenSearch 服務如何自動映射各種數據類型。

資料類型 OpenSearch DynamoDB
Number

OpenSearch 自動對應數值資料。如果數字是整數,則將其 OpenSearch 映射為長值。如果數字是分數,則將其 OpenSearch 映射為浮點值。

OpenSearch 根據第一個發送的文檔動態映射各種屬性。如果 DynamoDB 中的相同屬性混合了資料類型 (例如整數和小數),則對應可能會失敗。

例如,如果您的第一個文件具有整數的屬性,而稍後的文件具有與小數編號相同的屬性,則 OpenSearch 無法內嵌第二個文件。在這些情況下,您應該提供明確的對應範本,如下所示:

{ "template": { "mappings": { "properties": { "MixedNumberAttribute": { "type": "float" } } } } }

如果您需要雙精度,請使用字符串類型的字段映射。中沒有支持 38 位精度的等效數字類型 OpenSearch。

支援數字。

數字, 集合 OpenSearch 自動將數字集合映射到長值或浮點值的數組中。與標量數一樣,這取決於攝入的第一個數字是整數還是小數。您可以使用與對映純量字串相同的方式,為數字集提供對映。

DynamoDB 支援代表一組數字的類型。

字串

OpenSearch 自動將字串值對應為文字。在某些情況下 (例如列舉值),您可以對應至關鍵字類型。

下列範例顯示如何將名為的 DynamoDB 屬性對應PartType至關鍵字。 OpenSearch

{ "template": { "mappings": { "properties": { "PartType": { "type": "keyword" } } } } }

支援字串。

字符串集

OpenSearch 自動將字符串集映射到字符串數組中。您可以使用與對映純量字串相同的方式,為字串集提供對應。

DynamoDB 支援代表字串集合的類型。
二進位

OpenSearch 自動將二進位資料對應為文字。您可以提供一個映射,將這些字段寫入為中的二進制字段 OpenSearch。

下列範例顯示如何將名為的 DynamoDB 屬性對應ImageData至 OpenSearch 二進位欄位。

{ "template": { "mappings": { "properties": { "ImageData": { "type": "binary" } } } } }
DynamoDB 進位類型屬性。
二進制集

OpenSearch 自動將二進制集映射到二進制數據的數組作為文本。您可以使用與對映純量二進位相同的方式,為數字集提供對應。

DynamoDB 支援代表二進位值集合的類型。
Boolean

OpenSearch 將 DynamoDB 布林型別對應至布 OpenSearch 林型別。

DynamoDB 林型別屬性。

Null

OpenSearch 可以擷取具有 DynamoDB 空值類型的文件。它將值保存為文檔中的空值。此類型沒有對應,且此欄位無法編製索引或搜尋。

如果空類型使用相同的屬性名稱,然後稍後變更為不同類型 (例如 string),則會為第一個非空值 OpenSearch 建立動態對應。後續的值仍然 DynamoDB 是空值。

支援空類型屬性。
Map

OpenSearch 將 DynamoDB 會將屬性對應至巢狀欄位。相同的對映適用於巢狀欄位中。

下列範例會將巢狀欄位中的字串對應至中的關鍵字類型 OpenSearch:

{ "template": { "mappings": { "properties": { "AdditionalDescriptions": { "properties": { "PartType": { "type": "keyword" } } } } } } }
支援對應類型屬性。
清單

OpenSearch 根據清單中的內容,為 DynamoDB 清單提供不同的結果。

當清單包含所有相同類型的純量類型時 (例如,所有字串的清單),則會將清單 OpenSearch 內嵌為該類型的陣列。這適用於字符串,數字,布爾和空類型。每種類型的限制都與該類型的標量的限制相同。

您也可以使用與用於地圖相同的對映來提供對映清單的對映。

您無法提供混合類型的清單。

支援清單類型屬性。

設定

OpenSearch 根據集合中的內容,為 DynamoDB 集提供不同的結果。

當一個集合包含所有相同類型的標量類型(例如,一組所有字符串),然後將該集合 OpenSearch 內嵌為該類型的數組。這適用於字符串,數字,布爾和空類型。每種類型的限制都與該類型的標量的限制相同。

您也可以使用與用於地圖相同的對映來提供對映集的對映。

您無法提供一組混合類型。

DynamoDB 支援代表集合的類型。

建議您在 OpenSearch 擷取管線中設定無效字母佇列 (DLQ)。如果您已設定佇列, OpenSearch Service 會將由於動態對應失敗而無法擷取的所有失敗文件傳送至佇列。

如果自動對映失敗,您可以在管線組態template_content中使用template_type和來定義明確的對應規則。或者,您可以在啟動管道之前,直接在搜尋網域或集合中建立對應範本。

限制

為 DynamoDB 設定 OpenSearch 擷取管線時,請考慮下列限制:

  • 與 DynamoDB 的 OpenSearch 擷取整合目前不支援跨區域擷取。您的 DynamoDB 資料表和 OpenSearch 擷取管道必須位於相同的資料表中。 AWS 區域

  • 您的 DynamoDB 資料表和 OpenSearch 擷取管道必須位於相同的資料表中。 AWS 帳戶

  • OpenSearch 擷取管線僅支援一個 DynamoDB 表做為其來源。

  • DynamoDB Streams 最多只能將資料儲存在記錄中,最多可儲存 24 小時。如果從大型資料表的初始快照擷取需要 24 小時或更長時間,則會有一些初始資料遺失。若要減少此資料遺失,請預估資料表的大小,並設定 OpenSearch 擷取管線的適當運算單元。

建議 CloudWatch DynamoDB 示

建議您使用下列 CloudWatch 指標來監控擷取管線的效能。這些量度可協助您識別從匯出中處理的資料量、從串流處理的事件數量、處理匯出和串流事件時的錯誤,以及寫入目標的文件數量。您可以設定 CloudWatch 警示,以便在其中一個量度超過指定值超過指定的時間長度時執行動作。

指標 描述
dynamodb-pipeline.BlockingBuffer.bufferUsage.value

指示正在使用多少緩衝區。

dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value

顯示主動處理匯出之 OCUs Amazon S3 物件的物件總數。

dynamodb-pipeline.dynamodb.bytesProcessed.count

從 DynamoDB 來源處理的位元組計數。

dynamodb-pipeline.dynamodb.changeEventsProcessed.count

從 DynamoDB 串流處理的變更事件數目。

dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count

從 DynamoDB 處理的變更事件所產生的錯誤數目。

dynamodb-pipeline.dynamodb.exportJobFailure.count 失敗的匯出工作提交嘗試次數。
dynamodb-pipeline.dynamodb.exportJobSuccess.count 已成功提交的匯出工作數目。
dynamodb-pipeline.dynamodb.exportRecordsProcessed.count

從匯出處理的記錄總數。

dynamodb-pipeline.dynamodb.exportRecordsTotal.count

從 DynamoDB 匯出的記錄總數,對於追蹤資料匯出量而言至關重要。

dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count 已從 Amazon S3 成功處理的匯出資料檔案總數。
dynamodb-pipeline.opensearch.bulkBadRequestErrors.count 由於格式錯誤的請求而導致批量請求期間的錯誤計數。
dynamodb-pipeline.opensearch.bulkRequestLatency.avg 大量寫入要求的平均延遲時間 OpenSearch。
dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count 因找不到目標資料而失敗的大量要求數目。
dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count OpenSearch 擷取管線寫入 OpenSearch叢集的重試次數。
dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum 發出的所有大量請求的總大小(以字節為單位) OpenSearch。
dynamodb-pipeline.opensearch.documentErrors.count 將文件傳送至時發生的錯誤數 OpenSearch。導致錯誤的文件將被發送到DLQ。
dynamodb-pipeline.opensearch.documentsSuccess.count 成功寫入 OpenSearch 叢集或集合的文件數目。
dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count 第一次嘗試時成功編製 OpenSearch 索引的文件數目。

dynamodb-pipeline.opensearch.documentsVersionConflictErrors.count

因處理期間文件中的版本衝突而導致的錯誤計數。

dynamodb-pipeline.opensearch.PipelineLatency.avg

透過從來源讀取以寫 OpenSearch 入目的地來處理資料的擷取管線平均延遲。
dynamodb-pipeline.opensearch.PipelineLatency.max 透過從來源讀取到寫 OpenSearch 入目的地來處理資料的擷取管線延遲上限。
dynamodb-pipeline.opensearch.recordsIn.count 成功攝入 OpenSearch的記錄計數。此指標對於跟踪正在處理和存儲的數據量至關重要。
dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count 無法寫入的記錄數DLQ。
dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count 寫入的記錄數DLQ。
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count Amazon S3 無效字母佇列請求的延遲測量計數。
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum Amazon S3 無效字母佇列的所有請求總延遲
dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum 對 Amazon S3 無效字母佇列發出的所有請求的總大小 (以位元組為單位)。
dynamodb-pipeline.recordsProcessed.count 管道中處理的記錄總數,這是整體輸送量的關鍵指標。
dynamodb.changeEventsProcessed.count 沒有從 DynamoDB 串流收集任何記錄。這可能是因為資料表上沒有活動、匯出正在進行中,或是存取 DynamoDB 串流發生問題。

dynamodb.exportJobFailure.count

嘗試觸發匯出至 S3 失敗。

line.opensearch.bulkRequestInvalidInputErrors.count

由於輸入無效 OpenSearch 而導致的批量請求錯誤計數,這對於監視數據質量和操作問題至關重要。
opensearch.EndToEndLatency.avg 端對端位置高於從 DynamoDB 串流讀取時所需的值。這可能是因為 OpenSearch 叢集規模不足,或是 DynamoDB 表格上WCU輸送OCU量太低的最大管線容量所致。此端對端延遲在匯出後會很高,並且會隨著時間的推移而減少,因為它可以追溯到最新的 DynamoDB 串流。