教程 1:对 Amazon DynamoDB 使用筛选器处理所有事件,以及对 AWS Lambda 使用 AWS CLI 进行处理
在本教程中,您将创建 AWS Lambda 触发器以处理来自 DynamoDB 表的流。
主题
本教程的场景就是 Woofer 这个简单的社交网络。Woofer 用户使用发送给其他 Woofer 用户的 bark(短文本消息)进行通信。下图显示了此应用程序的组件和工作流。
-
用户将项目写入 DynamoDB 表 (
BarkTable
)。表中的每个项目代表一个 bark。 -
写入新的流记录,体现添加到
BarkTable
中的新项目。 -
新的流记录触发 AWS Lambda 函数 (
publishNewBark
)。 -
如果流记录指示新项目已添加到
BarkTable
,则 Lambda 函数会从流记录读取数据并将消息发布到 Amazon Simple Notification Service (Amazon SNS) 中的主题。 -
Amazon SNS 主题的订阅者收到消息。(在本教程中,唯一的订阅者是一个电子邮件地址。)
开始前的准备工作
本教程使用 AWS Command Line Interface AWS CLI。如果您尚未配置,请按照 AWS Command Line Interface 用户指南中的说明安装和配置 AWS CLI。
第 1 步:创建一个启用了流的 DynamoDB 表
在此步骤中,您将创建 DynamoDB 表 (BarkTable
) 以存储来自 Woofer 用户的所有 bark。主键由 Username
(分区键)和 Timestamp
(排序键)组成。这两个属性的类型为字符串。
BarkTable
启用了流。在本教程后面的部分中,您通过将 AWS Lambda 函数与流关联来创建触发器。
-
输入以下命令以创建表。
aws dynamodb create-table \ --table-name BarkTable \ --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \ --key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
-
在输出中,查找
LatestStreamArn
。... "LatestStreamArn": "arn:aws:dynamodb:
region
:accountID
:table/BarkTable/stream/timestamp ...记录
和region
,因为您在本教程接下来的步骤中需要这些信息。accountID
第 2 步:创建一个 Lambda 执行角色
在此步骤中,您将创建 AWS Identity and Access Management (IAM) 角色 (WooferLambdaRole
) 并向其分配权限。此角色将由您在第 4 步:创建并测试一个 Lambda 函数中创建的 Lambda 函数使用。
您还将为角色创建策略。策略包含 Lambda 函数在运行时需要的所有权限。
-
使用以下内容创建名为
trust-relationship.json
的文件。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
-
输入以下命令来创建
WooferLambdaRole
。aws iam create-role --role-name WooferLambdaRole \ --path "/service-role/" \ --assume-role-policy-document file://trust-relationship.json
-
使用以下内容创建名为
role-policy.json
的文件。(将
和region
替换为您的 AWS 区域和帐户 ID。)accountID
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:accountID:*" }, { "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/*" }, { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "*" ] } ] }
策略有四个语句,允许
WooferLambdaRole
执行以下操作:-
运行 Lambda 函数 (
publishNewBark
)。您将在本教程的后面部分中创建函数。 -
访问 Amazon CloudWatch Logs Lambda 函数在运行时将诊断信息写入 CloudWatch Logs。
-
从
BarkTable
的 DynamoDB 流读取数据。 -
向 Amazon SNS 发布消息。
-
-
输入以下命令以将策略附加到
WooferLambdaRole
。aws iam put-role-policy --role-name WooferLambdaRole \ --policy-name WooferLambdaRolePolicy \ --policy-document file://role-policy.json
第 3 步:创建一个 Amazon SNS 主题
在此步骤中,您将创建 Amazon SNS 主题 (wooferTopic
) 并使用电子邮件地址订阅该主题。您的 Lambda 函数使用此主题发布来自 Woofer 用户的新 bark。
-
输入以下命令以创建新 Amazon SNS 主题。
aws sns create-topic --name wooferTopic
-
输入以下命令以使用电子邮件地址订阅
wooferTopic
。(使用您的 AWS 区域和账户 ID 替换
和region
,并使用有效的电子邮件地址替换accountID
。)example@example.com
aws sns subscribe \ --topic-arn arn:aws:sns:
region
:accountID
:wooferTopic \ --protocol email \ --notification-endpointexample@example.com
-
Amazon SNS 将向您的电子邮件地址发送确认邮件。选择该邮件中的确认订阅链接以完成订阅过程。
第 4 步:创建并测试一个 Lambda 函数
在此步骤中,您将创建 AWS Lambda 函数 (publishNewBark
) 以处理来自 BarkTable
的流记录。
publishNewBark
函数仅处理与 BarkTable
中的新项目对应的流事件。该函数从此类事件读取数据,然后调用 Amazon SNS 以发布该事件。
-
使用以下内容创建名为
publishNewBark.js
的文件。将
和region
替换为您的 AWS 区域和帐户 ID。accountID
'use strict'; var AWS = require("aws-sdk"); var sns = new AWS.SNS(); exports.handler = (event, context, callback) => { event.Records.forEach((record) => { console.log('Stream record: ', JSON.stringify(record, null, 2)); if (record.eventName == 'INSERT') { var who = JSON.stringify(record.dynamodb.NewImage.Username.S); var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S); var what = JSON.stringify(record.dynamodb.NewImage.Message.S); var params = { Subject: 'A new bark from ' + who, Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what, TopicArn: 'arn:aws:sns:
region
:accountID
:wooferTopic' }; sns.publish(params, function(err, data) { if (err) { console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2)); } else { console.log("Results from sending message: ", JSON.stringify(data, null, 2)); } }); } }); callback(null, `Successfully processed ${event.Records.length} records.`); }; -
创建包含
publishNewBark.js
的 zip 文件。如果您有 zip 命令行实用程序,则可以输入以下命令来完成此操作。zip publishNewBark.zip publishNewBark.js
-
当您创建 Lambda 函数时,为
WooferLambdaRole
指定您在 第 2 步:创建一个 Lambda 执行角色 中创建的 Amazon 资源名称(ARN)。输入以下命令检索此 ARN。aws iam get-role --role-name WooferLambdaRole
在输出中,查找
WooferLambdaRole
的 ARN。... "Arn": "arn:aws:iam::
region
:role/service-role/WooferLambdaRole" ...输入以下命令以创建 Lambda 函数。将
roleARN
替换为WooferLambdaRole
的 ARN。aws lambda create-function \ --region
region
\ --function-name publishNewBark \ --zip-file fileb://publishNewBark.zip \ --roleroleARN
\ --handler publishNewBark.handler \ --timeout 5 \ --runtime nodejs16.x -
现在测试
publishNewBark
,验证它可以正常使用。为此,您将提供类似于来自 DynamoDB Streams 的真实记录的输入。使用以下内容创建名为
payload.json
的文件。将
和region
替换为您的 AWS 区域和账户 ID。accountID
{ "Records": [ { "eventID": "7de3041dd709b024af6f29e4fa13d34c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "
region
", "dynamodb": { "ApproximateCreationDateTime": 1479499740, "Keys": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Username": { "S": "John Doe" } }, "NewImage": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Message": { "S": "This is a bark from the Woofer social network" }, "Username": { "S": "John Doe" } }, "SequenceNumber": "13021600000000001596893679", "SizeBytes": 112, "StreamViewType": "NEW_IMAGE" }, "eventSourceARN": "arn:aws:dynamodb:region
:account ID
:table/BarkTable/stream/2016-11-16T20:42:48.104" } ] }输入以下命令以测试
publishNewBark
函数。aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
如果测试成功,您将看到以下输出。
{ "StatusCode": 200, "ExecutedVersion": "$LATEST" }
此外,
output.txt
文件将包含以下文本。"Successfully processed 1 records."
您还会在数分钟内收到一封新电子邮件。
注意
AWS Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误,可以使用这些诊断信息排除故障:
打开 CloudWatch 控制台:https://console.aws.amazon.com/cloudwatch/
。 -
在导航窗格中,选择日志。
-
选择下列日志组:
/aws/lambda/publishNewBark
-
选择最新日志流以查看函数输出(以及错误)。
第 5 步:创建并测试一个触发器
在 第 4 步:创建并测试一个 Lambda 函数 中,您测试了 Lambda 函数以确保它正确运行。在此步骤中,关联 Lambda 函数 (publishNewBark
) 与事件源(BarkTable
流),创建触发器。
-
在创建触发器时,您需要为
BarkTable
流指定 ARN。输入以下命令检索此 ARN。aws dynamodb describe-table --table-name BarkTable
在输出中,查找
LatestStreamArn
。... "LatestStreamArn": "arn:aws:dynamodb:
region
:accountID
:table/BarkTable/stream/timestamp ... -
输入以下命令以创建触发器。使用实际流 ARN 替换
。streamARN
aws lambda create-event-source-mapping \ --region
region
\ --function-name publishNewBark \ --event-sourcestreamARN
\ --batch-size 1 \ --starting-position TRIM_HORIZON -
测试触发器。键入以下命令以将项目添加到
BarkTable
。aws dynamodb put-item \ --table-name BarkTable \ --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
您应在数分钟内收到一封新电子邮件。
-
打开 DynamoDB 控制台并再将几个项目添加到
BarkTable
。您必须为Username
和Timestamp
属性指定值。(您还应为Message
指定值,虽然该值并非必需。) 对于添加到BarkTable
中的每个项目,您应收到一封新电子邮件。Lambda 函数仅处理您添加到
BarkTable
的新项目。如果您在表中更新或删除项目,函数不执行任何操作。
注意
AWS Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误,可以使用这些诊断信息排除故障。
通过以下网址打开 CloudWatch 控制台:https://console.aws.amazon.com/cloudwatch/
。 -
在导航窗格中,选择日志。
-
选择下列日志组:
/aws/lambda/publishNewBark
-
选择最新日志流以查看函数输出(以及错误)。