从 Amazon DynamoDB 表中加载流数据 - 亚马逊 OpenSearch 服务

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

从 Amazon DynamoDB 表中加载流数据

您可以使用 AWS Lambda 从 Amazon DynamoDB 将数据发送到您的 OpenSearch Service 域。到达数据库表的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。

先决条件

继续操作之前,必须具有以下资源。

先决条件 描述
DynamoDB 表

此表包含源数据。有关更多信息,请参阅 Amazon DynamoDB 开发人员指南中的 DynamoDB Tables 中的基本操作

此表必须与 OpenSearch Service 域驻留在同一个区域并且流设置为新映像。要了解更多信息,请参阅启用流

OpenSearch Service 域 Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域
IAM 角色

此角色必须具有基本的 OpenSearch Service、DynamoDB 和 Lambda 执行权限,如以下内容:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

角色必须拥有以下信任关系:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色

创建 Lambda 函数

按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 ddb-to-opensearch 的目录并对 sample.py 使用以下代码:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

编辑 regionhost 的变量。

安装 pip——如果您尚未安装,则使用以下命令安装依赖项:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:

  • :DynamoDB 表

  • 批处理大小:100

  • 起始位置:时间范围

要了解更多信息,请参阅 Amazon DynamoDB 开发人员指南使用 DynamoDB Streams 和 Lambda 处理新项目

此时,您具有一整套资源:源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并为这些更改编制索引之后执行的函数以及用于搜索和可视化的 OpenSearch Service 域。

测试 Lambda 函数

创建此函数后,可以通过使用 AWS CLI 将新项目添加到 DynamoDB 表来测试它:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

然后使用 OpenSearch Service 控制台或 OpenSearch 控制面板以验证 lambda-index 是否包含一个文档。还可使用以下请求:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }