

# 教程：将 AWS Lambda 与 Amazon DocumentDB 流结合使用
<a name="with-documentdb-tutorial"></a>

 在本教程中，您将创建一个基本的 Lambda 函数，该函数会消耗来自 Amazon DocumentDB（与 MongoDB 兼容）更改流的事件。要完成本教程，您需要经历以下阶段：
+ 设置 Amazon DocumentDB 集群，连接到该集群，并在其上激活更改流。
+ 创建 Lambda 函数，并将 Amazon DocumentDB 集群配置为函数的事件源。
+ 通过将项目插入到 Amazon DocumentDB 数据库中来测试设置。

## 创建 Amazon DocumentDB 集群
<a name="docdb-documentdb-cluster"></a>

1. 打开 [Amazon DocumentDB 控制台](https://console.aws.amazon.com/docdb/home#)。在**集群**下，选择**创建**。

1. 使用以下配置创建集群：
   + 对于**集群类型**，选择**基于实例的集群**。这是默认选项。
   + 在**集群配置**下，确保已选择**引擎版本** 5.0.0。这是默认选项。
   + 在**实例配置**下：
     + 对于**数据库实例类**，选择**内存优化类**。这是默认选项。
     + 对于**常规副本实例数**，请选择 1。
     + 对于**实例类**，使用默认选择。
   + 在**身份验证**下，输入主要用户的用户名，然后选择**自行管理**。输入密码，然后确认密码。
   + 保留所有其他默认设置。

1. 选择**创建集群**。

## 在 Secrets Manager 中创建密钥
<a name="docdb-secret-in-secrets-manager"></a>

在 Amazon DocumentDB 创建您的集群时，请创建一个 AWS Secrets Manager 密钥来存储数据库凭证。在稍后的步骤中创建 Lambda 事件源映射时，您将提供此密钥。

**在 Secrets Manager 中创建密钥**

1. 打开 [Secrets Manager](https://console.aws.amazon.com/secretsmanager/home#) 控制台并选择**存储新密钥**。

1. 对于**选择密钥类型**，请选择以下选项之一：
   + 在**基本详细信息**下：
     + **密钥类型**：用于 Amazon DocumentDB 数据库的凭证
     + 在**凭证**下，输入用于创建 Amazon DocumentDB 集群的相同用户名和密码。
     + **数据库**：选择 Amazon DocumentDB 集群。
     + 选择**下一步**。

1. 对于**配置密钥**，请选择以下选项之一：
   + **密钥名称**：`DocumentDBSecret`
   + 选择**下一步**。

1. 选择**下一步**。

1. 选择**存储**。

1. 刷新控制台以验证 `DocumentDBSecret` 密钥是否成功存储。

记下**密钥 ARN**。您将在后面的步骤中用到它。

## 连接到集群
<a name="docdb-connect-to-cluster"></a>

**使用 AWS CloudShell 连接到 Amazon DocumentDB 集群**

1. 在 Amazon DocumentDB 管理控制台上的**集群**下，找到您创建的集群。单击集群旁边的复选框，选择您的集群。

1. 选择**连接到集群**。将出现 CloudShell **运行命令**屏幕。

1. 在**新环境名称**字段中，输入唯一的名称，例如“test”，然后选择**创建并运行**。

1. 出现提示时请输入密码。当提示符变成 `rs0 [direct: primary] <env-name>>` 时，您成功连接到您的 Amazon DocumentDB 集群。

## 激活更改流
<a name="docdb-activate-change-streams"></a>

在本教程中，您将跟踪对 Amazon DocumentDB 集群中 `docdbdemo` 数据库 `products` 集合的更改。您可以通过激活[更改流](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html)来完成此操作。

**在集群内创建新数据库**

1. 运行以下命令，创建名为 `docdbdemo` 的新数据库：

   ```
   use docdbdemo
   ```

1. 在终端窗口中，使用以下命令将记录插入到 `docdbdemo` 中：

   ```
   db.products.insertOne({"hello":"world"})
   ```

   您应看到类似如下的输出：

   ```
   {
     acknowledged: true,
     insertedId: ObjectId('67f85066ca526410fd531d59')
   }
   ```

1. 接下来，使用以下命令激活 `docdbdemo` 数据库 `products` 集合上的更改流：

   ```
   db.adminCommand({modifyChangeStreams: 1,
       database: "docdbdemo",
       collection: "products", 
       enable: true});
   ```

    应看到类似如下内容的输出：

   ```
   { "ok" : 1, "operationTime" : Timestamp(1680126165, 1) }
   ```

## 创建接口 VPC 端点
<a name="docdb-create-interface-vpc-endpoints"></a>

接下来，创建[接口 VPC 端点](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws)，以确保 Lambda 和 Secrets Manager（稍后用于存储集群访问凭证）能够连接到默认 VPC。

**创建接口 VPC 端点**

1. 打开 [VPC 控制台](https://console.aws.amazon.com/vpc/home#)。在左侧菜单的**虚拟私有云**下，选择**端点**。

1. 选择**创建端点**。使用以下配置创建端点：
   + 对于**名称标签**，输入 `lambda-default-vpc`。
   + 对于**服务类别**，选择 AWS 服务。
   + 对于**服务**，在搜索框中输入 `lambda`。选择格式为 `com.amazonaws.<region>.lambda` 的服务。
   + 对于 **VPC**，选择您的 Amazon DocumentDB 集群所在的 VPC。这通常是[默认 VPC](https://docs.aws.amazon.com/vpc/latest/userguide/default-vpc.html)。
   + 对于**子网**，选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。
   + 对于 **IP 地址类型**，选择 IPv4。
   + 对于**安全组**，选择 Amazon DocumentDB 集群使用的安全组。这通常是 `default` 安全组。
   + 保留所有其他默认设置。
   + 选择**创建端点**。

1. 再次选择**创建端点**。使用以下配置创建端点：
   + 对于**名称标签**，输入 `secretsmanager-default-vpc`。
   + 对于**服务类别**，选择 AWS 服务。
   + 对于**服务**，在搜索框中输入 `secretsmanager`。选择格式为 `com.amazonaws.<region>.secretsmanager` 的服务。
   + 对于 **VPC**，选择您的 Amazon DocumentDB 集群所在的 VPC。这通常是[默认 VPC](https://docs.aws.amazon.com/vpc/latest/userguide/default-vpc.html)。
   + 对于**子网**，选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。
   + 对于 **IP 地址类型**，选择 IPv4。
   + 对于**安全组**，选择 Amazon DocumentDB 集群使用的安全组。这通常是 `default` 安全组。
   + 保留所有其他默认设置。
   + 选择**创建端点**。

 本教程的集群设置部分到此完成。

## 创建执行角色
<a name="docdb-create-the-execution-role"></a>

 在接下来的一组步骤中，您将创建 Lambda 函数。首先，您需要创建执行角色，以向函数授予访问集群的权限。为此，您可以先创建 IAM policy，然后将此策略附加到 IAM 角色。

**创建 IAM policy**

1. 在 IAM 控制台中打开[策略页面](https://console.aws.amazon.com/iam/home#/policies)，然后选择**创建策略**。

1. 选择 **JSON** 选项卡。在以下策略中，将语句最后一行中的 Secrets Manager 资源 ARN 替换为之前的密钥 ARN，然后将策略复制到编辑器中。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "LambdaESMNetworkingAccess",
               "Effect": "Allow",
               "Action": [
                   "ec2:CreateNetworkInterface",
                   "ec2:DescribeNetworkInterfaces",
                   "ec2:DescribeVpcs",
                   "ec2:DeleteNetworkInterface",
                   "ec2:DescribeSubnets",
                   "ec2:DescribeSecurityGroups",
                   "kms:Decrypt"
               ],
               "Resource": "*"
           },
           {
               "Sid": "LambdaDocDBESMAccess",
               "Effect": "Allow",
               "Action": [
                   "rds:DescribeDBClusters",
                   "rds:DescribeDBClusterParameters",
                   "rds:DescribeDBSubnetGroups"
               ],
               "Resource": "*"
           },
           {
               "Sid": "LambdaDocDBESMGetSecretValueAccess",
               "Effect": "Allow",
               "Action": [
                   "secretsmanager:GetSecretValue"
               ],
               "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:DocumentDBSecret"
           }
       ]
   }
   ```

------

1. 选择**下一步：标签**，然后选择**下一步：审核**。

1. 对于 **Name (名称)**，请输入 `AWSDocumentDBLambdaPolicy`。

1. 选择**创建策略**。

**创建 IAM 角色**

1. 在 IAM 控制台中打开[角色页面](https://console.aws.amazon.com/iam/home#/roles)，然后选择**创建角色**。

1. 对于**选择可信实体**，请选择以下选项之一：
   + **可信实体类型**：AWS 服务
   + **服务或使用案例**：Lambda
   + 选择**下一步**。

1. 对于**添加权限**，选择刚刚创建的 `AWSDocumentDBLambdaPolicy` 策略以及 `AWSLambdaBasicExecutionRole`，以向函数授予写入 Amazon CloudWatch Logs 的权限。

1. 选择**下一步**。

1. 对于 **Role name（角色名称）**，输入 `AWSDocumentDBLambdaExecutionRole`。

1. 请选择**创建角色**。

## 创建 Lambda 函数
<a name="docdb-create-the-lambda-function"></a>

本教程使用 Python 3.14 运行时系统，但我们还提供了适用于其他运行时系统的示例代码文件。您可以选择以下框中的选项卡，查看适用于您感兴趣的运行时系统的代码。

代码接收 Amazon DocumentDB 事件输入并对其所包含的消息进行处理。

**创建 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions page](https://console.aws.amazon.com/lambda/home#/functions)（函数页面）。

1. 选择 **Create function**（创建函数）。

1. 选择**从头开始编写**。

1. 在**基本信息**中，执行以下操作：

   1. 对于**函数名称**，输入 `ProcessDocumentDBRecords`。

   1. 对于**运行时系统**，选择 **Python 3.14**。

   1. 对于**架构**，选择 **x86\$164**。

1. 在**更改默认执行角色**选项卡中，执行以下操作：

   1. 展开选项卡，然后选择**使用现有角色**。

   1. 选择您之前创建的 `AWSDocumentDBLambdaExecutionRole`。

1. 选择**创建函数**。

**要部署函数代码**

1. 在下框中选择 **Python** 选项卡并复制代码。

------
#### [ .NET ]

**适用于 .NET 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 .NET 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   using Amazon.Lambda.Core;
   using System.Text.Json;
   using System;
   using System.Collections.Generic;
   using System.Text.Json.Serialization;
   //Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
   [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
   
   namespace LambdaDocDb;
   
   public class Function
   {
       
        /// <summary>
       /// Lambda function entry point to process Amazon DocumentDB events.
       /// </summary>
       /// <param name="event">The Amazon DocumentDB event.</param>
       /// <param name="context">The Lambda context object.</param>
       /// <returns>A string to indicate successful processing.</returns>
       public string FunctionHandler(Event evnt, ILambdaContext context)
       {
           
           foreach (var record in evnt.Events)
           {
               ProcessDocumentDBEvent(record, context);
           }
   
           return "OK";
       }
   
        private void ProcessDocumentDBEvent(DocumentDBEventRecord record, ILambdaContext context)
       {
           
           var eventData = record.Event;
           var operationType = eventData.OperationType;
           var databaseName = eventData.Ns.Db;
           var collectionName = eventData.Ns.Coll;
           var fullDocument = JsonSerializer.Serialize(eventData.FullDocument, new JsonSerializerOptions { WriteIndented = true });
   
           context.Logger.LogLine($"Operation type: {operationType}");
           context.Logger.LogLine($"Database: {databaseName}");
           context.Logger.LogLine($"Collection: {collectionName}");
           context.Logger.LogLine($"Full document:\n{fullDocument}");
       }
   
   
   
       public class Event
       {
           [JsonPropertyName("eventSourceArn")]
           public string EventSourceArn { get; set; }
   
           [JsonPropertyName("events")]
           public List<DocumentDBEventRecord> Events { get; set; }
   
           [JsonPropertyName("eventSource")]
           public string EventSource { get; set; }
       }
   
       public class DocumentDBEventRecord
       {
           [JsonPropertyName("event")]
           public EventData Event { get; set; }
       }
   
       public class EventData
       {
           [JsonPropertyName("_id")]
           public IdData Id { get; set; }
   
           [JsonPropertyName("clusterTime")]
           public ClusterTime ClusterTime { get; set; }
   
           [JsonPropertyName("documentKey")]
           public DocumentKey DocumentKey { get; set; }
   
           [JsonPropertyName("fullDocument")]
           public Dictionary<string, object> FullDocument { get; set; }
   
           [JsonPropertyName("ns")]
           public Namespace Ns { get; set; }
   
           [JsonPropertyName("operationType")]
           public string OperationType { get; set; }
       }
   
       public class IdData
       {
           [JsonPropertyName("_data")]
           public string Data { get; set; }
       }
   
       public class ClusterTime
       {
           [JsonPropertyName("$timestamp")]
           public Timestamp Timestamp { get; set; }
       }
   
       public class Timestamp
       {
           [JsonPropertyName("t")]
           public long T { get; set; }
   
           [JsonPropertyName("i")]
           public int I { get; set; }
       }
   
       public class DocumentKey
       {
           [JsonPropertyName("_id")]
           public Id Id { get; set; }
       }
   
       public class Id
       {
           [JsonPropertyName("$oid")]
           public string Oid { get; set; }
       }
   
       public class Namespace
       {
           [JsonPropertyName("db")]
           public string Db { get; set; }
   
           [JsonPropertyName("coll")]
           public string Coll { get; set; }
       }
   }
   ```

------
#### [ Go ]

**适用于 Go 的 SDK V2**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Go 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   package main
   
   import (
   	"context"
   	"encoding/json"
   	"fmt"
   
   	"github.com/aws/aws-lambda-go/lambda"
   )
   
   type Event struct {
   	Events []Record `json:"events"`
   }
   
   type Record struct {
   	Event struct {
   		OperationType string `json:"operationType"`
   		NS            struct {
   			DB   string `json:"db"`
   			Coll string `json:"coll"`
   		} `json:"ns"`
   		FullDocument interface{} `json:"fullDocument"`
   	} `json:"event"`
   }
   
   func main() {
   	lambda.Start(handler)
   }
   
   func handler(ctx context.Context, event Event) (string, error) {
   	fmt.Println("Loading function")
   	for _, record := range event.Events {
   		logDocumentDBEvent(record)
   	}
   
   	return "OK", nil
   }
   
   func logDocumentDBEvent(record Record) {
   	fmt.Printf("Operation type: %s\n", record.Event.OperationType)
   	fmt.Printf("db: %s\n", record.Event.NS.DB)
   	fmt.Printf("collection: %s\n", record.Event.NS.Coll)
   	docBytes, _ := json.MarshalIndent(record.Event.FullDocument, "", "  ")
   	fmt.Printf("Full document: %s\n", string(docBytes))
   }
   ```

------
#### [ Java ]

**适用于 Java 的 SDK 2.x**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Java 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   import java.util.List;
   import java.util.Map;
   
   import com.amazonaws.services.lambda.runtime.Context;
   import com.amazonaws.services.lambda.runtime.RequestHandler;
   
   public class Example implements RequestHandler<Map<String, Object>, String> {
   
       @SuppressWarnings("unchecked")
       @Override
       public String handleRequest(Map<String, Object> event, Context context) {
           List<Map<String, Object>> events = (List<Map<String, Object>>) event.get("events");
           for (Map<String, Object> record : events) {
               Map<String, Object> eventData = (Map<String, Object>) record.get("event");
               processEventData(eventData);
           }
   
           return "OK";
       }
   
       @SuppressWarnings("unchecked")
       private void processEventData(Map<String, Object> eventData) {
           String operationType = (String) eventData.get("operationType");
           System.out.println("operationType: %s".formatted(operationType));
   
           Map<String, Object> ns = (Map<String, Object>) eventData.get("ns");
   
           String db = (String) ns.get("db");
           System.out.println("db: %s".formatted(db));
           String coll = (String) ns.get("coll");
           System.out.println("coll: %s".formatted(coll));
   
           Map<String, Object> fullDocument = (Map<String, Object>) eventData.get("fullDocument");
           System.out.println("fullDocument: %s".formatted(fullDocument));
       }
   
   }
   ```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 JavaScript 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   console.log('Loading function');
   exports.handler = async (event, context) => {
       event.events.forEach(record => {
           logDocumentDBEvent(record);
       });
       return 'OK';
   };
   
   const logDocumentDBEvent = (record) => {
       console.log('Operation type: ' + record.event.operationType);
       console.log('db: ' + record.event.ns.db);
       console.log('collection: ' + record.event.ns.coll);
       console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2));
   };
   ```
使用 TypeScript 将 Amazon DocumentDB 事件与 Lambda 结合使用  

   ```
   import { DocumentDBEventRecord, DocumentDBEventSubscriptionContext } from 'aws-lambda';
   
   console.log('Loading function');
   
   export const handler = async (
     event: DocumentDBEventSubscriptionContext,
     context: any
   ): Promise<string> => {
     event.events.forEach((record: DocumentDBEventRecord) => {
       logDocumentDBEvent(record);
     });
     return 'OK';
   };
   
   const logDocumentDBEvent = (record: DocumentDBEventRecord): void => {
     console.log('Operation type: ' + record.event.operationType);
     console.log('db: ' + record.event.ns.db);
     console.log('collection: ' + record.event.ns.coll);
     console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2));
   };
   ```

------
#### [ PHP ]

**适用于 PHP 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 PHP 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   <?php
   
   require __DIR__.'/vendor/autoload.php';
   
   use Bref\Context\Context;
   use Bref\Event\Handler;
   
   class DocumentDBEventHandler implements Handler
   {
       public function handle($event, Context $context): string
       {
   
           $events = $event['events'] ?? [];
           foreach ($events as $record) {
               $this->logDocumentDBEvent($record['event']);
           }
           return 'OK';
       }
   
       private function logDocumentDBEvent($event): void
       {
           // Extract information from the event record
   
           $operationType = $event['operationType'] ?? 'Unknown';
           $db = $event['ns']['db'] ?? 'Unknown';
           $collection = $event['ns']['coll'] ?? 'Unknown';
           $fullDocument = $event['fullDocument'] ?? [];
   
           // Log the event details
   
           echo "Operation type: $operationType\n";
           echo "Database: $db\n";
           echo "Collection: $collection\n";
           echo "Full document: " . json_encode($fullDocument, JSON_PRETTY_PRINT) . "\n";
       }
   }
   return new DocumentDBEventHandler();
   ```

------
#### [ Python ]

**适用于 Python 的 SDK（Boto3）**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Python 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   import json
   
   def lambda_handler(event, context):
       for record in event.get('events', []):
           log_document_db_event(record)
       return 'OK'
   
   def log_document_db_event(record):
       event_data = record.get('event', {})
       operation_type = event_data.get('operationType', 'Unknown')
       db = event_data.get('ns', {}).get('db', 'Unknown')
       collection = event_data.get('ns', {}).get('coll', 'Unknown')
       full_document = event_data.get('fullDocument', {})
   
       print(f"Operation type: {operation_type}")
       print(f"db: {db}")
       print(f"collection: {collection}")
       print("Full document:", json.dumps(full_document, indent=2))
   ```

------
#### [ Ruby ]

**适用于 Ruby 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Ruby 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   require 'json'
   
   def lambda_handler(event:, context:)
     event['events'].each do |record|
       log_document_db_event(record)
     end
     'OK'
   end
   
   def log_document_db_event(record)
     event_data = record['event'] || {}
     operation_type = event_data['operationType'] || 'Unknown'
     db = event_data.dig('ns', 'db') || 'Unknown'
     collection = event_data.dig('ns', 'coll') || 'Unknown'
     full_document = event_data['fullDocument'] || {}
   
     puts "Operation type: #{operation_type}"
     puts "db: #{db}"
     puts "collection: #{collection}"
     puts "Full document: #{JSON.pretty_generate(full_document)}"
   end
   ```

------
#### [ Rust ]

**适用于 Rust 的 SDK**  
 查看 GitHub，了解更多信息。在[无服务器示例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-docdb-to-lambda)存储库中查找完整示例，并了解如何进行设置和运行。
使用 Rust 将 Amazon DocumentDB 事件与 Lambda 结合使用。  

   ```
   use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
   use aws_lambda_events::{
       event::documentdb::{DocumentDbEvent, DocumentDbInnerEvent},
      };
   
   
   // Built with the following dependencies:
   //lambda_runtime = "0.11.1"
   //serde_json = "1.0"
   //tokio = { version = "1", features = ["macros"] }
   //tracing = { version = "0.1", features = ["log"] }
   //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
   //aws_lambda_events = "0.15.0"
   
   async fn function_handler(event: LambdaEvent<DocumentDbEvent>) ->Result<(), Error> {
       
       tracing::info!("Event Source ARN: {:?}", event.payload.event_source_arn);
       tracing::info!("Event Source: {:?}", event.payload.event_source);
     
       let records = &event.payload.events;
      
       if records.is_empty() {
           tracing::info!("No records found. Exiting.");
           return Ok(());
       }
   
       for record in records{
           log_document_db_event(record);
       }
   
       tracing::info!("Document db records processed");
   
       // Prepare the response
       Ok(())
   
   }
   
   fn log_document_db_event(record: &DocumentDbInnerEvent)-> Result<(), Error>{
       tracing::info!("Change Event: {:?}", record.event);
       
       Ok(())
   
   }
   
   #[tokio::main]
   async fn main() -> Result<(), Error> {
       tracing_subscriber::fmt()
       .with_max_level(tracing::Level::INFO)
       .with_target(false)
       .without_time()
       .init();
   
       let func = service_fn(function_handler);
       lambda_runtime::run(func).await?;
       Ok(())
       
   }
   ```

