

# チュートリアル: Amazon DynamoDB Streams で AWS Lambda を使用する
<a name="with-ddb-example"></a>

 このチュートリアルでは、Amazon DynamoDB ストリームからのイベントを処理する Lambda 関数を作成します。

## 前提条件
<a name="with-ddb-prepare"></a>

### AWS Command Line Interface のインストール
<a name="install_aws_cli"></a>

AWS Command Line Interface をまだインストールしていない場合は、「[最新バージョンの AWS CLI のインストールまたは更新](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)」にある手順に従ってインストールしてください。

このチュートリアルでは、コマンドを実行するためのコマンドラインターミナルまたはシェルが必要です。Linux および macOS では、任意のシェルとパッケージマネージャーを使用してください。

**注記**  
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (`zip` など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、[Windows Subsystem for Linux をインストール](https://docs.microsoft.com/en-us/windows/wsl/install-win10)します。

## 実行ロールを作成する
<a name="with-ddb-create-execution-role"></a>

AWS リソースにアクセスするためのアクセス権限を関数に付与する[実行ロール](lambda-intro-execution-role.md)を作成します。

**実行ロールを作成するには**

1. IAM コンソールの [[ロールページ](https://console.aws.amazon.com/iam/home#/roles)] を開きます。

1. [**ロールの作成**] を選択します。

1. 次のプロパティでロールを作成します。
   + **信頼されたエンティティ** – Lambda
   + **アクセス許可** - **AWSLambdaDynamoDBExecutionRole**
   + **ロール名** -**lambda-dynamodb-role** 

**AWSLambdaDynamoDBExecutionRole** には、DynamoDB から項目を読み取り、 に CloudWatch Logs ログを書き込むために、関数が必要とするアクセス許可があります。

## 関数を作成する
<a name="with-ddb-example-create-function"></a>

DynamoDB イベントを処理する Lambda 関数を作成します。関数コードは、受信イベントデータの一部を CloudWatch ログに書き込みます。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用して Lambda で DynamoDB イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用して Lambda で DynamoDB イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用して Lambda で DynamoDB イベントの消費。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
JavaScript を使用した Lambda での DynamoDB イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
TypeScript を使用した Lambda での DynamoDB イベントの消費。  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での DynamoDB イベントの消費。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用して Lambda で DynamoDB イベントの消費。  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用して Lambda で DynamoDB イベントの消費。  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用して Lambda で DynamoDB イベントを利用します。  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**関数を作成するには**

1. サンプルコードを `example.js` という名前のファイルにコピーします。

1. デプロイパッケージを作成します。

   ```
   zip function.zip example.js
   ```

1. `create-function` コマンドを使用して Lambda 関数を作成します。

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## Lambda 関数をテストする
<a name="with-dbb-invoke-manually"></a>

このセットアップでは、`invoke`AWS Lambda の CLI コマンドと次のサンプルの DynamoDB イベントを使用して、Lambda 関数を手動で呼び出します。次の内容を `input.txt` という名前のファイルにコピーします。

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

次の `invoke` コマンドを実行します。

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

AWS CLI バージョン 2 を使用している場合、**cli-binary-format** オプションは必須です。これをデフォルト設定にするには、`aws configure set cli-binary-format raw-in-base64-out` を実行します。詳細については、「*AWS Command Line Interface バージョン 2 用ユーザーガイド*」の「[AWS CLI でサポートされているグローバルコマンドラインオプション](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)」を参照してください。

この関数はレスポンス本文で文字列 `message` を返します。

`outputfile.txt` ファイルで出力を確認します。

## ストリーミングが有効になった DynamoDB テーブルを作成する
<a name="with-ddb-create-buckets"></a>

ストリーミングが有効な Amazon DynamoDB テーブルを作成します。

**DynamoDB テーブルを作成するには**

1. [DynamoDB コンソール](https://console.aws.amazon.com/dynamodb)を開きます。

1. [**Create table**] を選択します。

1. 次の設定でテーブルを作成します。
   + **テーブル名** – **lambda-dynamodb-stream**
   + **プライマリキー** – **id** (文字列)

1. [**作成**] を選択します。

**ストリームを有効化するには**

1. [DynamoDB コンソール](https://console.aws.amazon.com/dynamodb)を開きます。

1. [**テーブル**] を選択します。

1. [**lambda-dynamodb-stream**] テーブルを選択します。

1. [**Exports and streams**] (エクスポートとストリーミング)で、[**DynamoDB stream details**] (DynamoDB ストリーミングの詳細) を選択します。

1. **[オンにする]** を選択します。

1. **[ビュータイプ]** には、**[キー属性のみ]** を選択します。

1. **[ストリームをオンにする]** を選択します。

ストリーム ARN をメモします。こちらは、次のステップでストリームを Lambda 関数に関連付ける際に必要になります。ストリームの有効化の詳細については、「[DynamoDB Streams を使用したテーブルアクティビティのキャプチャ](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)」を参照してください。

## AWS Lambda でイベントソースを追加する
<a name="with-ddb-attach-notification-configuration"></a>

AWS Lambda でイベントソースマッピングを作成します。このイベントソースのマッピングは、DynamoDB ストリームを Lambda 関数に関連付けます。このイベントソースのマッピングを作成すると、AWS Lambda はストリームのポーリングを開始します。

次の AWS CLI `create-event-source-mapping` コマンドを実行します。コマンドの実行後、UUID をメモします。この UUID は、イベントソースマッピングを削除するときなど、コマンドでイベントソースマッピングを参照する場合に必要になります。

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 これにより、指定された DynamoDB ストリームと Lambda 関数の間にマッピングが作成されます。1 つの DynamoDB ストリームを複数の Lambda 関数と関連付けたり、1 つの Lambda 関数を複数のストリームに関連付けたりすることができます。ただし、Lambda 関数は、共有するストリーム用に、読み取りスループットを共有します。

次のコマンドを実行して、イベントソースのマッピングのリストを取得できます。

```
aws lambda list-event-source-mappings
```

このリストでは、作成済みのすべてのイベントソースのマッピングが返され、各マッピングに対して `LastProcessingResult` などが示されます。問題がある場合、このフィールドは情報メッセージを提供するために使用されます。`No records processed` (AWS Lambda がポーリングを開始していないか、ストリームにレコードがないことを示す) や、`OK` (AWS Lambda がストリームから正常にレコードを読み取り、Lambda 関数を呼び出したことを示す) などの値は、問題がないことを示しています。問題がある場合は、エラーメッセージが返されます。

イベントソースマッピングが多数ある場合、関数の name パラメータを使用して結果を絞り込みます。

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## セットアップをテストする
<a name="with-ddb-final-integration-test-no-iam"></a>

エンドツーエンドエクスペリエンスをテストします。テーブルの更新を実行すると、DynamoDB はイベントレコードをストリームに書き込みます。ストリームをポーリングしている AWS Lambda は、ストリームで新しいレコードを検出し、イベントを Lambda 関数に渡して、ユーザーに代わって関数を実行します。

1. DynamoDB コンソールで、テーブルに項目を追加、更新、削除します。DynamoDB は、これらのアクションのレコードをストリームに書き込みます。

1. AWS Lambda は、ストリームをポーリングし、ストリームの更新を検出すると、ストリームで見つかったイベントデータを渡して Lambda 関数を呼び出します。

1. 関数が実行され、Amazon CloudWatch にログが作成されます。報告されたログは Amazon CloudWatch コンソールで確認できます。

## 次のステップ
<a name="with-ddb-next-steps"></a>

このチュートリアルでは、Lambda で DynamoDB ストリームイベントを処理するための基本について説明しました。本番ワークロードでは、部分的なバッチレスポンスロジックの実装を考慮し、個々のレコード障害処理の効率化を図ってください。Powertools for AWS Lambda の[バッチプロセッサユーティリティ](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)は、Python、TypeScript、.NET、Java で利用可能で、これに対する堅牢なソリューションを提供し、部分的なバッチレスポンスの複雑さを自動的に処理し、正常に処理されたレコードの再試行回数を減らします。

## リソースのクリーンアップ
<a name="cleanup"></a>

このチュートリアル用に作成したリソースは、保持しない場合は削除できます。使用しなくなった AWS リソースを削除することで、AWS アカウント アカウントに請求される料金の発生を防ぎます。

**Lambda 関数を削除するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. 作成した関数を選択します。

1. **[アクション]** で、**[削除]** を選択します。

1. テキスト入力フィールドに **confirm** と入力し、**[削除]** を選択します。

**実行ロールを削除する**

1. IAM コンソールの [[ロール]](https://console.aws.amazon.com/iam/home#/roles) ページを開きます。

1. 作成した実行ロールを選択します。

1. **[削除]** を選択します。

1. テキスト入力フィールドにロールの名前を入力し、**[Delete]** (削除) を選択します。

**DynamoDB テーブルを削除するには**

1. DynamoDB コンソールで [[Tables (テーブル)] ページ](https://console.aws.amazon.com//dynamodb/home#tables:)を開きます。

1. 作成したテーブルを選択します。

1. [**削除**] を選択します。

1. テキストボックスに「**delete**」と入力します。

1. **[テーブルの削除]** を選択します。