

# 教程：将 AWS Lambda 与 Amazon DynamoDB Streams 结合使用
<a name="with-ddb-example"></a>

 在本教程中，您将创建 Lambda 函数处理来自 Amazon DynamoDB Streams 的事件。

## 先决条件
<a name="with-ddb-prepare"></a>

### 安装 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安装 AWS Command Line Interface，请按照[安装或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 中的步骤进行安装。

本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中，可使用您首选的 Shell 和程序包管理器。

**注意**  
在 Windows 中，操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令（例如 `zip`）。[安装 Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10)，获取 Ubuntu 和 Bash 与 Windows 集成的版本。

## 创建执行角色
<a name="with-ddb-create-execution-role"></a>

创建[执行角色](lambda-intro-execution-role.md)，向您的函数授予访问 AWS 资源的权限。

**创建执行角色**

1. 在 IAM 控制台中，打开 [Roles（角色）页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择**创建角色**。

1. 创建具有以下属性的角色。
   + **Trusted entity**（可信任的实体）– Lambda。
   + **Permissions**（权限）– **AWSLambdaDynamoDBExecutionRole**。
   + **Role name**（角色名称）– **lambda-dynamodb-role**。

**AWSLambdaDynamoDBExecutionRole** 具有该函数所需的权限以从 DynamoDB 中读取项目并将日志写入 CloudWatch Logs。

## 创建函数
<a name="with-ddb-example-create-function"></a>

创建一个 Lambda 函数来处理 DynamoDB 事件。函数代码会将一些传入的事件数据写入 CloudWatch Logs。

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 .NET 将 DynamoDB 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Go 将 DynamoDB 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Java 将 DynamoDB 事件与 Lambda 结合使用。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript（v3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 JavaScript 将 DynamoDB 事件与 Lambda 结合使用。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
使用 TypeScript 将 DynamoDB 事件与 Lambda 结合使用。  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 PHP 将 DynamoDB 事件与 Lambda 结合使用。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Python 将 DynamoDB 事件与 Lambda 结合使用。  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
通过 Ruby 将 DynamoDB 事件与 Lambda 结合使用。  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Rust 将 DynamoDB 事件与 Lambda 结合使用。  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**创建函数**

1. 将示例代码复制到名为 `example.js` 的文件中。

1. 创建部署程序包。

   ```
   zip function.zip example.js
   ```

1. 使用 `create-function` 命令创建 Lambda 函数。

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## 测试 Lambda 函数
<a name="with-dbb-invoke-manually"></a>

在本步骤中，您将使用 `invoke` AWS Lambda CLI 命令和以下示例 DynamoDB 事件手动调用 Lambda 函数。将以下内容复制到名为 `input.txt` 的文件中。

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

运行以下 `invoke` 命令：

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

如果使用 **cli-binary-format** 版本 2，则 AWS CLI 选项是必需的。要将其设为默认设置，请运行 `aws configure set cli-binary-format raw-in-base64-out`。有关更多信息，请参阅*版本 2 的 AWS Command Line Interface 用户指南*中的 [AWS CLI 支持的全局命令行选项](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)。

函数在响应正文中返回字符串 `message`。

在 `outputfile.txt` 文件中验证输出。

## 创建启用流的 DynamoDB 表
<a name="with-ddb-create-buckets"></a>

创建启用流的 Amazon DynamoDB 表。

**创建 DynamoDB 表**

1. 打开 [DynamoDB 控制台](https://console.aws.amazon.com/dynamodb)。

1. 选择 **Create Table**。

1. 使用以下设置创建表。
   + **Table name**（表名称）- **lambda-dynamodb-stream**
   + **Primary key**（主键）– **id**（字符串）

1. 选择**创建**。

**启用流**

1. 打开 [DynamoDB 控制台](https://console.aws.amazon.com/dynamodb)。

1. 选择**表**。

1. 选择 **lambda-dynamodb-stream** 表。

1. 在 **Exports and streams**（导出和流）下，选择 **DynamoDB stream details**（DynamoDB 流详细信息）。

1. 选择**打开**。

1. 对于**视图类型**，选择**仅键属性**。

1. 选择**开启流**。

记下流 ARN。在下一步中将该流与 Lambda 函数关联时，您将需要此类信息。有关启用流的更多信息，请参阅[使用 DynamoDB Streams 捕获表活动](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)。

## 在 AWS Lambda 中添加事件源
<a name="with-ddb-attach-notification-configuration"></a>

在 AWS Lambda 中创建事件源映射。此事件源映射将 DynamoDB Streams 与 Lambda 函数关联。创建此事件源映射后，AWS Lambda 即开始轮询该流。

运行以下 AWS CLI `create-event-source-mapping` 命令。命令运行后，记下 UUID。在任何命令中，如删除事件源映射时，您都需要该 UUID 来引用事件源映射。

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 这会在指定的 DynamoDB Streams 和 Lambda 函数之间创建映射。您可将一个 DynamoDB Streams 关联到多个 Lambda 函数，也可将同一个 Lambda 函数关联到多个流。但是，Lambda 函数将共享其所共享的流的读取吞吐量。

您可以通过运行以下命令获取事件源映射的列表。

```
aws lambda list-event-source-mappings
```

该列表返回您创建的所有事件源映射，而对于每个映射，它都显示 `LastProcessingResult` 等信息。该字段用于在出现任何问题时提供信息性消息。`No records processed`（指示 AWS Lambda 未开始轮询或流中没有任何记录）和 `OK`（指示 AWS Lambda 已成功读取流中的记录并已调用 Lambda 函数）等值表示未出现任何问题。如果出现问题，您将收到一条错误消息。

如果您有大量事件源映射，请使用函数名称参数缩窄结果范围。

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## 测试设置
<a name="with-ddb-final-integration-test-no-iam"></a>

测试端到端体验。当您更新表时，DynamoDB 会将事件记录写入流。当 AWS Lambda 轮询该流时，它将在流中检测新记录并通过向该函数传递事件来代表您调用 Lambda 函数。

1. 在 DynamoDB 控制台中，添加、更新、删除表中的项目。DynamoDB 会将这些操作记录写入流。

1. AWS Lambda 会轮询该流，当检测到流有更新时，它会通过传递在流中发现的事件数据来调用 Lambda 函数。

1. 函数运行并在 Amazon CloudWatch 中创建日志。您可以验证 Amazon CloudWatch 控制台中报告的日志。

## 后续步骤
<a name="with-ddb-next-steps"></a>

本教程介绍使用 Lambda 处理 DynamoDB 流事件的基础知识。对于生产工作负载，请考虑实施部分批处理响应逻辑，以更有效地处理单个记录故障。Powertools for AWS Lambda 中的[批处理器实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)适用于 Python、TypeScript、.NET 和 Java，并为此提供了强大的解决方案，可以自动处理部分批处理响应的复杂性并减少成功处理记录的重试次数。

## 清除资源
<a name="cleanup"></a>

除非您想要保留为本教程创建的资源，否则可立即将其删除。通过删除您不再使用的 AWS 资源，可防止您的 AWS 账户 产生不必要的费用。

**删除 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions（函数）页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您创建的函数。

1. 依次选择**操作**和**删除**。

1. 在文本输入字段中键入 **confirm**，然后选择**删除**。

**删除执行角色**

1. 打开 IAM 控制台的[角色页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择您创建的执行角色。

1. 选择**删除**。

1. 在文本输入字段中输入角色名称，然后选择**删除**。

**删除 DynamoDB 表**

1. 打开 DynamoDB 控制台中 [Tables page](https://console.aws.amazon.com//dynamodb/home#tables:)（表页面）。

1. 选择您创建的表。

1. 选择 **Delete**。

1. 在文本框中输入 **delete**。

1. 选择 **删除表**。