從 Amazon DynamoDB 中載入串流資料 - Amazon OpenSearch Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

從 Amazon DynamoDB 中載入串流資料

您可以使用 從 Amazon DynamoDB AWS Lambda 將資料傳送至您的 OpenSearch Service 網域。送達資料庫資料表的新資料會觸發 Lambda 的事件通知,然後執行您的自訂程式碼以執行索引。

必要條件

繼續之前,您必須準備好以下資源。

先決條件 描述
DynamoDB 表

表格中包含您的來源資料。如需詳細資訊,請參閱 Amazon DynamoDB 開發人員指南中的 DynamoDB 資料表上的基本操作

資料表必須與 OpenSearch Service 網域位於相同的 區域,且串流必須設為 New image 。如需進一步了解,請參閱啟用串流

OpenSearch 服務網域 您的 Lambda 函數處理資料後的資料目的地。如需詳細資訊,請參閱 建立 OpenSearch 服務網域
IAM 角色

此角色必須具有基本 OpenSearch Service、DynamoDB 和 Lambda 執行許可,例如下列項目:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

角色必須具有下列信任關係:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

若要進一步了解,請參閱 IAM 使用者指南 中的建立IAM角色

建立 Lambda 函式

遵循建立 Lambda 部署套件中的指示,但要建立名為 ddb-to-opensearch 的目錄,並使用以下適用於 sample.py 的程式碼:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

編輯 regionhost 的變數。

安裝 pip (如果您尚未安裝的話),然後使用下列命令安裝相依項目:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

然後遵循 中的指示建立 Lambda 函式,但為觸發程序指定來自 IAM的角色必要條件和下列設定:

  • 資料表:您的 DynamoDB 資料表

  • 批次大小:100

  • 開始位置:水平修剪

如需進一步了解,請參閱 Amazon DynamoDB 開發人員指南中的使用 DynamoDB Streams 和 Lambda 來處理新項目

此時,您擁有一組完整的資源:來源資料的 DynamoDB 資料表、資料表變更的 DynamoDB 串流、來源資料變更後執行的函數,以及用於搜尋和視覺化 OpenSearch 的服務網域。

測試 Lambda 函數

在建立函數後,您可以進行測試,方法是將新項目新增至使用 AWS CLI的 DynamoDB 資料表:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

然後使用 OpenSearch 服務主控台或 OpenSearch 儀表板來驗證 是否lambda-index包含文件。您也可以使用以下請求:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }