

# Amazon DynamoDB で AWS Lambda を使用する
<a name="with-ddb"></a>

**注記**  
Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「[Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)」を参照してください。

AWS Lambda 関数を使用して、[Amazon DynamoDB ストリーム](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)のレコードを処理します。DynamoDB Streams では、Lambda 関数を使用して、DynamoDB テーブルが更新されるたびに追加の作業を実行することができます。

DynamoDB ストリームを処理するときは、部分的なバッチレスポンスロジックを実装して、バッチ内の一部のレコードで障害が発生した場合に正常に処理されたレコードが再試行されないようにする必要があります。Powertools for AWS Lambda の[バッチプロセッサユーティリティ](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)は、Python、TypeScript、.NET、Java で利用でき、部分的なバッチレスポンスロジックを自動的に処理して開発時間を短縮し、信頼性を向上させることで、この実装を簡素化します。

**Topics**
+ [ポーリングストリームとバッチストリーム](#dynamodb-polling-and-batching)
+ [ポーリングとストリームの開始位置](#dyanmo-db-stream-poll)
+ [DynamoDB Streams でのシャードの同時読み込み](#events-dynamodb-simultaneous-readers)
+ [イベントの例](#events-sample-dynamodb)
+ [Lambda で DynamoDB レコードを処理する](services-dynamodb-eventsourcemapping.md)
+ [DynamoDB と Lambda を使用した部分的なバッチレスポンスの設定](services-ddb-batchfailurereporting.md)
+ [Lambda で DynamoDB イベントソースの破棄されたレコードを保持する](services-dynamodb-errors.md)
+ [Lambda での ステートフル DynamoDB ストリーム処理の実装](services-ddb-windows.md)
+ [Amazon DynamoDB イベントソースマッピング用の Lambda パラメータ](services-ddb-params.md)
+ [DynamoDB イベントソースでのイベントフィルタリングの使用](with-ddb-filtering.md)
+ [チュートリアル: Amazon DynamoDB Streams で AWS Lambda を使用する](with-ddb-example.md)

## ポーリングストリームとバッチストリーム
<a name="dynamodb-polling-and-batching"></a>

Lambda は、レコードの DynamoDB ストリームにあるシャードを 1 秒あたり 4 回の基本レートでポーリングします。レコードが利用可能になると、Lambda は関数を呼び出し、結果を待機します。処理が成功すると、Lambda は、レコードをさらに受け取るまでポーリングを再開します。

デフォルトで、Lambda はレコードが使用可能になると同時に関数を呼び出します。Lambda がイベントソースから読み取るバッチにレコードが 1 つしかない場合、Lambda は関数に 1 つのレコードしか送信しません。少数のレコードで関数を呼び出さないようにするには、*バッチ処理ウィンドウ*を設定することで、最大 5 分間レコードをバッファリングするようにイベントソースに指示できます。関数を呼び出す前に、Lambda は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでイベントソースからのレコードの読み取りを継続します。詳細については、「[バッチ処理動作](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)」を参照してください。

**警告**  
Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、AWS ナレッジセンターの「[Lambda 関数を冪等にするにはどうすればよいですか?](https://repost.aws/knowledge-center/lambda-function-idempotent)」を参照してください。

Lambda は、設定された[拡張機能](lambda-extensions.md)の完了を待たずに、次のバッチを処理のために送信します。つまり、Lambda がレコードの次のバッチを処理する間も拡張機能を実行し続けることができます。アカウントの[同時実行](lambda-concurrency.md)設定または制限に違反すると、スロットリングの問題が発生する可能性があります。これが潜在的な問題であるかどうかを検出するには、関数を監視し、イベントソースマッピングで予想よりも高い[同時実行メトリクス](monitoring-concurrency.md#general-concurrency-metrics)が表示されているかどうかを確認します。呼び出しの間隔が短いため、Lambda はシャードの数よりも高い同時実行使用量を一時的に報告する場合があります。これは、拡張機能のない Lambda 関数にも当てはまります。

DynamoDB ストリームの 1 つのシャードを複数の Lambda 呼び出しで同時に処理するには、[ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 設定を行います。Lambda がシャードからポーリングする同時バッチの数は、1 (デフォルト)～10 の並列化係数で指定できます。例えば、`ParallelizationFactor` を 2 に設定すると、最大 200 個の Lambda 呼び出しを同時に実行して、DynamoDB ストリームのシャードを 100 個処理できます (ただし、`ConcurrentExecutions` メトリクスに異なる値が表示される場合があります)。これにより、データボリュームが揮発性で [IteratorAge](monitoring-metrics-types.md#performance-metrics) が高い場合に、処理のスループットをスケールアップすることができます。シャードごとの同時実行バッチの数を増やしても、Lambda はアイテム (パーティションおよびソートキー) レベルで順序立った処理を確実に行います。

## ポーリングとストリームの開始位置
<a name="dyanmo-db-stream-poll"></a>

イベントソースマッピングの作成時および更新時のストリームのポーリングは、最終的に一貫性があることに注意してください。
+ イベントソースマッピングの作成時、ストリームからのイベントのポーリングが開始されるまでに数分かかる場合があります。
+ イベントソースマッピングの更新時、ストリームからのイベントのポーリングが停止および再開されるまでに数分かかる場合があります。

つまり、`LATEST` をストリームの開始位置として指定すると、イベントソースマッピングの作成または更新中にイベントを見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を `TRIM_HORIZON` として指定します。

## DynamoDB Streams でのシャードの同時読み込み
<a name="events-dynamodb-simultaneous-readers"></a>

単一リージョンのテーブルがグローバルテーブルでない場合、同じ DynamoDB Streams のシャードから、同時に 2 つまでの Lambda 関数を読み込むように設計できます。この制限を超えると、リクエストのスロットリングが発生する場合があります。グローバルテーブルでは、リクエストのスロットリングを回避するために、同時関数の数を 1 に制限することをお勧めします。

## イベントの例
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```

# Lambda で DynamoDB レコードを処理する
<a name="services-dynamodb-eventsourcemapping"></a>

イベントソースマッピングを作成し、ストリームから Lambda 関数にレコードを送信するように Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の Lambda 関数で同じデータを処理したり、1 つの関数で複数のストリームの項目を処理したりできます。

別の AWS アカウント のストリームからのレコードを処理するようにイベント ソース マッピングを構成できます。詳細については[クロスアカウントのイベントソースマッピングの作成](#services-dynamodb-eventsourcemapping-cross-account)を参照してください。

DynamoDB ストリームから読み取るように関数を設定するには、「[AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html)」 AWS 管理ポリシーを実行ロールにアタッチし、**[DynamoDB]** トリガーを作成します。

**アクセス許可を追加してトリガーを作成するには**

1. Lambda コンソールの[関数ページ](https://console.aws.amazon.com/lambda/home#/functions)を開きます。

1. 関数の名前を選択します。

1. **[設定]** タブを開き、次に **[アクセス権限]** をクリックします。

1. **[実行ロール]** で、実行ロールのリンクを選択します。このリンクを選択すると、IAM コンソールでロールが開きます。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/execution-role.png)

1. **[アクセス許可を追加]**、**[ポリシーをアタッチ]** の順に選択します。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/attach-policies.png)

1. [検索] フィールドに `AWSLambdaDynamoDBExecutionRole` を入力します。実行ロールにポリシーを追加 関数が DynamoDB ストリームから読み取るために必要な許可を含む AWS 管理ポリシーです。このポリシーの詳細については、「*AWS 管理ポリシーリファレンス*」の「[AWSLambdaDynamoDBExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaDynamoDBExecutionRole.html)」を参照してください。

1. Lambda コンソールの関数に戻ります。**[関数の概要]** で **[トリガーを追加]** をクリックします。  
![\[\]](http://docs.aws.amazon.com/ja_jp/lambda/latest/dg/images/add-trigger.png)

1. トリガーのタイプを選択します。

1. 必須のオプションを設定し、[**Add**] (追加) を選択します。

Lambda は、DynamoDB イベントソースの次のオプションをサポートしています。

**イベントソースオプション**
+ **DynamoDB テーブル** - レコードの読み取り元の DynamoDB テーブル。
+ **バッチサイズ** - 各バッチで関数に送信されるレコードの数。最大 10,000。Lambda は、イベントの合計サイズが同期呼び出しの[ペイロード上限](gettingstarted-limits.md) (6 MB) を超えない限り、バッチ内のすべてのレコードを単一の呼び出しで関数に渡します。
+ **バッチウィンドウ** - 関数を呼び出す前にレコードを収集する最大時間（秒数）を指定します。
+ **開始位置** - 新規レコードのみ、または既存のすべてのレコードを処理します。
  + **最新** - ストリームに追加された新しいレコードを処理します。
  + **水平トリム** - ストリーム内のすべてのレコードを処理します。

  既存のレコードを処理した後、関数に戻り、新しいレコードの処理が続行されます。
+ **[障害発生時の宛先]** — 処理できないレコードの標準 SQS キューまたは標準 SNS トピックです。Lambda は、古すぎる、または再試行回数の上限に達したレコードのバッチを廃棄すると、バッチに関する詳細をキューまたはトピックに送信します。
+ **再試行回数** - 関数がエラーを返したときに Lambda が再試行する回数の上限です。これは、バッチが関数に到達しなかったサービスエラーやスロットルには適用されません。
+ **レコードの最大有効期間** — Lambda が関数に送信するレコードの最大経過時間。
+ **エラー発生時のバッチ分割** — 関数がエラーを返した場合、再試行する前にバッチを 2 つに分割します。元のバッチサイズ設定は変更されません。
+ **シャードごとの同時バッチ** — 同じシャードからの複数のバッチを同時に処理します。
+ **有効** - イベントソースマッピングを有効にするには、true に設定します。レコードの処理を停止するには、false に設定します。Lambda は、処理された最新のレコードを追跡し、マッピングが再度有効になるとその時点から処理を再開します。

**注記**  
DynamoDB トリガーの一部として Lambda によって呼び出される GetRecords API コールに対しては、料金は発生しません。

後でイベントソース設定を管理するには、デザイナーでトリガーを選択します。

## クロスアカウントのイベントソースマッピングの作成
<a name="services-dynamodb-eventsourcemapping-cross-account"></a>

Amazon DynamoDB は、[リソースベースのポリシー](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/access-control-resource-based.html)をサポートするようになりました。この機能を使用すると、別のアカウントの Lambda 関数を使用して、DynamoDB ストリームからのデータを 1 つの AWS アカウント で処理できます。

別の AWS アカウント の DynamoDB ストリームを使用して Lambda 関数のイベントソースマッピングを作成するには、リソースベースのポリシーを使用してストリームを設定し、Lambda 関数にレコードを読み取るアクセス許可を付与する必要があります。クロスアカウントアクセスを許可するようにストリームを設定する方法については、「*Amazon DynamoDB デベロッパーガイド*」の「[クロスアカウントの Lambda 関数を使用したアクセスを共有する](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-cross-account-access.html#rbac-analyze-cross-account-lambda-access)」を参照してください。

Lambda 関数に必要なアクセス許可を付与するリソースベースのポリシーでストリームを設定したら、クロスアカウントストリーム ARN を使用してイベントソースマッピングを作成します。ストリーム ARN は、クロスアカウント DynamoDB コンソールのテーブルの **[エクスポートおよびストリーム]** タブにあります。

Lambda コンソールを使用する場合は、ストリーム ARN をイベントソースマッピング作成ページの DynamoDB テーブル入力フィールドに直接貼り付けます。

 **注:** クロスリージョントリガーはサポートされていません。

# DynamoDB と Lambda を使用した部分的なバッチレスポンスの設定
<a name="services-ddb-batchfailurereporting"></a>

イベントソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に Lambda チェックポイントが設定されます。Lambda は、他のすべての結果を完全な失敗として扱い、再試行の上限までバッチの処理を再試行します。ストリームからのバッチの処理中に部分的な成功を許可するには、`ReportBatchItemFailures`をオンにします 。部分的な成功を許可すると、レコードの再試行回数を減らすことができますが、成功したレコードの再試行の可能性を完全に妨げるわけではありません。

`ReportBatchItemFailures` をオンにするには、列挙値 **ReportBatchItemFailures** を [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes) リストに含めます。このリストは、関数で有効になっているレスポンスタイプを示します。このリストは、イベントソースマッピングを[作成](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)または[更新](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)するときに設定できます。

**注記**  
関数コードが部分的なバッチ処理失敗レスポンスを返しても、イベントソースマッピングに対して `ReportBatchItemFailures` 機能が明示的に有効化されていなければ、Lambda はこれらのレスポンスを処理しません。

## レポートの構文
<a name="streams-batchfailurereporting-syntax"></a>

バッチアイテムの失敗に関するレポートを設定する場合、`StreamsEventResponse` クラスはバッチアイテムの失敗のリストとともに返されます。`StreamsEventResponse`オブジェクトを使用して、バッチ処理で最初に失敗したレコードのシーケンス番号を返すことができます。また、正しいレスポンスシンタックスを使用して、独自のカスタムクラスを作成することもできます。次の JSON 構造体は、必要な応答構文を示しています。

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**注記**  
`batchItemFailures` 配列に複数の項目が含まれている場合、Lambda はシーケンス番号が最も小さいレコードをチェックポイントとして使用します。その後、Lambda はそのチェックポイントからすべてのレコードを再試行します。

## 成功条件と失敗の条件
<a name="streams-batchfailurereporting-conditions"></a>

次のいずれかを返すと、Lambda はバッチを完全な成功として処理します:
+ 空の`batchItemFailure`リストです。
+ null の `batchItemFailure` リスト
+ 空の `EventResponse`
+ ヌル `EventResponse`

次のいずれかを返すと、Lambda はバッチを完全な失敗として処理します:
+ 空の文字列`itemIdentifier`
+ ヌル `itemIdentifier`
+ `itemIdentifier`間違えているキー名

Lambda は、再試行戦略に基づいて失敗を再試行します。

## バッチを２分割します
<a name="streams-batchfailurereporting-bisect"></a>

呼び出しが失敗し、`BisectBatchOnFunctionError` オンになっている場合、バッチは`ReportBatchItemFailures`設定に関係なく２分割されます。

部分的なバッチ成功レスポンスを受信し、`BisectBatchOnFunctionError` と `ReportBatchItemFailures` の両方がオンになっている場合、バッチは返されたシーケンス番号で 2 分割され、Lambda は残りのレコードのみを再試行します。

部分的なバッチレスポンスロジックの実装を簡素化するには、Powertools for AWS Lambda の[バッチプロセッサユーティリティ](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)を使用することを検討してください。これらの複雑さが自動的に処理されます。

バッチで失敗したメッセージ ID のリストを返す関数コードの例を次に示します。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)

    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
        StreamsEventResponse streamsEventResponse = new StreamsEventResponse();

        foreach (var record in dynamoEvent.Records)
        {
            try
            {
                var sequenceNumber = record.Dynamodb.SequenceNumber;
                context.Logger.LogInformation(sequenceNumber);
            }
            catch (Exception ex)
            {
                context.Logger.LogError(ex.Message);
                batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
            }
        }

        if (batchItemFailures.Count > 0)
        {
            streamsEventResponse.BatchItemFailures = batchItemFailures;
        }

        context.Logger.LogInformation("Stream processing complete.");
        return streamsEventResponse;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type BatchItemFailure struct {
	ItemIdentifier string `json:"ItemIdentifier"`
}

type BatchResult struct {
	BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"`
}

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) {
	var batchItemFailures []BatchItemFailure
	curRecordSequenceNumber := ""

	for _, record := range event.Records {
		// Process your record
		curRecordSequenceNumber = record.Change.SequenceNumber
	}

	if curRecordSequenceNumber != "" {
		batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber})
	}
	
	batchResult := BatchResult{
		BatchItemFailures: batchItemFailures,
	}

	return &batchResult, nil
}

func main() {
	lambda.Start(HandleRequest)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用した Lambda での DynamoDB のバッチアイテム失敗のレポート。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;

import java.util.ArrayList;
import java.util.List;

public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {
          try {
                //Process your record
                StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
                curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
                
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse();   
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
JavaScript を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。  

```
export const handler = async (event) => {
  const records = event.Records;
  let curRecordSequenceNumber = "";

  for (const record of records) {
    try {
      // Process your record
      curRecordSequenceNumber = record.dynamodb.SequenceNumber;
    } catch (e) {
      // Return failed record's sequence number
      return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] };
    }
  }

  return { batchItemFailures: [] };
};
```
TypeScript を使用して Lambda で DynamoDB のバッチアイテム失敗をレポートする。  

```
import {
  DynamoDBBatchResponse,
  DynamoDBBatchItemFailure,
  DynamoDBStreamEvent,
} from "aws-lambda";

export const handler = async (
  event: DynamoDBStreamEvent
): Promise<DynamoDBBatchResponse> => {
  const batchItemFailures: DynamoDBBatchItemFailure[] = [];
  let curRecordSequenceNumber;

  for (const record of event.Records) {
    curRecordSequenceNumber = record.dynamodb?.SequenceNumber;

    if (curRecordSequenceNumber) {
      batchItemFailures.push({
        itemIdentifier: curRecordSequenceNumber,
      });
    }
  }

  return { batchItemFailures: batchItemFailures };
};
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での DynamoDB バッチ項目失敗のレポート。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $dynamoDbEvent = new DynamoDbEvent($event);
        $this->logger->info("Processing records");

        $records = $dynamoDbEvent->getRecords();
        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。  

```
def lambda_handler(event:, context:)
    records = event["Records"]
    cur_record_sequence_number = ""
  
    records.each do |record|
      begin
        # Process your record
        cur_record_sequence_number = record["dynamodb"]["SequenceNumber"]
      rescue StandardError => e
        # Return failed record's sequence number
        return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]}
      end
    end
  
    {"batchItemFailures" => []}
  end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda-with-batch-item-handling)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用して Lambda で DynamoDB のバッチアイテム失敗のレポート。  

```
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord, StreamRecord},
    streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

/// Process the stream record
fn process_record(record: &EventRecord) -> Result<(), Error> {
    let stream_record: &StreamRecord = &record.change;

    // process your stream record here...
    tracing::info!("Data: {:?}", stream_record);

    Ok(())
}

/// Main Lambda handler here...
async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {
    let mut response = DynamoDbEventResponse {
        batch_item_failures: vec![],
    };

    let records = &event.payload.records;

    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in records {
        tracing::info!("EventId: {}", record.event_id);

        // Couldn't find a sequence number
        if record.change.sequence_number.is_none() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: Some("".to_string()),
            });
            return Ok(response);
        }

        // Process your record here...
        if process_record(record).is_err() {
            response.batch_item_failures.push(DynamoDbBatchItemFailure {
                item_identifier: record.change.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!("Successfully processed {} record(s)", records.len());

    Ok(response)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Powertools for AWS Lambda バッチプロセッサを使用する
<a name="services-ddb-batchfailurereporting-powertools"></a>

Powertools for AWS Lambda のバッチプロセッサユーティリティは、部分的なバッチレスポンスロジックを自動的に処理するため、バッチ障害レポートの実装の複雑さが軽減されます。バッチプロセッサを使用した例を次に示します。

**Python**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)を参照してください。
AWS Lambda バッチプロセッサを使用した DynamoDB ストリームレコードの処理。  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/)を参照してください。
AWS Lambda バッチプロセッサを使用した DynamoDB ストリームレコードの処理。  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { DynamoDBStreamEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: DynamoDBStreamEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/)を参照してください。
AWS Lambda バッチプロセッサを使用した DynamoDB ストリームレコードの処理。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class DynamoDBStreamBatchHandler implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;

    public DynamoDBStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withDynamoDbBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
        return handler.processBatch(ddbEvent, context);
    }

    private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
        // Process the change record
    }
}
```

**.NET**  
詳細な例とセットアップ手順については、[バッチプロセッサのドキュメント](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/)を参照してください。
AWS Lambda バッチプロセッサを使用した DynamoDB ストリームレコードの処理。  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> 
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(customer.Email)) 
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
    {
        return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Lambda で DynamoDB イベントソースの破棄されたレコードを保持する
<a name="services-dynamodb-errors"></a>

DynamoDB イベントソースマッピングのエラー処理は、エラーが関数の呼び出し前に発生するか、関数の呼び出し中に発生するかによって異なります。
+ **呼び出し前:** スロットリングまたはその他の問題によって Lambda イベントソースマッピングが関数を呼び出すことができない場合、レコードの有効期限が切れるか、イベントソースマッピングで設定された最大有効期間 ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) を超えるまで再試行します。
+ **呼び出し中:** 関数は呼び出されたがエラーが返された場合、Lambda はレコードの有効期限が切れるか、最大有効期間 ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)) を超えるか、設定された再試行クォータ ([MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)) に達するまで再試行します。関数エラーの場合、[BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError) を設定することもできます。これは、失敗したバッチを 2 つの小さなバッチに分割し、不良レコードを分離してタイムアウトを回避します。バッチを分割しても、再試行クォータは消費されません。

エラー処理の対策に失敗すると、Lambda はレコードを破棄し、ストリームからのバッチ処理を継続します。デフォルト設定では、不良レコードによって、影響を受けるシャードでの処理が最大 1 日間ブロックされる可能性があります。これを回避するには、関数のイベントソースマッピングを、適切な再試行回数と、ユースケースに適合する最大レコード経過時間で設定します。

## 失敗した呼び出しの送信先の設定
<a name="dynamodb-on-failure-destination-console"></a>

失敗したイベントソースマッピング呼び出しの記録を保持するには、関数のイベントソースマッピングに送信先を追加します。送信先に送られる各レコードは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。Amazon S3 送信先の場合、Lambda はメタデータと共に呼び出しレコード全体を送信します。任意の Amazon SNS トピック、Amazon SQS キュー、Amazon S3 バケット、または Kafka を送信先として設定できます。

Amazon S3 送信先を使用すると、[Amazon S3 イベント通知](https://docs.aws.amazon.com/)機能を使用して、オブジェクトが送信先 S3 バケットにアップロードされたときに通知を受け取ることができます。また、S3 イベント通知を設定して、失敗したバッチに対して別の Lambda 関数を呼び出し、自動処理を実行することもできます。

実行ロールには、送信先に対するアクセス許可が必要です。
+ **SQS 送信先の場合:** [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **SNS 送信先の場合:** [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)
+ **S3 送信先の場合:** [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) および [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Kafka 送信先の場合:** [kafka-cluster:WriteData](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html)

Kafka トピックは、Kafka イベントソースマッピングの障害発生時の送信先として設定できます。再試行回数を超えた後、またはレコードが最大経過時間を超えた後に Lambda がレコードを処理できなくなると、Lambda は失敗したレコードを指定された Kafka トピックに送信して、後で処理します。「[Kafka トピックを障害発生時の送信先として使用する](kafka-on-failure-destination.md)」を参照してください。

S3 送信先に対して独自の KMS キーを使用した暗号化を有効にしている場合、関数の実行ロールには [kms:GenerateDataKey](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html) を呼び出すためのアクセス許可も必要です。KMS キーと S3 バケットの送信先が Lambda 関数および実行ロールとは異なるアカウントにある場合は、kms:GenerateDataKey を許可するように実行ロールを信頼するように KMS キーを設定します。

障害発生時の送信先をコンソールを使用して設定するには、以下の手順に従います。

1. Lambda コンソールの [[関数ページ]](https://console.aws.amazon.com/lambda/home#/functions) を開きます。

1. 関数を選択します。

1. [**機能の概要 **] で、[**送信先を追加 **] を選択します。

1. **[ソース]** には、**[イベントソースマッピング呼び出し]** を選択します。

1. **[イベントソースマッピング]** では、この関数用に設定されているイベントソースを選択します。

1. **[条件]** には **[失敗時]** を選択します。イベントソースマッピング呼び出しでは、これが唯一受け入れられる条件です。

1. **[送信先タイプ]** では、Lambda が呼び出しレコードを送信する送信先タイプを選択します。

1. [**送信先**] で、リソースを選択します。

1. **[保存]** を選択します。

AWS Command Line Interface (AWS CLI) を使用して障害発生時の送信先を設定することもできます。例えば、次の [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) コマンドは、SQS を障害発生時の送信先として持つイベントソースマッピングを `MyFunction` に追加します。

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

次の [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) コマンドは、2 回の再試行後、またはレコードが 1 時間以上経過した場合に失敗した呼び出しレコードを SNS 送信先に送信するように、イベントソースマッピングを更新します。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

更新された設定は非同期に適用され、プロセスが完了するまで出力に反映されません。現在のステータスを表示するには、[get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) コマンドを使用します。

送信先を削除するには、`destination-config` パラメータの引数として空の文字列を指定します。

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3 送信先のセキュリティのベストプラクティス
<a name="ddb-s3-destination-security"></a>

関数の設定から送信先を削除せずに、送信先として設定された S3 バケットを削除すると、セキュリティリスクが発生する可能性があります。別のユーザーが送信先バケットの名前を知っている場合は、その AWS アカウントでバケットを再作成できます。失敗した呼び出しのレコードがそのバケットに送信され、関数からのデータが公開される可能性があります。

**警告**  
関数からの呼び出しレコードを別の AWS アカウントの S3 バケットに送信できないようにするには、`s3:PutObject` アクセス許可を自分アカウントのバケットに制限する条件を関数の実行ロールに追加します。

次の例は、関数の `s3:PutObject` アクセス許可を自分のアカウントのバケットに制限する IAM ポリシーを示しています。このポリシーは、送信先として S3 バケットを使用するために必要な `s3:ListBucket` アクセス許可も Lambda に付与します。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

AWS マネジメントコンソールまたは AWS CLI を使用して、関数の実行ロールにアクセス許可ポリシーを追加する場合は、次の手順を参照してください。

------
#### [ Console ]

**関数の実行ロールにアクセス許可ポリシーを追加するには (コンソール)**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. 実行ロールを変更する Lambda 関数を選択します。

1. **[構成]** タブで、**[アクセス許可]** を選択します。

1. **[実行ロール]** タブで、関数の **[ロール名]** を選択して、ロールの IAM コンソールページを開きます。

1. 次の手順を実行してアクセス許可ポリシーをロールに追加します。

   1. **[アクセス許可ポリシー]** ペインで、**[アクセス許可の追加]**、**[インラインポリシーを作成]** を選択します。

   1. **ポリシーエディタ**で、**[JSON]** を選択します。

   1. 追加するポリシーをエディタに貼り付け (既存の JSON を置き換える)、**[次へ]** を選択します。

   1. **[ポリシーの詳細]** で **[ポリシー名]** を入力します。

   1. [**Create policy**] (ポリシーの作成) を選択します。

------
#### [ AWS CLI ]

**関数の実行ロールにアクセス許可ポリシーを追加するには (CLI)**

1. 必要なアクセス許可を持つ JSON ポリシードキュメントを作成し、ローカルディレクトリに保存します。

1. IAM `put-role-policy` CLI コマンドを使用して、関数の実行ロールにアクセス許可を追加します。JSON ポリシードキュメントを保存したディレクトリから次のコマンドを実行して、ロール名、ポリシー名、ポリシードキュメントを独自の値に置き換えます。

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Amazon SNS および Amazon SQS の呼び出しレコードの例
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

次の例は、DynamoDB ストリームに対して Lambda が SQS または SNS 送信先に送信する呼び出しレコードを示しています。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    }
}
```

この情報は、トラブルシューティングのためにストリームから影響を受けるレコードを取得する際に使用できます。実際のレコードは含まれていないので、有効期限が切れて失われる前に、このレコードを処理し、ストリームから取得する必要があります。

### Amazon S3 呼び出しレコードの例
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

次の例は、DynamoDB ストリームに対して Lambda が S3 バケットに送信する呼び出しレコードを示しています。SQS と SNS の送信先に関する前例のすべてのフィールドに加えて、`payload` フィールドには元の呼び出しレコードがエスケープされた JSON 文字列として含まれています。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:13:49.717Z",
    "DDBStreamBatchInfo": {
        "shardId": "shardId-00000001573689847184-864758bb",
        "startSequenceNumber": "800000000003126276362",
        "endSequenceNumber": "800000000003126276362",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z",
        "batchSize": 1,
        "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

呼び出しレコードを含む S3 オブジェクトでは、次の命名規則が使用されます。

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Lambda での ステートフル DynamoDB ストリーム処理の実装
<a name="services-ddb-windows"></a>

Lambda 関数は、連続ストリーム処理アプリケーションを実行できます。ストリームは、アプリケーションを継続的に流れる無限のデータを表します。この継続的に更新される入力からの情報を分析するために、時間に関して定義されたウィンドウを使用して、含まれるレコードをバインドできます。

タンブリングウィンドウは、一定の間隔で開閉する別個のタイムウィンドウです。ディフォルトでは、Lambda 呼び出しはステートレス — 外部データベースがない場合、複数の連続した呼び出しでデータを処理するために使用することはできません。ただし、タンブリングウィンドウを使用して、呼び出し間で状態を維持できます。この状態は、現在のウィンドウに対して以前に処理されたメッセージの集計結果が含まれます。状態は、シャードごとに最大 1 MB にすることができます。このサイズを超えると、Lambda はウィンドウを早期に終了します。

ストリームの各レコードは、特定のウィンドウに属しています。Lambda は各レコードを少なくとも 1 回処理しますが、各レコードが 1 回だけ処理される保証はありません。エラー処理などのまれなケースでは、一部のレコードが複数回処理されることがあります。レコードは常に最初から順番に処理されます。レコードが複数回処理される場合、順不同で処理されます。

## 集約と処理
<a name="streams-tumbling-processing"></a>

ユーザー管理関数は、集約と、その集約の最終結果を処理するために呼び出されます。Lambda は、ウィンドウで受信したすべてのレコードを集約します。これらのレコードは、個別の呼び出しとして複数のバッチで受け取ることができます。各呼び出しは状態を受け取ります。したがって、タンブリングウィンドウを使用する場合、Lambda 関数の応答に `state` プロパティが含まれている必要があります。応答に `state` プロパティが含まれてないと、Lambda はこれを失敗した呼び出しと見なします。この条件を満たすために、関数は次の JSON 形式の `TimeWindowEventResponse` オブジェクトを返すことができます。

**Example `TimeWindowEventResponse`値**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**注記**  
Java 関数の場合は、`Map<String, String>`を使用して状態を表すことをお勧めします。

ウィンドウの最後で、フラグ`isFinalInvokeForWindow`が`true`に設定され、これが最終状態であり、処理の準備ができていることが示されます。処理が完了すると、ウィンドウが完了し、最終的な呼び出しが完了し、状態は削除されます。

ウィンドウの最後に、Lambda は集計結果に対するアクションの最終処理を使用します。最終処理が同期的に呼び出されます。呼び出しが成功すると、関数はシーケンス番号をチェックポイントし、ストリーム処理が続行されます。呼び出しが失敗した場合、Lambda 関数は呼び出しが成功するまで処理を一時停止します。

**Example DynamodbtimeWindowEvent**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ],
    "window": {
        "start": "2020-07-30T17:00:00Z",
        "end": "2020-07-30T17:05:00Z"
    },
    "state": {
        "1": "state1"
    },
    "shardId": "shard123456789",
    "eventSourceARN": "stream-ARN",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## 設定
<a name="streams-tumbling-config"></a>

イベントソースマッピングを作成または更新するときに 、タンブリングウィンドウを設定できます。タンブリングウィンドウを設定するには、ウィンドウを秒単位で指定します ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds))。次の例の AWS Command Line Interface (AWS CLI) コマンドは、タンブリングウィンドウが 120 秒に設定されたストリーミングイベントソースマッピングを作成します。集約と処理のために Lambda 関数が定義した関数の名前は `tumbling-window-example-function` です。

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda は、レコードがストリームに挿入された時間に基づいて、タンブリングウィンドウの境界を決定します。すべてのレコードには、Lambda が境界の決定に使用するおおよそのタイムスタンプがあります。

ウィンドウの集合をタンブルしても、再共有はサポートされません。シャードが終了すると、Lambda はウィンドウが閉じているとみなし、子シャードは新しい状態で自身のウィンドウを開始します。

タンブルウィンドウは、既存の再試行ポリシー`maxRetryAttempts`および`maxRecordAge`を完全にサポートします。

**Example Handler.py - 集約と処理**  
次の Python 関数は、最終状態を集約して処理する方法を示しています。  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Amazon DynamoDB イベントソースマッピング用の Lambda パラメータ
<a name="services-ddb-params"></a>

すべての Lambda イベントソースタイプは、同じ [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) および [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API オペレーションを共有しています。ただし、DynamoDB Streams に適用されるのは一部のパラメータのみです。


| [Parameter] (パラメータ) | 必須 | デフォルト | メモ | 
| --- | --- | --- | --- | 
|  BatchSize  |  N  |  100  |  最大: 10,000  | 
|  BisectBatchOnFunctionError  |  いいえ  |  false  | なし  | 
|  DestinationConfig  |  いいえ  | 該当なし  |  廃棄されたレコードの標準 Amazon SQS キューまたは標準 Amazon SNS トピックの送信先。  | 
|  有効  |  いいえ  |  true  | なし  | 
|  EventSourceArn  |  はい  | 該当なし |  データストリームまたはストリームコンシューマーの ARN。  | 
|  FilterCriteria  |  いいえ  | 該当なし  |  [Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)  | 
|  FunctionName  |  はい  | 該当なし  | なし  | 
|  FunctionResponseTypes  |  いいえ  | 該当なし |  関数がバッチ内の特定の失敗を報告できるようにするには、`FunctionResponseTypes` に値 `ReportBatchItemFailures` を含めます。詳細については、「[DynamoDB と Lambda を使用した部分的なバッチレスポンスの設定](services-ddb-batchfailurereporting.md)」を参照してください。  | 
|  MaximumBatchingWindowInSeconds  |  いいえ  |  0  | なし  | 
|  MaximumRecordAgeInSeconds  |  いいえ  |  -1  |  -1 は無制限を意味し、失敗したレコードは有効期限が切れるまで再試行されます。「[DynamoDB ストリームのデータ保持制限](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.DataRetention)」は 24 時間です。 最小: -1 最大: 604,800  | 
|  MaximumRetryAttempts  |  いいえ  |  -1  |  -1 に設定すると無制限になり、失敗したレコードはレコードの有効期限が切れるまで再試行されます。 最小: 0 最大: 10,000  | 
|  ParallelizationFactor  |  いいえ  |  1  |  最大: 10  | 
|  StartingPosition  |  はい  | 該当なし  |  TRIM\$1HORIZON または LATEST  | 
|  TumblingWindowInSeconds  |  いいえ  | 該当なし  |  最小: 0 最大: 900  | 

# DynamoDB イベントソースでのイベントフィルタリングの使用
<a name="with-ddb-filtering"></a>

イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「[Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)」を参照してください。

このセクションでは、DynamoDB イベントソースのイベントフィルタリングに焦点を当てます。

**注記**  
DynamoDB イベントソースマッピングは、 `dynamodb` キーでのフィルタリングのみをサポートします。

**Topics**
+ [DynamoDB イベント](#filtering-ddb)
+ [テーブル属性でフィルタリングする](#filtering-ddb-attributes)
+ [ブール式でフィルタリングする](#filtering-ddb-boolean)
+ [Exists 演算子の使用](#filtering-ddb-exists)
+ [DynamoDB フィルタリングの JSON 形式](#filtering-ddb-JSON-format)

## DynamoDB イベント
<a name="filtering-ddb"></a>

プライマリキー `CustomerName`、属性 `AccountManager`、属性 `PaymentTerms` を含む DynamoDB テーブルがあるとします。次の内容では、DynamoDB テーブルのストリームからのレコード例が示されています。

```
{
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
          "ApproximateCreationDateTime": "1678831218.0",
          "Keys": {
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "NewImage": {
              "AccountManager": {
                  "S": "Pat Candella"
              },
              "PaymentTerms": {
                  "S": "60 days"
              },
              "CustomerName": {
                  "S": "AnyCompany Industries"
              }
          },
          "SequenceNumber": "111",
          "SizeBytes": 26,
          "StreamViewType": "NEW_IMAGE"
      }
  }
```

DynamoDB テーブルのキーおよび属性値に基づいてフィルタリングするには、レコードで `dynamodb` キーを使用します。次のセクションでは、さまざまなフィルタータイプの例を示します。

### テーブルキーでフィルタリングする
<a name="filtering-ddb-keys"></a>

プライマリキー `CustomerName` が「AnyCompany Industries」のレコードのみを関数で処理するとします。`FilterCriteria` オブジェクトは次のようになります。

```
{
     "Filters": [
          {
              "Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"
          }
      ]
 }
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
     "dynamodb": {
          "Keys": {
              "CustomerName": {
                  "S": [ "AnyCompany Industries" ]
                  }
              }
          }
 }
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"Keys\" : { \"CustomerName\" : { \"S\" : [ \"AnyCompany Industries\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
   Filters:
     - Pattern: '{ "dynamodb" : { "Keys" : { "CustomerName" : { "S" : [ "AnyCompany Industries" ] } } } }'
```

------

## テーブル属性でフィルタリングする
<a name="filtering-ddb-attributes"></a>

DynamoDB を使用すると、`NewImage` および `OldImage` キーを使用して属性値をフィルタリングすることもできます。最新のテーブル画像の `AccountManager` 属性が「Pat Candella」または「Shirley Rodriguez」のレコードをフィルタリングするとします。`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
    "dynamodb": {
        "NewImage": {
            "AccountManager": {
                "S": [ "Pat Candella", "Shirley Rodriguez" ]
            }
        }
    }
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\", \"Shirley Rodriguez\" ] } } } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella", "Shirley Rodriguez" ] } } } }'
```

------

## ブール式でフィルタリングする
<a name="filtering-ddb-boolean"></a>

ブール型 AND 式を使用してフィルターを作成することもできます。これらの式には、テーブルの主パラメータと属性パラメータの両方を含めることができます。`AccountManager` の `NewImage` の値が「Pat Candella」で、`OldImage` の値が「Terry Whitlock」であるレコードをフィルタリングするとします。`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{ 
    "dynamodb" : { 
        "NewImage" : { 
            "AccountManager" : { 
                "S" : [ 
                    "Pat Candella" 
                ] 
            } 
        } 
    }, 
    "dynamodb": { 
        "OldImage": { 
            "AccountManager": { 
                "S": [ 
                    "Terry Whitlock" 
                ] 
            } 
        } 
    } 
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"dynamodb\" : { \"NewImage\" : { \"AccountManager\" : { \"S\" : [ \"Pat Candella\" ] } } } , \"dynamodb\" : { \"OldImage\" : { \"AccountManager\" : { \"S\" : [ \"Terry Whitlock\" ] } } } } "}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "dynamodb" : { "NewImage" : { "AccountManager" : { "S" : [ "Pat Candella" ] } } } , "dynamodb" : { "OldImage" : { "AccountManager" : { "S" : [ "Terry Whitlock" ] } } } }'
```

------

**注記**  
DynamoDB イベントフィルタリングは、数値演算子 (数値等式および数値範囲) の使用をサポートしていません。テーブルの項目が数値として保存されている場合でも、これらのパラメータは JSON レコードオブジェクトの文字列に変換されます。

## Exists 演算子の使用
<a name="filtering-ddb-exists"></a>

DynamoDB の JSON イベントオブジェクトは構造化されているため、EXISTS 演算子の使用には特別な注意が必要です。EXISTS 演算子はイベント JSON のリーフノードでのみ機能するため、フィルターパターンが EXISTS を使用して中間ノードをテストしても機能しません。次の DynamoDB テーブルの項目を考慮します。

```
{
  "UserID": {"S": "12345"},
  "Name": {"S": "John Doe"},
  "Organizations": {"L": [
      {"S":"Sales"},
      {"S":"Marketing"},
      {"S":"Support"}
    ]
  }
}
```

次のように、`"Organizations"` を含むイベントをテストするフィルターパターンを作成できます。

```
{ "dynamodb" : { "NewImage" : { "Organizations" : [ { "exists": true } ] } } }
```

ただし、`"Organizations"` はリーフノードではないため、このフィルターパターンに一致するものは返されません。次の例では、EXISTS 演算子を適切に使用して目的のフィルターパターンを作成する方法を示しています。

```
{ "dynamodb" : { "NewImage" : {"Organizations": {"L": {"S": [ {"exists": true } ] } } } } }
```

## DynamoDB フィルタリングの JSON 形式
<a name="filtering-ddb-JSON-format"></a>

DynamoDB ソースのイベントを適切にフィルタリングするには、データフィールドおよびデータフィールド (`dynamodb`) のフィルター条件の両方が有効な JSON 形式である必要があります。フィールドのどちらかが有効な JSON 形式ではない場合、Lambda はメッセージをドロップするか、例外をスローします。以下は、特定の動作を要約した表です。


| 着信データの形式 | データプロパティのフィルターパターンの形式 | 結果として生じるアクション | 
| --- | --- | --- | 
|  有効な JSON  |  有効な JSON  |  Lambda がフィルター条件に基づいてフィルタリングを実行します。  | 
|  有効な JSON  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  JSON 以外  |  Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。  | 
|  JSON 以外  |  有効な JSON  |  Lambda がレコードをドロップします。  | 
|  JSON 以外  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  JSON 以外  |  JSON 以外  |  Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。  | 

# チュートリアル: Amazon DynamoDB Streams で AWS Lambda を使用する
<a name="with-ddb-example"></a>

 このチュートリアルでは、Amazon DynamoDB ストリームからのイベントを処理する Lambda 関数を作成します。

## 前提条件
<a name="with-ddb-prepare"></a>

### AWS Command Line Interface のインストール
<a name="install_aws_cli"></a>

AWS Command Line Interface をまだインストールしていない場合は、「[最新バージョンの AWS CLI のインストールまたは更新](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)」にある手順に従ってインストールしてください。

このチュートリアルでは、コマンドを実行するためのコマンドラインターミナルまたはシェルが必要です。Linux および macOS では、任意のシェルとパッケージマネージャーを使用してください。

**注記**  
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (`zip` など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、[Windows Subsystem for Linux をインストール](https://docs.microsoft.com/en-us/windows/wsl/install-win10)します。

## 実行ロールを作成する
<a name="with-ddb-create-execution-role"></a>

AWS リソースにアクセスするためのアクセス権限を関数に付与する[実行ロール](lambda-intro-execution-role.md)を作成します。

**実行ロールを作成するには**

1. IAM コンソールの [[ロールページ](https://console.aws.amazon.com/iam/home#/roles)] を開きます。

1. [**ロールの作成**] を選択します。

1. 次のプロパティでロールを作成します。
   + **信頼されたエンティティ** – Lambda
   + **アクセス許可** - **AWSLambdaDynamoDBExecutionRole**
   + **ロール名** -**lambda-dynamodb-role** 

**AWSLambdaDynamoDBExecutionRole** には、DynamoDB から項目を読み取り、 に CloudWatch Logs ログを書き込むために、関数が必要とするアクセス許可があります。

## 関数を作成する
<a name="with-ddb-example-create-function"></a>

DynamoDB イベントを処理する Lambda 関数を作成します。関数コードは、受信イベントデータの一部を CloudWatch ログに書き込みます。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用して Lambda で DynamoDB イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace AWSLambda_DDB;

public class Function
{
    public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");

        foreach (var record in dynamoEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventID}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            context.Logger.LogInformation(JsonSerializer.Serialize(record));
        }

        context.Logger.LogInformation("Stream processing complete.");
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用して Lambda で DynamoDB イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/events"
	"fmt"
)

func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
	if len(event.Records) == 0 {
		return nil, fmt.Errorf("received empty event")
	}

	for _, record := range event.Records {
	 	LogDynamoDBRecord(record)
	}

	message := fmt.Sprintf("Records processed: %d", len(event.Records))
	return &message, nil
}

func main() {
	lambda.Start(HandleRequest)
}

func LogDynamoDBRecord(record events.DynamoDBEventRecord){
	fmt.Println(record.EventID)
	fmt.Println(record.EventName)
	fmt.Printf("%+v\n", record.Change)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用して Lambda で DynamoDB イベントの消費。  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class example implements RequestHandler<DynamodbEvent, Void> {

    private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        System.out.println(GSON.toJson(event));
        event.getRecords().forEach(this::logDynamoDBRecord);
        return null;
    }

    private void logDynamoDBRecord(DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
        System.out.println(record.getEventName());
        System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
JavaScript を使用した Lambda での DynamoDB イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
};

const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```
TypeScript を使用した Lambda での DynamoDB イベントの消費。  

