

# 教程 1：对 Amazon DynamoDB 使用筛选器处理所有事件，以及对 AWS Lambda 使用 AWS CLI 进行处理
<a name="Streams.Lambda.Tutorial"></a>

 

在本教程中，您将创建 AWS Lambda 触发器以处理来自 DynamoDB 表的流。

**Topics**
+ [第 1 步：创建一个启用了流的 DynamoDB 表](#Streams.Lambda.Tutorial.CreateTable)
+ [第 2 步：创建一个 Lambda 执行角色](#Streams.Lambda.Tutorial.CreateRole)
+ [第 3 步：创建一个 Amazon SNS 主题](#Streams.Lambda.Tutorial.SNSTopic)
+ [第 4 步：创建并测试一个 Lambda 函数](#Streams.Lambda.Tutorial.LambdaFunction)
+ [第 5 步：创建并测试一个触发器](#Streams.Lambda.Tutorial.CreateTrigger)

本教程的场景就是 Woofer 这个简单的社交网络。Woofer 用户使用发送给其他 Woofer 用户的 *bark*（短文本消息）进行通信。下图显示了此应用程序的组件和工作流。

![\[DynamoDB 表、流记录、Lambda 函数和 Amazon SNS 主题的 Woofer 应用程序工作流。\]](http://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. 用户将项目写入 DynamoDB 表 (`BarkTable`)。表中的每个项目代表一个 bark。

1. 写入新的流记录，体现添加到 `BarkTable` 中的新项目。

1. 新的流记录触发 AWS Lambda 函数 (`publishNewBark`)。

1. 如果流记录指示新项目已添加到 `BarkTable`，则 Lambda 函数会从流记录读取数据并将消息发布到 Amazon Simple Notification Service (Amazon SNS) 中的主题。

1. Amazon SNS 主题的订阅者收到消息。（在本教程中，唯一的订阅者是一个电子邮件地址。）

**开始前的准备工作**  
本教程使用 AWS Command Line Interface AWS CLI。如果您尚未配置，请按照 [AWS Command Line Interface 用户指南](https://docs.aws.amazon.com/cli/latest/userguide/)中的说明安装和配置 AWS CLI。

## 第 1 步：创建一个启用了流的 DynamoDB 表
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

在此步骤中，您将创建 DynamoDB 表 (`BarkTable`) 以存储来自 Woofer 用户的所有 bark。主键由 `Username`（分区键）和 `Timestamp`（排序键）组成。这两个属性的类型为字符串。

`BarkTable` 启用了流。在本教程后面的部分中，您通过将 AWS Lambda 函数与流关联来创建触发器。

1. 输入以下命令以创建表。

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. 在输出中，查找 `LatestStreamArn`。

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   记录 `region` 和 `accountID`，因为您在本教程接下来的步骤中需要这些信息。

## 第 2 步：创建一个 Lambda 执行角色
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

在此步骤中，您将创建 AWS Identity and Access Management (IAM) 角色 (`WooferLambdaRole`) 并向其分配权限。此角色将由您在[第 4 步：创建并测试一个 Lambda 函数](#Streams.Lambda.Tutorial.LambdaFunction)中创建的 Lambda 函数使用。

您还将为角色创建策略。策略包含 Lambda 函数在运行时需要的所有权限。

1. 使用以下内容创建名为 `trust-relationship.json` 的文件。

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. 输入以下命令来创建 `WooferLambdaRole`。

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. 使用以下内容创建名为 `role-policy.json` 的文件。（将 `region` 和 `accountID` 替换为您的 AWS 区域和帐户 ID。）

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   策略有四个语句，允许 `WooferLambdaRole` 执行以下操作：
   + 运行 Lambda 函数 (`publishNewBark`)。您将在本教程的后面部分中创建函数。
   + 访问 Amazon CloudWatch Logs Lambda 函数在运行时将诊断信息写入 CloudWatch Logs。
   + 从 `BarkTable` 的 DynamoDB 流读取数据。
   + 向 Amazon SNS 发布消息。

1. 输入以下命令以将策略附加到 `WooferLambdaRole`。

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## 第 3 步：创建一个 Amazon SNS 主题
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

在此步骤中，您将创建 Amazon SNS 主题 (`wooferTopic`) 并使用电子邮件地址订阅该主题。您的 Lambda 函数使用此主题发布来自 Woofer 用户的新 bark。

1. 输入以下命令以创建新 Amazon SNS 主题。

   ```
   aws sns create-topic --name wooferTopic
   ```

1. 输入以下命令以使用电子邮件地址订阅 `wooferTopic`。（使用您的 AWS 区域和账户 ID 替换 `region` 和 `accountID`，并使用有效的电子邮件地址替换 `example@example.com`。）

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS 将向您的电子邮件地址发送确认邮件。选择该邮件中的**确认订阅**链接以完成订阅过程。

## 第 4 步：创建并测试一个 Lambda 函数
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

在此步骤中，您将创建 AWS Lambda 函数 (`publishNewBark`) 以处理来自 `BarkTable` 的流记录。

`publishNewBark` 函数仅处理与 `BarkTable` 中的新项目对应的流事件。该函数从此类事件读取数据，然后调用 Amazon SNS 以发布该事件。

1. 使用以下内容创建名为 `publishNewBark.js` 的文件。将 `region` 和 `accountID` 替换为您的 AWS 区域和帐户 ID。

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. 创建包含 `publishNewBark.js` 的 zip 文件。如果您有 zip 命令行实用程序，则可以输入以下命令来完成此操作。

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. 当您创建 Lambda 函数时，为 `WooferLambdaRole` 指定您在 [第 2 步：创建一个 Lambda 执行角色](#Streams.Lambda.Tutorial.CreateRole) 中创建的 Amazon 资源名称（ARN）。输入以下命令检索此 ARN。

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   在输出中，查找 `WooferLambdaRole` 的 ARN。

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   输入以下命令以创建 Lambda 函数。将 *roleARN* 替换为 `WooferLambdaRole` 的 ARN。

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. 现在测试 `publishNewBark`，验证它可以正常使用。为此，您将提供类似于来自 DynamoDB Streams 的真实记录的输入。

   使用以下内容创建名为 `payload.json` 的文件。将 `region` 和 `accountID` 替换为您的 AWS 区域和账户 ID。

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   输入以下命令以测试 `publishNewBark` 函数。

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   如果测试成功，您将看到以下输出。

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   此外，`output.txt` 文件将包含以下文本。

   ```
   "Successfully processed 1 records."
   ```

   您还会在数分钟内收到一封新电子邮件。
**注意**  
AWS Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误，可以使用这些诊断信息排除故障：  
打开 CloudWatch 控制台：[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。
在导航窗格中，选择**日志**。
选择下列日志组：`/aws/lambda/publishNewBark`
选择最新日志流以查看函数输出（以及错误）。

## 第 5 步：创建并测试一个触发器
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

在 [第 4 步：创建并测试一个 Lambda 函数](#Streams.Lambda.Tutorial.LambdaFunction) 中，您测试了 Lambda 函数以确保它正确运行。在此步骤中，关联 Lambda 函数 (`publishNewBark`) 与事件源（`BarkTable` 流），创建*触发器*。

1. 在创建触发器时，您需要为 `BarkTable` 流指定 ARN。输入以下命令检索此 ARN。

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   在输出中，查找 `LatestStreamArn`。

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. 输入以下命令以创建触发器。使用实际流 ARN 替换 `streamARN`。

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. 测试触发器。键入以下命令以将项目添加到 `BarkTable`。

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   您应在数分钟内收到一封新电子邮件。

1. 打开 DynamoDB 控制台并再将几个项目添加到 `BarkTable`。您必须为 `Username` 和 `Timestamp` 属性指定值。（您还应为 `Message` 指定值，虽然该值并非必需。） 对于添加到 `BarkTable` 中的每个项目，您应收到一封新电子邮件。

   Lambda 函数仅处理您添加到 `BarkTable` 的新项目。如果您在表中更新或删除项目，函数不执行任何操作。

**注意**  
AWS Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误，可以使用这些诊断信息排除故障。  
通过以下网址打开 CloudWatch 控制台：[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。
在导航窗格中，选择**日志**。
选择下列日志组：`/aws/lambda/publishNewBark`
选择最新日志流以查看函数输出（以及错误）。