

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 教學課程： AWS Lambda 搭配 Amazon DynamoDB 串流使用
<a name="with-ddb-example"></a>

 在此教學課程中，您建立 Lambda 函數以從 Amazon DynamoDB Streams 中取用事件。

## 先決條件
<a name="with-ddb-prepare"></a>

### 安裝 AWS Command Line Interface
<a name="install_aws_cli"></a>

如果您尚未安裝 AWS Command Line Interface，請依照[安裝或更新最新版本的 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)中的步驟進行安裝。

本教學課程需使用命令列終端機或 Shell 來執行命令。在 Linux 和 macOS 中，使用您偏好的 Shell 和套件管理工具。

**注意**  
在 Windows 中，作業系統的內建終端不支援您常與 Lambda 搭配使用的某些 Bash CLI 命令 (例如 `zip`)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本，請[安裝適用於 Linux 的 Windows 子系統](https://docs.microsoft.com/en-us/windows/wsl/install-win10)。

## 建立執行角色
<a name="with-ddb-create-execution-role"></a>

建立 [執行角色](lambda-intro-execution-role.md)，授予函數存取 AWS 資源的許可。

**若要建立執行角色**

1. 在 IAM 主控台中開啟[角色頁面](https://console.aws.amazon.com/iam/home#/roles)。

1. 選擇建**立角色**。

1. 建立具備下列屬性的角色。
   + **信任實體** - Lambda。
   + **許可** - **AWSLambdaDynamoDBExecutionRole**。
   + **角色名稱** - **lambda-dynamodb-role**。

**AWSLambdaDynamoDBExecutionRole** 具備函數自 DynamoDB 讀取項目以及寫入日誌到 CloudWatch Logs 時所需的許可。

## 建立函數
<a name="with-ddb-example-create-function"></a>

建立 Lambda 函數來處理 DynamoDB 事件。函數程式碼將一些傳入的事件資料寫入 CloudWatch Logs。

------
#### [ .NET ]

**適用於 .NET 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 .NET 搭配 Lambda 來使用 DynamoDB 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Go 搭配 Lambda 來使用 DynamoDB 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**適用於 Java 2.x 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Java 搭配 Lambda 來使用 DynamoDB 事件。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**適用於 JavaScript (v3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 JavaScript 搭配 Lambda 來使用 DynamoDB 事件。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
使用 TypeScript 搭配 Lambda 來使用 DynamoDB 事件。  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**適用於 PHP 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 PHP 搭配 Lambda 來使用 DynamoDB 事件。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**適用於 Python (Boto3) 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Python 搭配 Lambda 來使用 DynamoDB 事件。  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Ruby 搭配 Lambda 來使用 DynamoDB 事件。  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**適用於 Rust 的 SDK**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在[無伺服器範例](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)儲存庫中設定和執行。
使用 Rust 搭配 Lambda 來使用 DynamoDB 事件。  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**建立函數**

1. 將範本程式碼複製到名為 `example.js` 的檔案。

1. 建立部署套件。

   ```
   zip function.zip example.js
   ```

1. 使用 `create-function` 命令建立一個 Lambda 函數。

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## 測試 Lambda 函數
<a name="with-dbb-invoke-manually"></a>

在此步驟中，您會使用 `invoke` AWS Lambda CLI 命令和下列範例 DynamoDB 事件手動叫用 Lambda 函數。將下列內容複製到名為 `input.txt` 的檔案。

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

執行以下 `invoke` 命令。

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

如果您使用的是第 2 AWS CLI 版，則需要 **cli-binary-format**選項。若要讓此成為預設的設定，請執行 `aws configure set cli-binary-format raw-in-base64-out`。若要取得更多資訊，請參閱*《AWS Command Line Interface 使用者指南第 2 版》*中 [AWS CLI 支援的全域命令列選項](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)。

該函數會在回應本文中傳回字串 `message` 訊息。

在 `outputfile.txt` 檔案中確認輸出。

## 建立啟用串流的 DynamoDB 資料表
<a name="with-ddb-create-buckets"></a>

建立啟用串流的 Amazon DynamoDB 資料表。

**建立 DynamoDB 資料表**

1. 開啟 [DynamoDB 主控台](https://console.aws.amazon.com/dynamodb)。

1. 選擇 **Create Table** (建立資料表)。

1. 根據下列設定建立資料表。
   + **Table name (資料表名稱)** – **lambda-dynamodb-stream** 
   + **Primary key** (主要金鑰) - **id** (字串)

1. 選擇**建立**。

**啟用串流**

1. 開啟 [DynamoDB 主控台](https://console.aws.amazon.com/dynamodb)。

1. 選擇 **資料表**。

1. 選擇 **lambda-dynamodb-stream** 資料表。

1. 在 **匯出與串流** 下，選擇 **DynamoDB 串流詳細資訊** 。

1. 選擇 **Turn on (開啟)**。

1. 對於**檢視類型**，選擇**僅索引鍵屬性**。

1. 選擇**開啟串流**。

寫下串流 ARN。在下個步驟將串流與您的 Lambda 函數相關聯時會需要用到它。如需啟用串流的詳細資訊，請參閱[使用 DynamoDB Streams 擷取資料表活動](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)。

## 在 中新增事件來源 AWS Lambda
<a name="with-ddb-attach-notification-configuration"></a>

在 AWS Lambda中建立事件來源映射。事件來源映射可將 DynamoDB 串流與您的 Lambda 函數相關聯。建立此事件來源映射後， 會 AWS Lambda 開始輪詢串流。

執行下列 AWS CLI `create-event-source-mapping` 命令。命令執行後，請記下 UUID。使用任何命令時，您會需要此 UUID 以參考到事件來源映射，例如當刪除事件來源映射的時候。

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 這會在指定的 DynamoDB 串流和 Lambda 函數之間建立映射。您可以將 DynamoDB 串流與多個 Lambda 函數相關聯，也可以將相同的 Lambda 函數與多個串流相關聯。但是 Lambda 函數會共用其所共有之串流的讀取傳送量。

您可以執行下列命令來取得事件來源映射的清單。

```
aws lambda list-event-source-mappings
```

該清單傳回所有您建立的事件來源映射，並針對每個映射顯示 `LastProcessingResult` 和其他內容。此欄位可用在發生問題時，提供資訊豐富的訊息。例如 `No records processed`（表示 AWS Lambda 尚未開始輪詢或串流中沒有記錄） 和 `OK`（表示 AWS Lambda 成功從串流讀取記錄並叫用您的 Lambda 函數） 的值表示沒有問題。如果發生問題，您會收到錯誤訊息。

如果您有許多事件來源映射，請使用函數式稱參數來縮小結果。

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## 測試設定
<a name="with-ddb-final-integration-test-no-iam"></a>

測試端對端的體驗。當您更新資料表時，DynamoDB 會將事件記錄寫入串流。當 AWS Lambda 輪詢串流時，若偵測到串流上的新記錄，便會透過傳送事件到函數來代表您調用 Lambda 函數。

1. 在 DynamoDB 主控台上針對資料表新增、更新、刪除項目。DynamoDB 會將這些動作的記錄寫入串流。

1. AWS Lambda 會輪詢串流，並在偵測到串流的更新時，透過傳入在串流中找到的事件資料來叫用 Lambda 函數。

1. 您的函數會執行並在 Amazon CloudWatch 中建立日誌。您可以驗證在 Amazon CloudWatch 主控台中報告的日誌。

## 後續步驟
<a name="with-ddb-next-steps"></a>

此教學課程介紹了使用 Lambda 處理 DynamoDB 串流事件的基礎知識。對於生產工作負載，建議實作部分批次回應邏輯，更有效率地處理個別記錄失敗。適用於 的 Powertools 批次[處理器公用程式](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) AWS Lambda 可在 Python、TypeScript、.NET 和 Java 中使用，並為此提供強大的解決方案，可自動處理部分批次回應的複雜性，並減少成功處理記錄的重試次數。

## 清除您的資源
<a name="cleanup"></a>

除非您想要保留為此教學課程建立的資源，否則您現在便可刪除。透過刪除您不再使用 AWS 的資源，您可以避免不必要的 費用 AWS 帳戶。

**若要刪除 Lambda 函數**

1. 開啟 Lambda 主控台中的 [函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇您建立的函數。

1. 選擇 **Actions** (動作)、**Delete** (刪除)。

1. 在文字輸入欄位中輸入 **confirm**，然後選擇**刪除**。

**刪除執行角色**

1. 開啟 IAM 主控台中的 [角色頁面](https://console.aws.amazon.com/iam/home#/roles) 。

1. 選取您建立的執行角色。

1. 選擇**刪除**。

1. 在文字輸入欄位中輸入角色的名稱，然後選擇 **刪除** 。

**若要刪除 DynamoDB 資料表**

1. 開啟 DynamoDB 主控台的 [資料表頁面](https://console.aws.amazon.com//dynamodb/home#tables:) 。

1. 選取您建立的資料表。

1. 選擇 **刪除** 。

1. 在文字方塊中輸入 **delete**。

1. 選擇 **刪除資料表** 。