

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 从 Amazon DynamoDB 表中加载流数据
<a name="integrations-dynamodb"></a>

您可以使用 AWS Lambda 将数据从亚马逊 DynamoDB 发送到您的 OpenSearch 服务域。到达数据库表的新数据将触发事件通知到 Lambda，这将运行自定义代码以执行编制索引。

## 先决条件
<a name="integrations-dynamodb-prereq"></a>

继续操作之前，必须具有以下资源。


| 先决条件 | 说明 | 
| --- | --- | 
| DynamoDB 表 | 此表包含源数据。有关更多信息，请参阅 *Amazon DynamoDB 开发人员指南*中的 [DynamoDB Tables 中的基本操作](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html)。该表必须与您的 OpenSearch 服务域位于同一区域，并且必须将直播设置为 “**新图像**”。要了解更多信息，请参阅[启用流](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling)。 | 
| OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息，请参阅 [创建 OpenSearch 服务域](createupdatedomains.md#createdomains)。 | 
| IAM 角色 | 此角色必须具有基本的 OpenSearch 服务、DynamoDB 和 Lambda 执行权限，例如：  JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
```    角色必须拥有以下信任关系：  JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```    要了解更多信息，请参阅 *IAM 用户手册*中的[创建 IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html)。 | 

## 创建 Lambda 函数
<a name="integrations-dynamodb-lambda"></a>

按照[创建 Lambda 部署程序包](integrations-s3-lambda.md#integrations-s3-lambda-deployment-package)中的说明操作，但创建一个名为 `ddb-to-opensearch` 的目录并对 `sample.py` 使用以下代码：

```
import boto3
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-east-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        # Get the primary key for use as the OpenSearch ID
        id = record['dynamodb']['Keys']['id']['S']

        if record['eventName'] == 'REMOVE':
            r = requests.delete(url + id, auth=awsauth)
        else:
            document = record['dynamodb']['NewImage']
            r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return str(count) + ' records processed.'
```

编辑 `region` 和 `host` 的变量。

[安装 pip](https://pip.pypa.io/en/stable/installation/)——如果您尚未安装，则使用以下命令安装依赖项：

```
cd ddb-to-opensearch

pip install --target ./package requests
pip install --target ./package requests_aws4auth
```

然后按照[创建 Lambda 函数](integrations-s3-lambda.md#integrations-s3-lambda-create)中的说明操作，但指定[先决条件](#integrations-dynamodb-prereq)中的 IAM 角色和以下触发器设置：
+ **表**：DynamoDB 表
+ **批处理大小**：100
+ **起始位置**：时间范围

要了解更多信息，请参阅 *Amazon DynamoDB 开发人员指南*中[使用 DynamoDB Streams 和 Lambda 处理新项目](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial.html)。

此时，您已拥有一整套资源：用于存放源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并索引这些更改后运行的函数，以及用于搜索和可视化的服务域。 OpenSearch 

## 测试 Lambda 函数
<a name="integrations-dynamodb-lambda-test"></a>

创建此函数后，可以通过使用 AWS CLI将新项目添加到 DynamoDB 表来测试它：

```
aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1
```

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否`lambda-index`包含文档。还可使用以下请求：

```
GET https://domain-name/lambda-index/_doc/00001
{
    "_index": "lambda-index",
    "_type": "_doc",
    "_id": "00001",
    "_version": 1,
    "found": true,
    "_source": {
        "director": {
            "S": "Kevin Costner"
        },
        "id": {
            "S": "00001"
        },
        "title": {
            "S": "The Postman"
        }
    }
}
```