本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
从 Amazon DynamoDB 表中加载流数据
您可以使用 AWS Lambda 从 Amazon DynamoDB 将数据发送到您的 OpenSearch Service 域。到达数据库表的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。
先决条件
继续操作之前,必须具有以下资源。
先决条件 | 描述 |
---|---|
DynamoDB 表 | 此表包含源数据。有关更多信息,请参阅 Amazon DynamoDB 开发人员指南中的 DynamoDB Tables 中的基本操作。 此表必须与 OpenSearch Service 域驻留在同一个区域并且流设置为新映像。要了解更多信息,请参阅启用流。 |
OpenSearch Service 域 | Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅 创建 OpenSearch 服务域。 |
IAM 角色 | 此角色必须具有基本的 OpenSearch Service、DynamoDB 和 Lambda 执行权限,如以下内容:
角色必须拥有以下信任关系:
要了解更多信息,请参阅 IAM 用户手册中的创建 IAM 角色。 |
创建 Lambda 函数
按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 ddb-to-opensearch
的目录并对 sample.py
使用以下代码:
import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'
编辑 region
和 host
的变量。
安装 pip
cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:
-
表:DynamoDB 表
-
批处理大小:100
-
起始位置:时间范围
要了解更多信息,请参阅 Amazon DynamoDB 开发人员指南中使用 DynamoDB Streams 和 Lambda 处理新项目。
此时,您具有一整套资源:源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并为这些更改编制索引之后执行的函数以及用于搜索和可视化的 OpenSearch Service 域。
测试 Lambda 函数
创建此函数后,可以通过使用 AWS CLI 将新项目添加到 DynamoDB 表来测试它:
aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region
us-west-1
然后使用 OpenSearch Service 控制台或 OpenSearch 控制面板以验证 lambda-index
是否包含一个文档。还可使用以下请求:
GET https://domain-name
/lambda-index/_doc/00001
{
"_index": "lambda-index",
"_type": "_doc",
"_id": "00001",
"_version": 1,
"found": true,
"_source": {
"director": {
"S": "Kevin Costner"
},
"id": {
"S": "00001"
},
"title": {
"S": "The Postman"
}
}
}