------

1. 在 Lambda 控制台的**代码源**窗格中，将代码粘贴到代码编辑器中，替换 Lambda 创建的代码。

1. 在**部署**部分，选择**部署**以更新函数的代码：  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## 创建 Lambda 事件源映射
<a name="docdb-create-the-lambda-event-source-mapping"></a>

 创建事件源映射，将 Amazon DocumentDB 更改流与 Lambda 函数相关联。创建此事件源映射后，AWS Lambda 即开始轮询该流。

**创建事件源映射**

1. 在 Lambda 控制台中打开[函数页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您之前创建的 `ProcessDocumentDBRecords` 函数。

1. 选择**配置**选项卡，然后从左侧菜单中选择**触发器**。

1. 选择**添加触发器**。

1. 在**触发器配置**下，为源选择 **Amazon DocumentDB**。

1. 使用以下配置创建事件源映射：
   + **Amazon DocumentDB 集群**：选择之前创建的集群。
   + **数据库名称**：docdbdemo
   + **集合名称**：产品
   + **批处理大小**：1
   + **起始位置**：最新
   + **身份验证**：BASIC\$1AUTH
   + **Secrets Manager 密钥**：选择 Amazon DocumentDB 集群的密钥。其名称类似于 `rds!cluster-12345678-a6f0-52c0-b290-db4aga89274f`。
   + **批处理时段**：1
   + **完整文档配置**：UpdateLookup

1. 选择**添加**。创建事件源映射可能需要花费几分钟的时间。

## 测试函数
<a name="docdb-test-insert"></a>

等待事件源映射达到**已启用**状态。这个过程可能需要几分钟。然后，通过插入、更新和删除数据库记录来测试端到端设置。开始前的准备工作：

1. 在 CloudShell 环境中[重新连接到 Amazon DocumentDB 集群](#docdb-connect-to-cluster)。

1. 运行以下命令以确保使用的是 `docdbdemo` 数据库：

   ```
   use docdbdemo
   ```

### 插入记录
<a name="docdb-test-insert"></a>

在 `docdbdemo` 数据库的 `products` 集合中插入记录：

```
db.products.insertOne({"name":"Pencil", "price": 1.00})
```

通过[查看 CloudWatch Logs](monitoring-cloudwatchlogs-view.md#monitoring-cloudwatchlogs-console) 来验证函数是否成功处理该事件。您会看到如下日志条目：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-insert-log.png)


### 更新记录
<a name="docdb-test-update"></a>

使用以下命令更新您刚刚插入的记录：

```
db.products.updateOne(
    { "name": "Pencil" },
    { $set: { "price": 0.50 }}
)
```

通过[查看 CloudWatch Logs](monitoring-cloudwatchlogs-view.md#monitoring-cloudwatchlogs-console) 来验证函数是否成功处理该事件。您会看到如下日志条目：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-update-log.png)


### 删除记录
<a name="docdb-test-delete"></a>

使用以下命令删除您刚刚更新的记录：

```
db.products.deleteOne( { "name": "Pencil" } )
```

通过[查看 CloudWatch Logs](monitoring-cloudwatchlogs-view.md#monitoring-cloudwatchlogs-console) 来验证函数是否成功处理该事件。您会看到如下日志条目：

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-delete-log.png)


## 问题排查
<a name="docdb-lambda-troubleshooting"></a>

如果您在函数的 CloudWatch 日志中没有看到任何数据库事件，请检查以下各项：
+ 确保 Lambda 事件源映射（也称为触发器）处于**已启用**状态。创建事件源映射可能需要几分钟时间。
+ 如果事件源映射**已启用**，但您仍然无法在 CloudWatch 中看到数据库事件：
  + 确保事件源映射中的**数据库名称**设置为 `docdbdemo`。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-trigger.png)
  + 检查事件源映射**上次处理结果**字段中是否显示以下消息：“问题：连接错误。您的 VPC 必须能够连接到 Lambda 和 STS，以及 Secrets Manager（如果需要身份验证）。” 如果看到此错误，请确保您[已创建 Lambda 和 Secrets Manager VPC 接口端点](#docdb-create-interface-vpc-endpoints)，并且这些端点使用与 Amazon DocumentDB 集群相同的 VPC 和子网。  
![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/documentdb-lastprocessingresult.png)

## 清除资源
<a name="docdb-cleanup"></a>

 除非您想要保留为本教程创建的资源，否则可立即将其删除。通过删除您不再使用的 AWS 资源，可防止您的 AWS 账户 产生不必要的费用。

**删除 Lambda 函数**

1. 打开 Lamba 控制台的 [Functions（函数）页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择您创建的函数。

1. 依次选择**操作**和**删除**。

1. 在文本输入字段中键入 **confirm**，然后选择**删除**。

**删除执行角色**

1. 打开 IAM 控制台的[角色页面](https://console.aws.amazon.com/iam/home#/roles)。

1. 选择您创建的执行角色。

1. 选择**删除**。

1. 在文本输入字段中输入角色名称，然后选择**删除**。

**删除 VPC 端点**

1. 打开 [VPC 控制台](https://console.aws.amazon.com/vpc/home#)。在左侧菜单的**虚拟私有云**下，选择**端点**。

1. 选择您创建的端点。

1. 选择**操作**、**删除 VPC 端点**。

1. 在文本输入字段中输入 **delete**。

1. 选择**删除**。

**删除 Amazon DocumentDB 集群**

1. 打开 [Amazon DocumentDB 控制台](https://console.aws.amazon.com/docdb/home#)。

1. 选择您为本教程创建的 Amazon DocumentDB 集群，并禁用删除保护。

1. 在主**集群**页面中，再次选择 Amazon DocumentDB 集群。

1. 依次选择**操作**和**删除**。

1. 对于**创建最终集群快照**，选择**否**。

1. 在文本输入字段中输入 **delete**。

1. 选择**删除**。

**在 Secrets Manager 中删除密钥**

1. 打开 [Secrets Manager 控制台](https://console.aws.amazon.com/secretsmanager/home#)。

1. 选择您为本教程创建的密钥。

1. 依次选择**操作**、**删除密钥**。

1. 选择**计划删除**。