```
export const handler = async (event, context) => {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(record => {
        logDynamoDBRecord(record);
    });
}
const logDynamoDBRecord = (record) => {
    console.log(record.eventID);
    console.log(record.eventName);
    console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での DynamoDB イベントの消費。  

```
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends DynamoDbHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
    {
        $this->logger->info("Processing DynamoDb table items");
        $records = $event->getRecords();

        foreach ($records as $record) {
            $eventName = $record->getEventName();
            $keys = $record->getKeys();
            $old = $record->getOldImage();
            $new = $record->getNewImage();
            
            $this->logger->info("Event Name:".$eventName."\n");
            $this->logger->info("Keys:". json_encode($keys)."\n");
            $this->logger->info("Old Image:". json_encode($old)."\n");
            $this->logger->info("New Image:". json_encode($new));
            
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }

        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords items");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用して Lambda で DynamoDB イベントの消費。  

```
import json

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        log_dynamodb_record(record)

def log_dynamodb_record(record):
    print(record['eventID'])
    print(record['eventName'])
    print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用して Lambda で DynamoDB イベントの消費。  

```
def lambda_handler(event:, context:)
    return 'received empty event' if event['Records'].empty?
  
    event['Records'].each do |record|
      log_dynamodb_record(record)
    end
  
    "Records processed: #{event['Records'].length}"
  end
  
  def log_dynamodb_record(record)
    puts record['eventID']
    puts record['eventName']
    puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
  end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-ddb-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用して Lambda で DynamoDB イベントを利用します。  

```
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
    event::dynamodb::{Event, EventRecord},
   };


// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"

async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
    
    let records = &event.payload.records;
    tracing::info!("event payload: {:?}",records);
    if records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    for record in records{
        log_dynamo_dbrecord(record);
    }

    tracing::info!("Dynamo db records processed");

    // Prepare the response
    Ok(())

}

fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
    tracing::info!("EventId: {}", record.event_id);
    tracing::info!("EventName: {}", record.event_name);
    tracing::info!("DynamoDB Record: {:?}", record.change );
    Ok(())

}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
    .with_max_level(tracing::Level::INFO)
    .with_target(false)
    .without_time()
    .init();

    let func = service_fn(function_handler);
    lambda_runtime::run(func).await?;
    Ok(())
    
}
```

------

**関数を作成するには**

1. サンプルコードを `example.js` という名前のファイルにコピーします。

1. デプロイパッケージを作成します。

   ```
   zip function.zip example.js
   ```

1. `create-function` コマンドを使用して Lambda 関数を作成します。

   ```
   aws lambda create-function --function-name ProcessDynamoDBRecords \
       --zip-file fileb://function.zip --handler example.handler --runtime nodejs24.x \
       --role arn:aws:iam::111122223333:role/lambda-dynamodb-role
   ```

## Lambda 関数をテストする
<a name="with-dbb-invoke-manually"></a>

このセットアップでは、`invoke`AWS Lambda の CLI コマンドと次のサンプルの DynamoDB イベントを使用して、Lambda 関数を手動で呼び出します。次の内容を `input.txt` という名前のファイルにコピーします。

**Example input.txt**  

```
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}
```

次の `invoke` コマンドを実行します。

```
aws lambda invoke --function-name ProcessDynamoDBRecords \
    --cli-binary-format raw-in-base64-out \
    --payload file://input.txt outputfile.txt
```

AWS CLI バージョン 2 を使用している場合、**cli-binary-format** オプションは必須です。これをデフォルト設定にするには、`aws configure set cli-binary-format raw-in-base64-out` を実行します。詳細については、「*AWS Command Line Interface バージョン 2 用ユーザーガイド*」の「[AWS CLI でサポートされているグローバルコマンドラインオプション](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)」を参照してください。

この関数はレスポンス本文で文字列 `message` を返します。

`outputfile.txt` ファイルで出力を確認します。

## ストリーミングが有効になった DynamoDB テーブルを作成する
<a name="with-ddb-create-buckets"></a>

ストリーミングが有効な Amazon DynamoDB テーブルを作成します。

**DynamoDB テーブルを作成するには**

1. [DynamoDB コンソール](https://console.aws.amazon.com/dynamodb)を開きます。

1. [**Create table**] を選択します。

1. 次の設定でテーブルを作成します。
   + **テーブル名** – **lambda-dynamodb-stream**
   + **プライマリキー** – **id** (文字列)

1. [**作成**] を選択します。

**ストリームを有効化するには**

1. [DynamoDB コンソール](https://console.aws.amazon.com/dynamodb)を開きます。

1. [**テーブル**] を選択します。

1. [**lambda-dynamodb-stream**] テーブルを選択します。

1. [**Exports and streams**] (エクスポートとストリーミング)で、[**DynamoDB stream details**] (DynamoDB ストリーミングの詳細) を選択します。

1. **[オンにする]** を選択します。

1. **[ビュータイプ]** には、**[キー属性のみ]** を選択します。

1. **[ストリームをオンにする]** を選択します。

ストリーム ARN をメモします。こちらは、次のステップでストリームを Lambda 関数に関連付ける際に必要になります。ストリームの有効化の詳細については、「[DynamoDB Streams を使用したテーブルアクティビティのキャプチャ](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)」を参照してください。

## AWS Lambda でイベントソースを追加する
<a name="with-ddb-attach-notification-configuration"></a>

AWS Lambda でイベントソースマッピングを作成します。このイベントソースのマッピングは、DynamoDB ストリームを Lambda 関数に関連付けます。このイベントソースのマッピングを作成すると、AWS Lambda はストリームのポーリングを開始します。

次の AWS CLI `create-event-source-mapping` コマンドを実行します。コマンドの実行後、UUID をメモします。この UUID は、イベントソースマッピングを削除するときなど、コマンドでイベントソースマッピングを参照する場合に必要になります。

```
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
    --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
```

 これにより、指定された DynamoDB ストリームと Lambda 関数の間にマッピングが作成されます。1 つの DynamoDB ストリームを複数の Lambda 関数と関連付けたり、1 つの Lambda 関数を複数のストリームに関連付けたりすることができます。ただし、Lambda 関数は、共有するストリーム用に、読み取りスループットを共有します。

次のコマンドを実行して、イベントソースのマッピングのリストを取得できます。

```
aws lambda list-event-source-mappings
```

このリストでは、作成済みのすべてのイベントソースのマッピングが返され、各マッピングに対して `LastProcessingResult` などが示されます。問題がある場合、このフィールドは情報メッセージを提供するために使用されます。`No records processed` (AWS Lambda がポーリングを開始していないか、ストリームにレコードがないことを示す) や、`OK` (AWS Lambda がストリームから正常にレコードを読み取り、Lambda 関数を呼び出したことを示す) などの値は、問題がないことを示しています。問題がある場合は、エラーメッセージが返されます。

イベントソースマッピングが多数ある場合、関数の name パラメータを使用して結果を絞り込みます。

```
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
```

## セットアップをテストする
<a name="with-ddb-final-integration-test-no-iam"></a>

エンドツーエンドエクスペリエンスをテストします。テーブルの更新を実行すると、DynamoDB はイベントレコードをストリームに書き込みます。ストリームをポーリングしている AWS Lambda は、ストリームで新しいレコードを検出し、イベントを Lambda 関数に渡して、ユーザーに代わって関数を実行します。

1. DynamoDB コンソールで、テーブルに項目を追加、更新、削除します。DynamoDB は、これらのアクションのレコードをストリームに書き込みます。

1. AWS Lambda は、ストリームをポーリングし、ストリームの更新を検出すると、ストリームで見つかったイベントデータを渡して Lambda 関数を呼び出します。

1. 関数が実行され、Amazon CloudWatch にログが作成されます。報告されたログは Amazon CloudWatch コンソールで確認できます。

## 次のステップ
<a name="with-ddb-next-steps"></a>

このチュートリアルでは、Lambda で DynamoDB ストリームイベントを処理するための基本について説明しました。本番ワークロードでは、部分的なバッチレスポンスロジックの実装を考慮し、個々のレコード障害処理の効率化を図ってください。Powertools for AWS Lambda の[バッチプロセッサユーティリティ](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)は、Python、TypeScript、.NET、Java で利用可能で、これに対する堅牢なソリューションを提供し、部分的なバッチレスポンスの複雑さを自動的に処理し、正常に処理されたレコードの再試行回数を減らします。

## リソースのクリーンアップ
<a name="cleanup"></a>

このチュートリアル用に作成したリソースは、保持しない場合は削除できます。使用しなくなった AWS リソースを削除することで、AWS アカウント アカウントに請求される料金の発生を防ぎます。

**Lambda 関数を削除するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. 作成した関数を選択します。

1. **[アクション]** で、**[削除]** を選択します。

1. テキスト入力フィールドに **confirm** と入力し、**[削除]** を選択します。

**実行ロールを削除する**

1. IAM コンソールの [[ロール]](https://console.aws.amazon.com/iam/home#/roles) ページを開きます。

1. 作成した実行ロールを選択します。

1. **[削除]** を選択します。

1. テキスト入力フィールドにロールの名前を入力し、**[Delete]** (削除) を選択します。

**DynamoDB テーブルを削除するには**

1. DynamoDB コンソールで [[Tables (テーブル)] ページ](https://console.aws.amazon.com//dynamodb/home#tables:)を開きます。

1. 作成したテーブルを選択します。

1. [**削除**] を選択します。

1. テキストボックスに「**delete**」と入力します。

1. **[テーブルの削除]** を選択します。