

# DynamoDB Streams の変更データキャプチャ
<a name="Streams"></a>

 DynamoDB Streams は、DynamoDB テーブル内の項目レベルの変更に関するシーケンスを時間順にキャプチャし、その情報を最大 24 時間ログに保存します。アプリケーションは、このログにアクセスし、データ項目の変更前および変更後の内容をほぼリアルタイムで参照できます。

 保管時の暗号化では、DynamoDB Streams のデータが暗号化されます。詳細については、「[保管時の DynamoDB 暗号化](EncryptionAtRest.md)」を参照してください。

*DynamoDB Streams* は、DynamoDB テーブル内の項目に加えられた変更に関する情報の順序付けされた情報です。テーブルでストリーミングを有効にすると、DynamoDB はテーブル内のデータ項目に加えられた各変更に関する情報をキャプチャします。

アプリケーションがテーブル内の項目を作成、更新、または削除するたびに、DynamoDB Streams は変更された項目のプライマリキー属性を付けてストリーミングレコードを書き込みます。*ストリーミングレコード*には、DynamoDB テーブル内の単一の項目に加えられたデータ変更についての情報が含まれています。ストリームレコードが追加情報（変更された項目の前後のイメージ）をキャプチャするようにストリームを設定できます。

DynamoDB Streams を使用すれば、以下のことを確認できます。
+ 各ストリームレコードは、ストリームに 1 回だけ出現します。
+ DynamoDB テーブルで変更された各項目について、ストリーミングレコードは項目に対する実際の変更と同じ順序で出現します。

DynamoDB Streams は、ストリーミングレコードをほぼリアルタイムで書き込むため、これらのストリーミングを使用し、内容に基づいてアクションを実行するアプリケーションを構築できます。

**Topics**
+ [DynamoDB Streams のエンドポイント](#Streams.Endpoints)
+ [ストリームの有効化](#Streams.Enabling)
+ [ストリームの読み込みと処理](#Streams.Processing)
+ [DynamoDB Streams と有効期限 (TTL)](time-to-live-ttl-streams.md)
+ [DynamoDB Streams Kinesis Adapter を使用したストリームレコードの処理](Streams.KCLAdapter.md)
+ [DynamoDB Streams 低レベル API: Java の例](Streams.LowLevel.Walkthrough.md)
+ [DynamoDB Streams と AWS Lambda のトリガー](Streams.Lambda.md)
+ [DynamoDB Streams と Apache Flink](StreamsApacheFlink.xml.md)

## DynamoDB Streams のエンドポイント
<a name="Streams.Endpoints"></a>

AWS では、DynamoDB と DynamoDB Streams 用に個別のエンドポイントを維持しています。データベースのテーブルとインデックスを使用するには、アプリケーションが DynamoDB エンドポイントにアクセスする必要があります。DynamoDB Streams レコードを読み込んで処理するには、アプリケーションが同じリージョンの DynamoDB Streams エンドポイントにアクセスする必要があります。

DynamoDB Streams には 2 つのエンドポイントセットが用意されています。具体的には次の 2 つです。
+ **IPv4 専用エンドポイント**: `streams.dynamodb.<region>.amazonaws.com` 命名規則を持つエンドポイント。
+ **デュアルスタックエンドポイント**: IPv4 と IPv6 の両方と互換性があり、`streams-dynamodb.<region>.api.aws` 命名規則に従う新しいエンドポイント。

**注記**  
DynamoDB および DynamoDB Streams のリージョンとエンドポイントの完全なリストについては、「*AWS 全般のリファレンス*」の「[リージョンとエンドポイント](https://docs.aws.amazon.com/general/latest/gr/rande.html)」を参照してください。

AWS SDK は、DynamoDB と DynamoDB Streams 用に個別のクライアントを提供します。要件によっては、アプリケーションは、DynamoDB エンドポイント、DynamoDB Streams エンドポイント、または両方に同時にアクセスできます。両方のエンドポイントに接続するには、アプリケーションで 2 つのクライアントをインスタンス化する必要があります。1 つは DynamoDB 用、もう 1 つは DynamoDB Streams 用です。

## ストリームの有効化
<a name="Streams.Enabling"></a>

新しいテーブルでは、AWS CLI または AWS SDK 経由でそのテーブルの作成時にストリームを有効にできます。また、既存のテーブルでストリーミングを有効または無効にすることや、ストリーミングの設定を変更することができます。DynamoDB Streams は非同期的に動作するため、ストリーミングを有効にしてもテーブルのパフォーマンスに影響はありません。

DynamoDB Streams を管理する最も簡単な方法は、AWS マネジメントコンソール を使用することです。

1. AWS マネジメントコンソール にサインインして DynamoDB コンソール ([https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)) を開きます。

1. DynamoDB コンソールのダッシュボードで、[**Tables (テーブル)**] を選択して既存テーブルを選びます。

1. [**エクスポートとストリーム**] タブを選択します。

1. **[DynamoDB ストリームの詳細]** セクションで、**[オンにする]** を選択します。

1. **[DynamoDB ストリームをオンにする]** ウィンドウで、テーブルのデータが変更されるたびにストリーミングに書き込まれる情報を選択します。
   + [**キー属性のみ**] - 変更された項目のキー属性のみ。
   + **[New image]** (新規イメージ) — 変更後に表示される項目全体。
   + **[Old image]** (古いイメージ) — 変更前に表示されていた項目全体。
   + **[New and old images]** (新規イメージおよび古いイメージ) — 項目の新しいイメージと古いイメージの両方。

   すべての設定が正しいことを確認したら、**[ストリームをオンにする]** を選択します。

1. (オプション) 既存のストリーミングを無効にするには、**[DynamoDB ストリームの詳細]** で **[オフにする]** を選択します。

`CreateTable` または `UpdateTable` API オペレーションを使用して、ストリームを有効にするか、変更することもできます。ストリームの設定内容は、`StreamSpecification` パラメータにより決まります。
+ `StreamEnabled` — テーブルでストリーミングが有効 (`true`) か無効 (`false`) かを指定します。
+ `StreamViewType` — テーブル内のデータが変更されるたびにストリーミングに書き込まれる情報を指定します。
  + `KEYS_ONLY` — 変更された項目のキー属性のみ。
  + `NEW_IMAGE` — 変更後に表示される項目全体。
  + `OLD_IMAGE` — 変更前に表示されていた項目全体。
  + `NEW_AND_OLD_IMAGES` — 項目の新しいイメージと古いイメージの両方。

ストリームはいつでも有効または無効にできます。ただし、既にストリームがあるテーブルでストリームを有効にしようとした場合、`ValidationException` を受け取ります。また、ストリームのないテーブルでストリームを無効にしようとすると、`ValidationException` が発生します。

`StreamEnabled` を `true` に設定すると、一意のストリーミング記述子が割り当てられた新しいストリーミングが DynamoDB で作成されます。テーブルでストリームを無効にして再度有効にすると、新しいストリームは異なるストリーム記述子で作成されます。

各ストリームは、Amazon リソースネーム（ARN）により一意に識別されます。次に、`TestTable` という名前の DynamoDB テーブルにあるストリーミングのサンプル ARN を示します。

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

テーブルの最新のストリーミング記述子を調べるには、DynamoDB `DescribeTable` リクエストを発行し、レスポンスで `LatestStreamArn` 要素を探します。

**注記**  
ストリームのセットアップ後は `StreamViewType` を編集できません。セットアップ後にストリームを変更する必要がある場合は、現在のストリームを無効にして新しいストリームを作成する必要があります。

## ストリームの読み込みと処理
<a name="Streams.Processing"></a>

ストリームを読み取って処理するには、アプリケーションから DynamoDB Streams エンドポイントに接続して API リクエストを発行する必要があります。

ストリームは、*ストリームレコード*で構成されています。各ストリーミングレコードは、ストリーミングが属する DynamoDB テーブル内の 1 件のデータ変更を表しています。各ストリームレコードには、レコードがストリームに発行された順序を反映したシーケンス番号が割り当てられます。

ストリームレコードは、グループ (つまり、*シャード*) に整理されます。各シャードは、複数のストリームレコードのコンテナとして機能し、これらのレコードへのアクセスと反復処理に必要な情報が含まれています。シャード内のストリームレコードは 24 時間後に自動的に削除されます。

シャードはエフェメラルであり、必要に応じて自動的に作成および削除されます。また、任意のシャードは複数の新しいシャードに分割できます。これもまた自動的に行われます (親シャードが 1 つの子シャードのみを持つ場合もあります)。アプリケーションが複数のシャードからレコードを並列処理できるように、シャードは親テーブルで高レベルな書き込みアクティビティに応じて分割される場合があります。

ストリームを無効にすると、開かれているシャードは閉じられます。ストリーミング内のデータは 24 時間読み込み可能な状態になります。

シャードには系列 (親と子) があるため、アプリケーションは子シャードを処理する前に、必ず親シャードを処理する必要があります。これにより、ストリームレコードも正しい順序で処理されるようになります。(DynamoDB Streams Kinesis Adapter を使用している場合、これは自動的に処理されます。アプリケーションは、シャードとストリーミングレコードを正しい順序で処理します。アプリケーションの実行中に分割されたシャードに加えて、新しいシャードまたは有効期限切れのシャードは自動的に処理されます。詳細については、「[DynamoDB Streams Kinesis Adapter を使用したストリームレコードの処理](Streams.KCLAdapter.md)」を参照してください。)

次の図は、ストリーム、ストリーム内のシャード、シャード内のストリームレコードの関係を示しています。

![\[DynamoDB Streams 構造。データ変更を表すストリームレコードは、シャードに編成されます。\]](http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**注記**  
項目内のデータを何も変更しない `PutItem` または `UpdateItem` オペレーションを実行した場合、そのオペレーションのストリーミングレコードは DynamoDB Streams によって書き込まれ*ません*。

ストリームにアクセスしてその中のストリームレコードを処理するには、以下の操作を実行する必要があります。
+ アクセスするストリームの一意の ARN を調べます。
+ 目的のストリームレコードがストリーム内のどのシャードに含まれているかを調べます。
+ シャードにアクセスし、目的のストリームレコードを取得します。

**注記**  
最大でも 2 つを超えるプロセスが、同時に同じストリームシャードから読み込みを行うことはできません。シャードごとに 2 つを超えるリーダーがあると、スロットリングが発生する場合があります。

DynamoDB Streams API は、アプリケーションプログラム用の以下のアクションを提供します。
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)` — 現在のアカウントおよびエンドポイントのストリーミング記述子のリストを返します。必要に応じて、特定のテーブル名のストリーム記述子だけをリクエストできます。
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)` — ストリームに関する情報 (例: ストリームの最新ステータス、Amazon リソースネーム (ARN)、シャードの構成、対応する DynamoDB テーブル) を返します。オプションで `ShardFilter` フィールドを使用して、親シャードに関連付けられた既存の子シャードを取得できます。
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)` — シャード内の場所を表す*シャードイテレーター*を返します。イテレータがストリーム内の最も古いポイント、最も新しいポイント、特定のポイントへのアクセスを提供することをリクエストできます。
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)` — 特定のシャード内からストリーミングレコードを返します。`GetShardIterator` リクエストから返されたシャードイテレーターを指定する必要があります。

リクエストやレスポンスの例など、これらの API オペレーションの詳細な説明については、「[Amazon DynamoDB Streams API リファレンス](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html)」を参照してください。

### シャード検出
<a name="Streams.ShardDiscovery"></a>



2 つの強力な方法で、DynamoDB ストリーム内の新しいシャードを検出します。Amazon DynamoDB Streams ユーザーには、新しいシャードを追跡して識別するための 2 つの効果的な方法があります。

**ストリームトポロジ全体のポーリング**  
`DescribeStream` API を使用して、ストリームを定期的にポーリングします。これにより、作成された新しいシャードを含む、ストリーム内のすべてのシャードが返されます。時間経過に伴う結果を比較することで、新しく追加されたシャードを検出できます。

**子シャードの検出**  
`DescribeStream` API を `ShardFilter` パラメータとともに使用して、シャードのサブセットを見つけます。リクエストで親シャードを指定することで、DynamoDB Streams は直下の子シャードを返します。このアプローチは、ストリーム全体をスキャンせずにシャード系統のみを追跡する必要がある場合に役立ちます。  
DynamoDB Streams からデータを使用するアプリケーションは、この `ShardFilter` パラメータを使用して閉じたシャードの読み取りから子シャードに効率的に移行でき、`DescribeStream` API への繰り返しの呼び出しを回避して、すべての閉じたシャードと開いているシャードのシャードマップを取得してトラバースできます。これにより、親シャードが閉じられた後に子シャードをすばやく検出できるため、ストリーム処理アプリケーションの応答性とコスト効率が向上します。

どちらの方法でも、DynamoDB Streams の進化する構造を常に把握できるため、重要なデータの更新やシャードの変更を見逃すことはありません。

### DynamoDB Streams のデータ保持期限
<a name="Streams.DataRetention"></a>

DynamoDB Streams 内のすべてのデータは、24 時間保持されます。特定のテーブルの直近 24 時間のアクティビティを取得して分析できます。ただし、24 時間を超えたデータはすぐにトリミング (削除) される可能性があります。

テーブルのストリームを無効にした場合、ストリーム内のデータは 24 時間読み込み可能な状態になります。この時間が経過すると、データは期限切れになり、ストリームレコードは自動的に削除されます。既存のストリームを手動で削除するためのメカニズムはありません。保持期限 (24 時間) が切れ、すべてのストリームレコードが削除されるまで待つ必要があります。

# DynamoDB Streams と有効期限 (TTL)
<a name="time-to-live-ttl-streams"></a>

テーブルに対して Amazon DynamoDB Streams を有効にし、期限切れの項目のストリーミングレコードを処理することで、[有効期限](TTL.md) (TTL) によって削除された項目をバックアップ (または処理) できます。詳細については、「[ストリームの読み込みと処理](Streams.md#Streams.Processing)」を参照してください。

ストリームレコードにはユーザー ID フィールド `Records[<index>].userIdentity` が含まれます。

有効期限切れの後に有効期限 (TTL) プロセスによって削除された項目には、次のフィールドが含まれています。
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**注記**  
TTL をグローバルテーブルで使用すると、TTL が実行されたリージョンに `userIdentity` フィールドが設定されます。削除が複製されても、このフィールドは他のリージョンには設定されません。

次の JSON は 1 つのストリームレコードの関連する部分を示しています。

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## DynamoDB Streams と Lambda を使用して TTL 削除済みアイテムをアーカイブする
<a name="streams-archive-ttl-deleted-items"></a>

[DynamoDB 有効期限 (TTL)](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html)、[DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) および [AWS Lambda](https://aws.amazon.com/lambda/) を組み合わせると、データのアーカイブを簡素化し、DynamoDB ストレージコストを削減し、コードの複雑さを軽減するのに役立ちます。ストリームコンシューマーとして Lambda を使用すると、Kinesis Client Library (KCL) などの他のコンシューマーと比較してコストが削減されるなど、多くの利点があります。Lambda を使用してイベントを消費する場合、DynamoDB ストリームの `GetRecords` API 呼び出しでは課金が発生しません。Lambda はストリームイベント内の JSON パターンを識別してイベントフィルタリングを提供できます。イベントパターンのコンテンツフィルタリングでは、最大 5 つの異なるフィルターを定義して、処理のために Lambda に送信されるイベントを制御できます。これにより、Lambda 関数の呼び出しを減らしてコードを簡素化し、全体的なコストを削減できます。

DynamoDB Streams には、`Create`、`Modify` および`Remove` アクションなどのすべてのデータ変更が含まれています。これは、アーカイブ Lambda 関数の不要な呼び出しを引き起こす可能性があります。例えば、1 時間あたり 200 万件のデータ変更がストリームに流れ込むテーブルがあり、そのうち 5% 未満が TTL プロセスによって期限切れになり、アーカイブする必要があるアイテム削除であるとします。[Lambda イベントソースフィルター](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)を使用すると、Lambda 関数は 1 時間あたり 100,000 回しか呼び出されません。イベントフィルタリングを使用した結果、イベントフィルタリングを行わなければ 200 万回の呼び出しが発生するところを、必要な呼び出しに対してのみ課金されることになります。

イベントフィルタリングは、[Lambda イベントソースマッピング](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html)に適用されます。これは、選択されたイベント (DynamoDB ストリーム) から読み取り、Lambda 関数を呼び出すリソースです。次の図は、ストリームとイベントフィルターを使用して Lambda 関数によって有効期限 (TTL) 削除済みアイテムがどのように消費されるかを示しています。

![\[TTL プロセスによって削除された項目は、ストリームとイベントフィルターを使用する Lambda 関数を開始します。\]](http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### DynamoDB の有効期限 (TTL) イベントフィルターパターン
<a name="ttl-event-filter-pattern"></a>

イベントソースマッピングの[フィルター条件](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html)に次の JSON を追加することで、TTL 削除済みアイテムに対してのみ Lambda 関数の呼び出しを許可します。

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### AWS Lambda イベントソースマッピングを作成します。
<a name="create-event-source-mapping"></a>

次のコードスニペットを使用して、テーブルの DynamoDB ストリームに接続できる、フィルター処理されたイベントソースマッピングを作成します。各コードブロックには、イベントフィルターパターンが含まれます。

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# DynamoDB Streams Kinesis Adapter を使用したストリームレコードの処理
<a name="Streams.KCLAdapter"></a>

Amazon DynamoDB からのストリーミングを使用するには、Amazon Kinesis Adapter を使用することをお勧めします。DynamoDB Streams API は、意図的に Kinesis Data Streams と似せています。どちらのサービスでも、データストリーミングは、ストリーミングレコードのコンテナであるシャードで構成されています。両方のサービスの API には `ListStreams`、`DescribeStream`、`GetShards`、および `GetShardIterator` オペレーションが含まれています。(これらの DynamoDB Streams アクションは、Kinesis Data Streams の対応するアクションと似ていますが、100% 同一ではありません)。

DynamoDB Streams ユーザーは、KCL 内のデザインパターンを使用して DynamoDB Streams のシャードとストリーミングレコードを処理できます。これを行うには、DynamoDB Streams Kinesis Adapter を使用します。Kinesis Adapter は Kinesis Data Streams インターフェイスを実装しているため、KCL を使用して DynamoDB Streams からのレコードを消費および処理することができます。DynamoDB Streams Kinesis アダプターのセットアップとインストールの方法については、「[GitHub リポジトリ](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)」を参照してください。

Kinesis Client Library (KCL) を使用して、Kinesis Data Streams のアプリケーションを書き込むことができます。KCL は、低レベルの Kinesis Data Streams API の上で役に立つ抽象化を提供することによりコーディングを簡素化します。KCL の詳細については、「[Amazon Kinesis Data Streams デベロッパーガイド](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)」の「*Kinesis Client Library を使用したコンシューマーの開発*」を参照してください。

DynamoDB では、AWS SDK for Java v2.x で KCL バージョン 3.x を使用することをお勧めします。現在の DynamoDB Streams Kinesis Adapter バージョン 1.x と AWS SDK for AWS SDK for Java v1.x は、「[AWS SDK とツールのメンテナンスポリシー](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)」に従って、移行期間中、意図したとおりライフサイクルを通じて引き続き完全にサポートされます。

**注記**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを強くお勧めします。最新の KCL バージョンを確認するには、GitHub の「[Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)」ページを参照してください。最新の KCL バージョンの詳細については、「[Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)」を参照してください。KCL 1.x から KCL 3.x への移行については、「KCL 1.x から KCL 3.x への移行」を参照してください。

次の図表は、これらのライブラリがどのように連携するかを示しています。

![\[DynamoDB Streams レコードを処理するための DynamoDB Streams、Kinesis Data Streams、および KCL 間のインタラクション。\]](http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


DynamoDB Streams Kinesis Adapter を使用すれば、DynamoDB Streams エンドポイントにシームレスに誘導された API コールを使用して、KCL インターフェイスに対して開発を開始できます。

アプリケーションは起動時に KCL をコールしてワーカーをインスタンス化します。ワーカーに、アプリケーションの構成情報 (ストリーミング記述子や AWS 認証情報、および提供するレコードプロセッサクラスの名前など) を提供する必要があります。レコードプロセッサでコードを実行すると、ワーカーは次のタスクを実行します。
+ ストリームに接続する
+ ストリーミング内のシャードを列挙します。
+ ストリーム内の閉じた親シャードの子シャードをチェックして列挙します
+ シャードと他のワーカー (存在する場合) の関連付けを調整する
+ レコードプロセッサで管理する各シャードのレコードプロセッサをインスタンス化する
+ ストリーミングからレコードを取得します。
+ 高スループット時の GetRecords API 呼び出しレートをスケーリングします (キャッチアップモードが設定されている場合)
+ 対応するレコードプロセッサにレコードを送信する
+ 処理されたレコードのチェックポイントを作成する
+ ワーカーのインスタンス数が変化したときに、シャードとワーカーの関連付けを調整する
+ シャードが分割されたときに、シャードとワーカーの関連付けを調整します。

KCL アダプターは、一時的なスループットの増加を処理するための自動呼び出しレート調整機能であるキャッチアップモードをサポートしています。ストリーム処理の遅延が設定可能なしきい値 (デフォルトは 1 分) を超えると、キャッチアップモードでは GetRecords API の呼び出し頻度が設定可能な値 (デフォルトは 3 倍) でスケーリングされ、レコードの取得速度が上がり、遅延が減少すると通常の状態に戻ります。これは、デフォルトのポーリングレートを使用しているコンシューマーが DynamoDB 書き込みアクティビティに圧迫される可能性がある高スループット期間中に役立ちます。キャッチアップモードは、`catchupEnabled` 設定パラメータ (デフォルトは false) を使用して有効にできます。

**注記**  
こちらに記載されている KCL 概念の説明については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Kinesis Client Library を使用したコンシューマーの開発](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)」を参照してください。  
AWS Lambda でのストリームの使用の詳細については、「[DynamoDB Streams と AWS Lambda のトリガー](Streams.Lambda.md)」を参照してください。

# KCL 1.x から KCL 3.x への移行
<a name="streams-migrating-kcl"></a>

## 概要
<a name="migrating-kcl-overview"></a>

このガイドでは、コンシューマーアプリケーションを KCL 1.x から KCL 3.x に移行する手順について説明します。KCL 1.x と KCL 3.x のアーキテクチャの違いにより、移行では互換性を確保するために複数のコンポーネントを更新する必要があります。

KCL 1.x は、KCL 3.x とは異なるクラスとインターフェイスを使用します。まずレコードプロセッサ、レコードプロセッサファクトリ、ワーカークラスを KCL 3.x 互換形式に移行し、KCL 1.x から KCL 3.x への移行手順に従う必要があります。

## 移行手順
<a name="migration-steps"></a>

**Topics**
+ [ステップ 1: レコードプロセッサを移行する](#step1-record-processor)
+ [ステップ 2: レコードプロセッサファクトリーを移行する](#step2-record-processor-factory)
+ [ステップ 3: ワーカーを移行する](#step3-worker-migration)
+ [ステップ 4: KCL 3.x 設定の概要と推奨事項](#step4-configuration-migration)
+ [ステップ 5: KCL 2.x から KCL 3.x に移行する](#step5-kcl2-to-kcl3)

### ステップ 1: レコードプロセッサを移行する
<a name="step1-record-processor"></a>

以下の例は、KCL 1.x DynamoDB Streams Kinesis Adapter に実装されたレコードプロセッサを示しています。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**RecordProcessor クラスを移行するには**

1. インターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` および `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` から `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` に変更します。以下に例を示します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. `initialize` メソッド `processRecords` とメソッドの import ステートメントを更新します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. `shutdownRequested` メソッドを以下の新しいメソッドに置き換えます。`leaseLost`、`shardEnded`、および `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注記**  
DynamoDB Streams Kinesis Adapter は SDKv2 レコードモデルを使用するようになりました。SDKv2 では、複雑な `AttributeValue` オブジェクト (`BS`、`NS`、`M`、`L`、`SS`) が null を返すことはありません。`hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` メソッドを使用して、これらの値が存在するかどうかを確認します。

### ステップ 2: レコードプロセッサファクトリーを移行する
<a name="step2-record-processor-factory"></a>

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**`RecordProcessorFactory` を移行するには**
+ 実装されているインターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` から `software.amazon.kinesis.processor.ShardRecordProcessorFactory` に変更します。以下に例を示します。

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

以下は、3.0 のレコードプロセッサファクトリーの例です。

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### ステップ 3: ワーカーを移行する
<a name="step3-worker-migration"></a>

バージョン 3.0 の KCL では、新しいクラス **Scheduler** によって **Worker** クラスが置き換えられます。KCL 1.x のワーカーの例を次に示します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**ワーカーを移行するには**

1. `import` クラスの `Worker` ステートメントを `Scheduler` クラスと `ConfigsBuilder` クラスのインポートステートメントに変更します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. `StreamTracker` をインポートし、`StreamsWorkerFactory` のインポートを `StreamsSchedulerFactory` に変更します。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. アプリケーションを起動するポジションを選択します。`TRIM_HORIZON`、`LATEST` のいずれかになります。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. `StreamTracker` インスタンスを作成します。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. `AmazonDynamoDBStreamsAdapterClient` オブジェクトを作成します。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. `ConfigsBuilder` オブジェクトを作成します。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 次の例に示すように、`ConfigsBuilder` を使用して `Scheduler` を作成します。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
この `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 設定は、KCL v2 と v3 ではなく、KCL v3 と KCL v1 の DynamoDB Streams Kinesis Adapter 間の互換性を維持します。

### ステップ 4: KCL 3.x 設定の概要と推奨事項
<a name="step4-configuration-migration"></a>

KCL 3.x に関連する KCL 1.x 後に導入された設定の詳細については、「[KCL 設定](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html)」と「[KCL 移行クライアント設定](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)」を参照してください。

**重要**  
KCL 3.x 以降のバージョンでは、`checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig`、および `retrievalConfig` オブジェクトを直接作成する代わりに、スケジューラの初期化の問題を回避するために、`ConfigsBuilder` を使用して設定を行うことをお勧めします。`ConfigsBuilder` を使用すると、KCL アプリケーションをより柔軟で保守性の高い方法で構成できます。

#### KCL 3.x でデフォルト値を更新する設定
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL バージョン 1.x では、`billingMode` のデフォルト値は `PROVISIONED` に設定されます。ただし、KCL バージョン 3.x では、デフォルトの `billingMode` は `PAY_PER_REQUEST` (オンデマンドモード) です。使用量に基づいて容量を自動的に調整するには、リーステーブルのオンデマンドキャパシティモードを使用することをお勧めします。リーステーブルにプロビジョンドキャパシティを使用するガイダンスについては、「[プロビジョンドキャパシティモードのリーステーブルのベストプラクティス](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)」を参照してください。

`idleTimeBetweenReadsInMillis`  
KCL バージョン 1.x では、`idleTimeBetweenReadsInMillis` のデフォルト値は 1,000 (または 1 秒) に設定されています。KCL バージョン 3.x は i`dleTimeBetweenReadsInMillis` のデフォルト値を 1,500 (または 1.5 秒) に設定しますが、Amazon DynamoDB Streams Kinesis Adapter はデフォルト値を 1,000 (または 1 秒) に上書きします。

#### KCL 3.x の新しい設定
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
この設定では、新しく検出されたシャードが処理を開始するまでの時間間隔を定義し、1.5 × `leaseAssignmentIntervalMillis` として計算されます。この設定が明示的に設定されていない場合、時間間隔はデフォルトで 1.5 × `failoverTimeMillis` に設定されます。新しいシャードの処理には、リーステーブルをスキャンし、リーステーブルのグローバルセカンダリインデックス (GSI) をクエリする必要があります。`leaseAssignmentIntervalMillis` を小さくすると、これらのスキャンおよびクエリオペレーションの頻度が増加し、DynamoDB のコストが高くなります。新しいシャードの処理の遅延を最小限に抑えるために、この値を 2,000 (または 2 秒) に設定することをお勧めします。

`shardConsumerDispatchPollIntervalMillis`  
この設定では、シャードコンシューマーによる連続するポーリング間の間隔を定義して、状態遷移をトリガーします。KCL バージョン 1.x では、この動作は `idleTimeInMillis` パラメータによって制御され、設定可能な設定として公開されませんでした。KCL バージョン 3.x では、KCL バージョン 1.x のセットアップで ` idleTimeInMillis` で使用される値と一致するようにこの設定を行うことをお勧めします。

### ステップ 5: KCL 2.x から KCL 3.x に移行する
<a name="step5-kcl2-to-kcl3"></a>

最新の Kinesis Client Library (KCL) バージョンとのスムーズな移行と互換性を確保するには、[KCL 2.x から KCL 3.x へのアップグレード](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics)に関する移行ガイドの手順のステップ 5～8 に従います。

一般的な KCL 3.x のトラブルシューティングの問題については、「[Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)」を参照してください。

# 以前のバージョンにロールバックする
<a name="kcl-migration-rollback"></a>

このトピックでは、コンシューマーアプリケーションを以前の KCL バージョンにロールバックする方法について説明します。このロールバックプロセスは次の 2 つのステップで構成されています。

1. [KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)を実行します。

1. 以前の KCL バージョンコードを再デプロイします。

## ステップ 1: KCL 移行ツールを実行する
<a name="kcl-migration-rollback-step1"></a>

以前の KCL バージョンにロールバックする必要がある場合は、KCL 移行ツールを実行する必要があります。このツールは 2 つの重要なタスクを実行します。
+ これにより、DynamoDB のワーカーメトリクステーブルと呼ばれるメタデータテーブルとリーステーブルのグローバルセカンダリインデックスが削除されます。これらのアーティファクトは KCL 3.x によって作成されますが、以前のバージョンにロールバックするときには必要ありません。
+ これにより、すべてのワーカーが KCL 1.x と互換性のあるモードで実行され、以前の KCL バージョンで使用されていた負荷分散アルゴリズムの使用が開始されます。KCL 3.x の新しい負荷分散アルゴリズムに問題がある場合、この問題はすぐに軽減されます。

**重要**  
DynamoDB のコーディネーター状態テーブルが存在し、移行、ロールバック、ロールフォワードプロセス中に削除されていない必要があります。

**注記**  
コンシューマーアプリケーションのすべてのワーカーが、一度に同じ負荷分散アルゴリズムを使用することが重要です。KCL 移行ツールを使用すると、KCL 3.x コンシューマーアプリケーション内のすべてのワーカーが KCL 1.x 互換モードに切り替えられ、アプリケーションが以前の KCL バージョンにロールバックしている間、すべてのワーカーが同じ負荷分散アルゴリズムを実行します。

[KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)は、[KCL GitHub リポジトリ](https://github.com/awslabs/amazon-kinesis-client/tree/master)のスクリプトディレクトリでダウンロードできます。コーディネーター状態テーブル、ワーカーメトリクステーブル、リーステーブルに書き込むための適切なアクセス許可を持つワーカーまたはホストからスクリプトを実行します。KCL コンシューマーアプリケーションに適切な [IAM アクセス許可](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)が設定されていることを確認します。指定されたコマンドを使用して、KCL アプリケーションごとにスクリプトを 1 回だけ実行します。

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### パラメータ
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
*リージョン*をお客様の AWS リージョンに置き換えてください。

`--application_name`  
このパラメータは、DynamoDB メタデータテーブル (リーステーブル、コーディネーター状態テーブル、ワーカーメトリクステーブル) にデフォルト名を使用している場合に必要です。これらのテーブルにカスタム名を指定している場合は、このパラメータを省略できます。*applicationName* を実際の KCL アプリケーションの名前に置き換えます。カスタム名が指定されていない場合、ツールはこの名前を使用してデフォルトのテーブル名を取得します。

`--lease_table_name`  
このパラメータは、KCL 設定でリーステーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*leaseTableName* をリーステーブルに指定したカスタムテーブル名に置き換えます。

`--coordinator_state_table_name`  
このパラメータは、KCL 設定でコーディネーター状態テーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*coordinatorStateTableName* を、コーディネーター状態テーブルに指定したカスタムテーブル名に置き換えます。

`--worker_metrics_table_name`  
このパラメータは、KCL 設定でワーカーメトリクステーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*workerMetricsTableName* を、ワーカーメトリクステーブルに指定したカスタムテーブル名に置き換えてください。

## ステップ 2: 以前の KCL バージョンでコードを再デプロイする
<a name="kcl-migration-rollback-step2"></a>

**重要**  
KCL 移行ツールによって生成された出力でバージョン 2.x に言及する場合は、KCL バージョン 1.x を参照していると解釈する必要があります。スクリプトを実行しても完全なロールバックは行われず、負荷分散アルゴリズムを KCL バージョン 1.x で使用されるものに切り替えるだけです。

ロールバック用に KCL 移行ツールを実行すると、次のいずれかのメッセージが表示されます。

メッセージ 1  
「ロールバックが完了しました。アプリケーションは 2x と互換性のある機能を実行していました。以前の KCL バージョンでコードをデプロイして、以前のアプリケーションバイナリにロールバックしてください。」  
**必要なアクション:** これは、ワーカーが KCL 1.x 互換モードで実行されていたことを意味します。以前の KCL バージョンのコードをワーカーに再デプロイしてください。

メッセージ 2  
「ロールバックが完了しました。KCL アプリケーションは 3x の機能を実行しており、2x と互換性のある機能にロールバックします。しばらくしても移行が見られない場合は、以前の KCL バージョンでコードをデプロイして、以前のアプリケーションバイナリにロールバックしてください。」  
**必要なアクション:** これはワーカーが KCL 3.x モードで実行され、KCL 移行ツールがすべてのワーカーを KCL 1.x 互換モードに切り替えたことを意味します。以前の KCL バージョンのコードをワーカーに再デプロイしてください。

メッセージ 3  
「アプリケーションは既にロールバックされています。削除できる KCLv3 リソースは、移行でアプリケーションをロールフォワードできるようになるまで料金が発生しないようにクリーンアップされました。」  
**必要なアクション:** これは、ワーカーが既にロールバックされ、KCL 1.x 互換モードで実行されていることを意味します。以前の KCL バージョンのコードをワーカーに再デプロイしてください。

# ロールバック後に KCL 3.x にロールフォワードする
<a name="kcl-migration-rollforward"></a>

このトピックでは、ロールバック後にコンシューマーアプリケーションを KCL 3.x にロールフォワードする方法について説明します。ロールフォワードが必要な場合は、2 ステップのプロセスを完了する必要があります。

1. [KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)を実行します。

1. KCL 3.x を使用してコードをデプロイします。

## ステップ 1: KCL 移行ツールを実行する
<a name="kcl-migration-rollforward-step1"></a>

次のコマンドを使用して KCL 移行ツールを実行し、KCL 3.x にロールフォワードします。

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### パラメータ
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
*リージョン*をお客様の AWS リージョンに置き換えてください。

`--application_name`  
このパラメータは、コーディネーター状態テーブルにデフォルト名を使用している場合に必要です。コーディネーター状態テーブルにカスタム名を指定している場合は、このパラメータを省略できます。*applicationName* を実際の KCL アプリケーションの名前に置き換えます。カスタム名が指定されていない場合、ツールはこの名前を使用してデフォルトのテーブル名を取得します。

`--coordinator_state_table_name`  
このパラメータは、KCL 設定でコーディネーター状態テーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。*coordinatorStateTableName* を、コーディネーター状態テーブルに指定したカスタムテーブル名に置き換えます。

移行ツールをロールフォワードモードで実行すると、KCL は KCL 3.x に必要な次の DynamoDB リソースを作成します。
+ リーステーブルのグローバルセカンダリインデックス
+ ワーカーメトリクステーブル

## ステップ 2: KCL 3.x を使用してコードをデプロイする
<a name="kcl-migration-rollforward-step2"></a>

ロールフォワードの KCL 移行ツールを実行したら、KCL 3.x を使用してコードをワーカーにデプロイします。移行を完了するには、「[ステップ 8: 移行を完了する](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)」を参照してください。

# チュートリアル: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

このセクションは、Amazon Kinesis Client Library と Amazon DynamoDB Streams Kinesis Adapter を使用する Java アプリケーションのチュートリアルです。アプリケーションには、データレプリケーションの例が表示されます。データレプリケーションでは、1 つのテーブルからの書き込みアクティビティが 2 番目のテーブルに適用され、両方のテーブルの内容が同期されます。ソースコードについては、「[完成したプログラム: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)」を参照してください。

このプログラムでは、次のような処理を実行します。

1. `KCL-Demo-src` と `KCL-Demo-dst` という 2 つの DynamoDB テーブルを作成します。これらの各テーブルでは、ストリームが有効になっています。

1. 項目を追加、更新、削除することで、ソーステーブルで更新アクティビティを生成します。これにより、データがテーブルのストリームに書き込まれます。

1. ストリーミングからレコードを読み込んで、DynamoDB リクエストとして再構築し、ターゲットテーブルにリクエストを適用します。

1. ソーステーブルとターゲットテーブルをスキャンし、内容が同じであることを確認します。

1. テーブルを削除してクリーンアップします。

これらのステップについては次のセクションで説明します。完成したアプリケーションは、チュートリアルの最後に示します。

**Topics**
+ [ステップ 1: DynamoDB テーブルを作成する](#Streams.KCLAdapter.Walkthrough.Step1)
+ [ステップ 2: ソーステーブルに更新アクティビティを生成する](#Streams.KCLAdapter.Walkthrough.Step2)
+ [ステップ 3: ストリームを処理する](#Streams.KCLAdapter.Walkthrough.Step3)
+ [ステップ 4: 両方のテーブルの内容が同じであることを確認する](#Streams.KCLAdapter.Walkthrough.Step4)
+ [ステップ 5：クリーンアップ](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完成したプログラム: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## ステップ 1: DynamoDB テーブルを作成する
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

最初のステップでは、2 つの DynamoDB テーブル (送信元テーブルと送信先テーブル) を作成します。ソーステーブルのストリームにある `StreamViewType` は `NEW_IMAGE` です。これは、このテーブルで項目が変更されると必ず、イメージの "後の" 項目がストリームに書き込まれることを意味します。このようにして、ストリームはテーブル内のすべての書き込みアクティビティを記録します。

次の例は、両方のテーブルを作成するためのコードを示しています。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## ステップ 2: ソーステーブルに更新アクティビティを生成する
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

次のステップでは、ソーステーブルにいくつかの書き込みアクティビティを生成します。このアクティビティの実行中、ソーステーブルのストリームもほぼリアルタイムで更新されます。

アプリケーションは、データを書き込むための `PutItem`、`UpdateItem`、および `DeleteItem` API オペレーションを呼び出すメソッドを持つヘルパークラスを定義します。次の例は、これらのメソッドの使用方法を示しています。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## ステップ 3: ストリームを処理する
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

ここでは、プログラムがストリームの処理を開始します。DynamoDB Streams Kinesis Adapter は、低レベルの DynamoDB Streams コールを行わなくてもコードが KCL を十分に使用できるように、KCL と DynamoDB Streams エンドポイントの間の透過的なレイヤーとして機能します。このプログラムでは次のタスクを実行しています。
+ KCL インターフェイス定義に従ったメソッド（`StreamsRecordProcessor`、`initialize`、`processRecords`）を使用して、レコードプロセッサクラス `shutdown` を定義します。`processRecords` メソッドには、ソーステーブルのストリームからの読み込みとターゲットテーブルへの書き込みに必要なロジックが含まれています。
+ レコードプロセッサクラスのクラスファクトリを定義します（`StreamsRecordProcessorFactory`）。これは、KCL を使用する Java プログラムに必要です。
+ クラスファクトリに関連付けられた新しい KCL `Worker` をインスタンス化します。
+ レコード処理が完了すると、`Worker` をシャットダウンします。

必要に応じて、Streams KCL Adapter 設定でキャッチアップモードを有効にして、ストリーム処理の遅延が 1 分 (デフォルト) を超えたときに GetRecords API 呼び出しレートを 3 倍 (デフォルト) に自動的にスケーリングし、ストリームコンシューマーがテーブル内の高スループットのスパイクを処理できるようにします。

KCL インターフェイス定義の詳細については、「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Kinesis Client Library を使用したコンシューマーの開発](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)」を参照してください。

次の例は、`StreamsRecordProcessor` におけるメインループを示しています。`case` ステートメントは、ストリームレコードに出現する `OperationType` に基づいて、実行するアクションを決定します。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## ステップ 4: 両方のテーブルの内容が同じであることを確認する
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

この時点で、ソーステーブルとターゲットテーブルの内容が同期されています。アプリケーションは、両方のテーブルに対して `Scan` リクエストを発行し、内容が実際に同じであることを確認します。

`DemoHelper` クラスには、低レベルの `ScanTable` API を呼び出す `Scan` メソッドが含まれています。次の例は、その使用方法を示しています。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## ステップ 5：クリーンアップ
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

デモは完了したため、アプリケーションによりソーステーブルとターゲットテーブルが削除されます。次のコード例を参照してください。テーブルが削除されても、そのストリームは最大 24 時間使用可能です。その後、自動的に削除されます。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 完成したプログラム: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

次に、[チュートリアル: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.md) で説明したタスクを実行する、完成した Java プログラムを次に示します。実行すると、次のような出力が表示されます。

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**重要**  
 このプログラムを実行するには、クライアントアプリケーションがポリシーを使用して DynamoDB および Amazon CloudWatch にアクセスできることを確認します。詳細については、「[DynamoDB のアイデンティティベースのポリシー](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies)」を参照してください。

ソースコードは、4 つの `.java` ファイルから構成されています。このプログラムを構築するには、Amazon Kinesis Client Library (KCL) 3.x と AWS SDK for Java v2 を含む次の依存関係を推移的な依存関係として追加します。

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

ソースファイルは次のとおりです。
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# DynamoDB Streams 低レベル API: Java の例
<a name="Streams.LowLevel.Walkthrough"></a>

**注記**  
このページのコードはすべてを網羅していないため、Amazon DynamoDB Streams を使用するすべてのシナリオに対応しているわけではありません。DynamoDB からストリーミングレコードを使用する場合は、[DynamoDB Streams Kinesis Adapter を使用したストリームレコードの処理](Streams.KCLAdapter.md) で説明されているとおり、Kinesis Client Library (KCL) を使用し、Amazon Kinesis Adapter を介して行うことを推奨します。

このセクションには、動作中の DynamoDB Streams を示す Java プログラムが含まれています。このプログラムでは、次のような処理を実行します。

1. ストリーミングが有効になった DynamoDB テーブルを作成します。

1. このテーブルのストリーム設定を記述します。

1. テーブル内のデータを変更します。

1. ストリーム内のシャードを記述します。

1. シャードからストリームレコードを読み込みます。

1. 子シャードを取得し、レコードの読み取りを続行します。

1. クリーンアップします。

プログラムを実行すると、以下のような出力が表示されます。

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example 例**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB Streams と AWS Lambda のトリガー
<a name="Streams.Lambda"></a>

Amazon DynamoDB は AWS Lambda と統合されているため、*トリガー* (DynamoDB Streams 内のイベントに自動的に応答するコードの一部) を作成できます。トリガーを使用すると、DynamoDB テーブル内のデータ変更に対応するアプリケーションを構築できます。

**Topics**
+ [チュートリアル \$11: AWS CLI を使用した Amazon DynamoDB と AWS Lambda での、フィルターを使ったすべてのイベントの処理](Streams.Lambda.Tutorial.md)
+ [チュートリアル \$12: DynamoDB と Lambda での、フィルターを使用したいくつかのイベントの処理。](Streams.Lambda.Tutorial2.md)
+ [Lambda での DynamoDB ストリームの使用に関するベストプラクティス](Streams.Lambda.BestPracticesWithDynamoDB.md)

テーブルで DynamoDB Streams を有効にした場合、書き込む AWS Lambda 関数にストリーミングの Amazon リソースネーム (ARN) を関連付けることができます。その後、その DynamoDB テーブルに対するすべてのミューテーションアクションをストリーム上の項目としてキャプチャできます。例えば、テーブル内の項目が変更されたときに、そのテーブルのストリームに新しいレコードがすぐに表示されるようにトリガーを設定できます。

**注記**  
3 つ以上の Lambda 関数を 1 つの DynamoDB ストリームにサブスクライブすると、読み込みスロットリングが発生する可能性があります。

[AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) サービスは、1 秒に 4 回、ストリームをポーリングして新しいレコードを探します。新しいストリームレコードが利用可能になると、Lambda 関数が同期的に呼び出されます。同じ DynamoDB ストリームに最大 2 つの Lambda 関数をサブスクライブできます。同じ DynamoDB ストリームに 3 つ以上の Lambda 関数をサブスクライブすると、読み込みスロットリングが発生する可能性があります。

Lambda 関数は、通知の送信やワークフローの開始など、指定した他の多くのアクションを実行できます。各ストリームレコードを Amazon S3 File Gateway (Amazon S3) などの永続的ストレージにコピーするだけで、書き込みアクティビティの永続的な監査追跡をテーブルに作成する Lambda 関数を記述できます。または、`GameScores` テーブルに書き込みを行うモバイルゲームアプリがあるとします。`TopScore` テーブルの `GameScores` 属性が更新されるたびに、対応するストリームレコードがテーブルのストリームに書き込まれます。その後、このイベントによって Lambda 関数をトリガーし、ソーシャルメディアネットワークにおめでとうメッセージを投稿できます この関数は、`GameScores` の更新でないストリームレコードや、`TopScore` 属性を変更しないストリームレコードを無視するように記述できます。

関数がエラーを返した場合、処理が正常に完了するか、データの有効期限が切れるまで Lambda はバッチを再試行します。Lambda では、より小さなバッチで再試行したり、再試行回数を制限したり、古すぎるレコードを破棄したりするよう設定できます。

パフォーマンスのベストプラクティスとして、Lambda 関数は存続期間を短くする必要があります。また、不必要な処理の遅延が発生するのを防ぐため、複雑なロジックは実行しないでください。特に高速ストリームの場合は、同期的に長時間実行する Lambda よりも、非同期的な後処理ステップ関数ワークフローをトリガーすることをお勧めします。

 DynamoDB ストリームでリソースベースのポリシーを設定し、Lambda 関数へのクロスアカウント読み取りアクセスを許可することで、異なる AWS アカウント間で Lambda トリガーを使用できます。クロスアカウントアクセスを許可するようにストリームを設定する方法については、DynamoDB 開発者ガイドの「[クロスアカウントの AWS Lambda 関数を使用したアクセスの共有](rbac-cross-account-access.md#shared-access-cross-acount-lambda)」を参照してください。

AWS Lambda の詳細については、「[AWS Lambda デベロッパーガイド](https://docs.aws.amazon.com/lambda/latest/dg/)」を参照してください。

# チュートリアル \$11: AWS CLI を使用した Amazon DynamoDB と AWS Lambda での、フィルターを使ったすべてのイベントの処理
<a name="Streams.Lambda.Tutorial"></a>

 

このチュートリアルでは、AWS Lambda トリガーを作成して、DynamoDB テーブルからのストリーミングを処理します。

**Topics**
+ [ステップ 1: ストリーミングが有効になった DynamoDB テーブルを作成する](#Streams.Lambda.Tutorial.CreateTable)
+ [ステップ 2: Lambda 実行ロールを作成する](#Streams.Lambda.Tutorial.CreateRole)
+ [ステップ 3: Amazon SNS トピックを作成する](#Streams.Lambda.Tutorial.SNSTopic)
+ [ステップ 4: Lambda 関数を作成してテストする](#Streams.Lambda.Tutorial.LambdaFunction)
+ [ステップ 5: トリガーを作成してテストする](#Streams.Lambda.Tutorial.CreateTrigger)

このチュートリアルのシナリオは、シンプルなソーシャルネットワークである Woofer です。Woofer ユーザーは、他の Woofer ユーザーに送信される *bark* (短いテキストメッセージ) を使用して通信します。次の図は、このアプリケーションのコンポーネントとワークフローを示しています。

![\[DynamoDB テーブル、ストリームレコード、Lambda 関数、および Amazon SNS トピックの Woofer アプリケーションワークフロー。\]](http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. ユーザーは DynamoDB テーブル (`BarkTable`) に項目を書き込みます。テーブルの各項目は bark を表します。

1. 新しいストリームレコードが書き込まれ、新しい項目が `BarkTable` に追加されたことを反映します。

1. 新しいストリームレコードは AWS Lambda 関数 (`publishNewBark`) をトリガーします。

1. ストリーミングレコードに、新しい項目が `BarkTable` に追加されたことが示された場合、Lambda 関数はストリーミングレコードからデータを読み込み、Amazon Simple Notification Service (Amazon SNS) のトピックにメッセージを発行します。

1. メッセージは Amazon SNS トピックの受信者によって受信されます (このチュートリアルでは、唯一の受信者は E メールアドレスです)。

**開始する前に**  
このチュートリアルでは AWS Command Line Interface AWS CLI を使用します。まだ行っていない場合は、「[AWS Command Line Interface ユーザーガイド](https://docs.aws.amazon.com/cli/latest/userguide/)」の手順に従って、AWS CLI をインストールおよび設定します。

## ステップ 1: ストリーミングが有効になった DynamoDB テーブルを作成する
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

このステップでは、Woofer ユーザーからのすべての bark を保存する DynamoDB テーブル (`BarkTable`) を作成します。プライマリキーは、`Username` (パーティションキー) と `Timestamp` (ソートキー) で構成されます。これらの属性は両方とも文字列型になります。

`BarkTable` ではストリームが有効になっています。このチュートリアルの後半では、AWS Lambda 関数をストリームと関連付けてトリガーを作成します。

1. 次のコマンドを入力して、テーブルを作成します。

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. 出力で、`LatestStreamArn` を探します。

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   `region` と `accountID` をメモしておきます。これらは、このチュートリアルの他のステップで必要になります。

## ステップ 2: Lambda 実行ロールを作成する
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

このステップでは、AWS Identity and Access Management (IAM) ロール (`WooferLambdaRole`) を作成し、それにアクセス権限を割り当てます。このロールは、[ステップ 4: Lambda 関数を作成してテストする](#Streams.Lambda.Tutorial.LambdaFunction) で作成する Lambda 関数で使用されます。

また、ロールのポリシーを作成します。このポリシーには、Lambda 関数がランタイム時に必要とするすべてのアクセス許可が含まれます。

1. 次の内容で、`trust-relationship.json` というファイルを作成します。

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. `WooferLambdaRole` を作成するため、以下のコマンドを入力します。

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. 次の内容で、`role-policy.json` というファイルを作成します。(`region` および `accountID` をお客様の AWS リージョンとアカウント ID に置き換えます。)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   ポリシーには 4 つのステートメントがあり、これにより `WooferLambdaRole` は以下を実行できます。
   + Lambda 関数 (`publishNewBark`) を実行します。このチュートリアルの後半で、この関数を作成します。
   + Amazon CloudWatch Logs のアクセス。Lambda 関数はランタイム時に診断を CloudWatch Logs に書き込みます。
   + `BarkTable` の DynamoDB Streams からデータを読み込みます。
   + Amazon SNS にメッセージを公開します。

1. 次のコマンドを入力して、`WooferLambdaRole` にポリシーをアタッチします。

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## ステップ 3: Amazon SNS トピックを作成する
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

このステップでは、Amazon SNSトピック (`wooferTopic`) を作成し、そのトピックに E メールアドレスをサブスクライブします。Lambda 関数はこのトピックを使用して、Woofer ユーザーからの新しい bark を公開します。

1. 次のコマンドを入力して、新しい Amazon SNS トピックを作成します。

   ```
   aws sns create-topic --name wooferTopic
   ```

1. 次のコマンドを入力して、`wooferTopic` に E メールアドレスをサブスクライブします (`region` および `accountID` は AWS リージョンとアカウント ID に置き換え、`example@example.com` は有効な E メールアドレスと置き換えます)。

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS は E メールアドレスに確認メッセージを送信します。そのメッセージの [**サブスクリプションを確認**] リンクを選択して、サブスクリプションプロセスを完了します。

## ステップ 4: Lambda 関数を作成してテストする
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

このステップでは、AWS Lambda 関数 (`publishNewBark`) を作成して `BarkTable` からのストリームレコードを処理します。

`publishNewBark` 関数は、`BarkTable` の新しい項目に対応するストリームイベントのみを処理します。この関数は、そのようなイベントからデータを読み取ってから、Amazon SNS をコールしてデータを公開します。

1. 次の内容で、`publishNewBark.js` というファイルを作成します。`region` および `accountID` をお客様の AWS リージョンとアカウント ID に置き換えます。

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. `publishNewBark.js` を含める zip ファイルを作成します。zip コマンドラインユーティリティがある場合は、次のコマンドを入力してこれを行うことができます。

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Lambda 関数を作成する場合は、[ステップ 2: Lambda 実行ロールを作成する](#Streams.Lambda.Tutorial.CreateRole) で作成した `WooferLambdaRole` の Amazon リソースネーム (ARN) を指定します。この ARN を取得するには、次のコマンドを入力します。

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   出力で、`WooferLambdaRole` の ARN を探します。

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   次のコマンドを入力して、Lambda 関数を作成します。*roleARN* を `WooferLambdaRole` の ARN に置き換えます。

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. ここで、`publishNewBark` をテストして、これが動作することを確認します。これを行うには、DynamoDB Streams の実際のレコードに似た情報を入力します。

   次の内容で、`payload.json` というファイルを作成します。`region` および `accountID` をお客様の AWS リージョンとアカウント ID に置き換えます。

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   次のコマンドを入力して、`publishNewBark` 関数をテストします。

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   テストが成功すると、次の出力が表示されます。

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   さらに、`output.txt` ファイルには次のテキストが含まれます。

   ```
   "Successfully processed 1 records."
   ```

   また、数分以内に新しい E メールメッセージが届きます。
**注記**  
AWS Lambda は、Amazon CloudWatch Logs に診断情報を書き込みます。Lambda 関数でエラーが発生した場合、この診断情報をトラブルシューティングに使用できます。  
CloudWatch コンソール ([https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)) を開きます。
ナビゲーションペインで [**ログ**] を選択します。
次のロググループを選択: `/aws/lambda/publishNewBark`
最新のログストリーミングを選択して、関数からの出力 (およびエラー) を表示します。

## ステップ 5: トリガーを作成してテストする
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

[ステップ 4: Lambda 関数を作成してテストする](#Streams.Lambda.Tutorial.LambdaFunction) で、Lambda 関数をテストして、正しく実行されたことを確認しました。このステップでは、Lambda 関数 (`publishNewBark`) をイベントソース (`BarkTable` ストリーミング) に関連付けることで*トリガー*を作成します。

1. トリガーを作成する場合、`BarkTable` ストリーム用の ARN を指定する必要があります。この ARN を取得するには、次のコマンドを入力します。

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   出力で、`LatestStreamArn` を探します。

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. 次のコマンドを入力してトリガーを作成します `streamARN` を実際のストリーム ARN に置き換えます。

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. トリガーをテストします。次のコマンドを入力して、`BarkTable` に項目を追加します。

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   数分以内に新しい E メールメッセージが届きます。

1. DynamoDB コンソールを開き、さらにいくつかの項目を `BarkTable` に追加します。`Username` および `Timestamp` 属性の値を指定する必要があります (必須ではないものの、`Message` の値を指定する必要があります)。`BarkTable` に追加した各項目について、新しい E メールメッセージが届きます。

   Lambda 関数は、`BarkTable` に追加した新しい項目のみを処理します。テーブル内の項目を更新または削除すると、この関数は何も行いません。

**注記**  
AWS Lambda は、Amazon CloudWatch Logs に診断情報を書き込みます。Lambda 関数でエラーが発生した場合、この診断情報をトラブルシューティングに使用できます。  
CloudWatch コンソール ([https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)) を開きます。
ナビゲーションペインで [**ログ**] を選択します。
次のロググループを選択: `/aws/lambda/publishNewBark`
最新のログストリーミングを選択して、関数からの出力 (およびエラー) を表示します。

# チュートリアル \$12: DynamoDB と Lambda での、フィルターを使用したいくつかのイベントの処理。
<a name="Streams.Lambda.Tutorial2"></a>

このチュートリアルでは、AWS Lambda トリガーを作成して、DynamoDB テーブルからのストリームの一部のイベントのみを処理します。

**Topics**
+ [すべてをまとめる - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [すべてをまとめる - CDK](#Streams.Lambda.Tutorial2.CDK)

[Lambda イベントフィルタリング](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)では、フィルター式を使用して、処理のために Lambda が関数に送信するイベントを制御できます。DynamoDB ストリームごとに最大 5 つの異なるフィルターを設定できます。バッチ処理ウィンドウを使用している場合、Lambda は新しいイベントそれぞれにフィルター条件を適用して、現在のバッチに含めるかどうかを確認します。

フィルターは `FilterCriteria` と呼ばれる構造を介して適用されます。`FilterCriteria` の 3 つの主な属性は `metadata properties`、`data properties`、および `filter patterns` です。

DynamoDB Streams イベントの構造の例を次に示します。

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` は、イベントオブジェクトのフィールドです。DynamoDB Streams の場合、`metadata properties` は `dynamodb` や `eventName` のようなフィールドです。

`data properties` は、イベント本文のフィールドです。`data properties` をフィルタリングするには、それらを適切なキー内の `FilterCriteria` に含めるようにしてください。DynamoDB イベントソースのデータキーは `NewImage` または `OldImage` です。

最後に、フィルタールールは、特定のプロパティに適用するフィルター式を定義します。次に例を示します。


| 比較演算子 | 例 | ルール構文 (一部) | 
| --- | --- | --- | 
|  Null  |  製品タイプは NULL  |  `{ "product_type": { "S": null } } `  | 
|  空  |  製品名は空白  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Equals  |  州はフロリダに等しい  |  `{ "state": { "S": ["FL"] } } `  | 
|  And  |  製品州はフロリダ、製品カテゴリはチョコレート  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  または  |  製品州はフロリダまたはカリフォルニア  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  製品州はフロリダ州ではない  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  存在する  |  地産品は存在する  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  存在しない  |  地産品は存在しない  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Begins with  |  PK は COMPANY から始まる  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Lambda 関数には、最大 5 つのイベントフィルタリングパターンを指定できます。これら 5 つのイベントのそれぞれが論理 OR として評価されることに注意してください。そのため、`Filter_One` および `Filter_Two` という名前の 2 つのフィルターを設定すると、Lambda 関数は `Filter_One` OR `Filter_Two` を実行します。

**注記**  
[Lambda イベントのフィルタリング](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)ページには、数値をフィルタリングして比較するオプションがいくつかありますが、DynamoDB のフィルタイベントの場合、DynamoDB の数値は文字列として保存されるため、適用されません。例えば ` "quantity": { "N": "50" }` の場合は、`"N"` プロパティのおかげでそれが数字だとわかります。

## すべてをまとめる - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

実際のイベントフィルタリング機能を見ていただくために、CloudFormation テンプレートのサンプルを以下に示します。このテンプレートは、Amazon DynamoDB Streams を有効にしたパーティションキー PK とソートキー SK を含むシンプルな DynamoDB テーブルを生成します。これにより、Amazon Cloudwatch へのログの書き込みと Amazon DynamoDB Streams からのイベントの読み取りを許可する Lambda 関数とシンプルな Lambda 実行ロールが作成されます。また、DynamoDB Streams と Lambda 関数間にイベントソースマッピングも追加されるため、Amazon DynamoDB Streams にイベントが発生するたびに関数を実行できます。

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

この CloudFormation テンプレートをデプロイすると、次の Amazon DynamoDB 項目を挿入できます。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

この CloudFormation テンプレートにインラインで組み込まれているシンプルな Lambda 関数により、Lambda 関数の Amazon CloudWatch ロググループのイベントは次のように表示されます。

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**フィルター例**
+ **特定の州に一致する商品のみ**

この例では、CloudFormation テンプレートを変更して、フロリダで生産されたすべての製品 (略称「FL」) と一致するフィルターを含めます。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

スタックを再デプロイしたら、テーブルに次の DynamoDB 項目を追加できます。この例での製品はカリフォルニア産なので、Lambda 関数ログには表示されないことに注意してください。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **PK と SK のうち、ある値で始まるアイテムのみ**

この例では、CloudFormation テンプレートを変更して次の条件を含めます。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

AND 条件では、条件がパターン内になければならないことに注意してください。ここで、キー PK と SK は同じ式内にあり、カンマで区切られます。

PK と SK がある値から始まるか、特定の州産かのどちらかです。

この例では、CloudFormation テンプレートを変更して次の条件を含めます。

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

フィルターセクションに新しいパターンを導入することで、OR 条件が追加されます。

## すべてをまとめる - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

次のサンプル CDK プロジェクト形成テンプレートでは、イベントフィルタリング機能について説明します。この CDK プロジェクトに取り組む前に、[準備スクリプトの実行](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)を含む[前提条件をインストール](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)する必要があります。

**CDK プロジェクトを作成する**

まず、空のディレクトリで `cdk init` を呼び出して、新しい AWS CDK プロジェクトを作成します。

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

この `cdk init` コマンドは、プロジェクトフォルダの名前を使用して、クラス、サブフォルダ、ファイルなど、プロジェクトのさまざまな要素に名前を付けます。フォルダ名に含まれるハイフンはすべてアンダースコアに変換されます。それ以外の場合、名前は Python 識別子の形式に従う必要があります。例えば、数字で始めたり、スペースを含めたりはしないでください。

新しいプロジェクトで作業するには、その仮想環境を有効にします。これにより、プロジェクトの依存関係をグローバルにインストールするのではなく、プロジェクトフォルダにローカルにインストールできます。

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**注記**  
これは、仮想環境をアクティブにする Mac/Linux コマンドとして認識されるかもしれません。Python テンプレートには、同じコマンドを Windows で使用できるようにするバッチファイル、`source.bat` が含まれています。従来の Windows コマンド、`.venv\Scripts\activate.bat` も機能します。AWS CDK Toolkit v1.70.0 以前を使用して AWS CDK プロジェクトを初期化した場合、仮想環境は `.venv` ではなく `.env` ディレクトリにあります。

**基本インフラストラクチャ**

任意のテキストエディタでファイル `./ddb_filters/ddb_filters_stack.py` を開きます。このファイルは、AWS CDK プロジェクトを作成したときに自動生成されました。

次に、`_create_ddb_table` および `_set_ddb_trigger_function` 関数を追加します。これらの関数は、プロビジョニングモードのオンデマンドモードでパーティションキー PK とソートキー SK を含む DynamoDB テーブルを作成します。Amazon DynamoDB Streams をデフォルトで有効にして、新しいイメージと古いイメージを表示できます。

Lambda 関数はファイル `app.py` の下のフォルダ `lambda` に保存されます。このファイルは後で作成されます。これには環境変数 `APP_TABLE_NAME` が含まれます。この変数は、このスタックによって作成される Amazon DynamoDB テーブルの名前になります。同じ関数で、Lambda 関数にストリーム読み取り権限を付与します。最後に、Lambda 関数のイベントソースとして DynamoDB Streams にサブスクライブします。

`__init__` メソッド内のファイルの最後で、それぞれの構成を呼び出してスタック内で初期化します。追加のコンポーネントやサービスを必要とする大規模なプロジェクトでは、これらの構成を基本スタックの外部で定義するのが最適な場合があります。

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

次に、Amazon CloudWatch にログを出力する、非常にシンプルな Lambda 関数を作成します。それには、`lambda` という新しいフォルダを作成します。

```
mkdir lambda
touch app.py
```

任意のテキストエディタを使用して、次の内容を `app.py` ファイルに追加します。

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

`/ddb_filters/` フォルダにいることを確認し、次のコマンドを入力してサンプルアプリケーションを作成します。

```
cdk deploy
```

ある時点で、ソリューションをデプロイするかどうか確認する画面が表示されます。`Y` を入力して変更を確定します。

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

変更がデプロイされたら、AWS コンソールを開いてテーブルに項目を 1 つ追加します。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

これで CloudWatch ログには、このエントリの情報がすべて含まれているはずです。

**フィルター例**
+ **特定の州に一致する商品のみ**

ファイル `ddb_filters/ddb_filters/ddb_filters_stack.py` を開き、「FL」に等しいすべての製品と一致するフィルターを含めるように変更します。これは 45 行目の `event_subscription` のすぐ下で修正できます。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **PK と SK のうち、ある値で始まる項目のみ**

Python スクリプトを次の条件を含むように変更します。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **PK と SK がある値から始まるか、特定の州産かのどちらかです。**

Python スクリプトを次の条件を含むように変更します。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Filters 配列に要素を追加することで OR 条件が追加されることに注意してください。

**クリーンアップ**

作業ディレクトリのベースにあるフィルタースタックを見つけて、`cdk destroy` を実行します。リソースの削除を確認するメッセージが表示されます。

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# Lambda での DynamoDB ストリームの使用に関するベストプラクティス
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

AWS Lambda 関数は、他の関数から分離された実行環境である*コンテナ*内で実行されます。この関数を初めて実行すると、AWS Lambda は新しいコンテナを作成し、関数のコードを実行し始めます。

Lambda 関数には、コールごとに 1 回実行される*ハンドラ*があります。ハンドラには、関数用の主要なビジネスロジックが含まれます。たとえば、[ステップ 4: Lambda 関数を作成してテストする](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) に示す Lambda 関数には、DynamoDB Streams のレコードを処理できるハンドラがあります。

コンテナの作成後、AWS Lambda が初めてハンドラーを実行する前に、1 回だけ実行される初期化コードを提供することもできます。[ステップ 4: Lambda 関数を作成してテストする](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) に示す Lambda 関数には、SDK for JavaScript in Node をインポートし、Amazon SNS 用のクライアントを作成する初期化コードがあります。これらのオブジェクトはハンドラの外部で 1 回のみ定義します。

関数の実行後、AWS Lambda は関数のそれ以降の呼び出しに対してコンテナを再利用する場合があります。この場合、関数ハンドラは、初期化コードで定義したリソースを再利用できる可能性があります (AWS Lambda がコンテナを保持する期間や、コンテナを再利用するかどうかを制御することはできません)。

AWS Lambda を使用した DynamoDB トリガーの場合は、次のことをお勧めします。
+ AWS のサービスのクライアントは、ハンドラではなく初期化コードでインスタンス化する必要があります。これにより、AWS Lambda コンテナは、コンテナの有効期間中は既存の接続を再利用することができます。
+ 通常、お客様が明示的に接続を管理したり、接続プールを実装したりする必要はありません。これは AWS Lambda によって自動的に管理されます。

DynamoDB ストリームの Lambda コンシューマーは、正確に一度だけ配信されることを保証するものではなく、時折重複が発生する可能性があります。重複処理が原因で予期しない問題が発生しないように、Lambda 関数コードは必ずべき等性にしてください。

詳細については、「*AWS Lambda デベロッパーガイド*」の「[AWS Lambda 関数を操作するためのベストプラクティス](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html)」を参照してください。

# DynamoDB Streams と Apache Flink
<a name="StreamsApacheFlink.xml"></a>

Apache Flink で Amazon DynamoDB Streams レコードを使用できます。[Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/) を使用すると、Apache Flink を使用してストリーミングデータをリアルタイムで変換および分析できます。Apache Flink は、リアルタイムデータを処理するためのオープンソースのストリーム処理フレームワークです。Apache Flink 用 Amazon DynamoDB Streams コネクタは、Apache Flink ワークロードの構築と管理を簡素化し、アプリケーションを他の AWS のサービスと統合できるようにします。

Amazon Managed Service for Apache Flink は、ログ分析、クリックストリーム分析、モノのインターネット (IoT)、アドテック、ゲームなどのためのエンドツーエンドのストリーム処理アプリケーションを迅速に構築するのに役立ちます。最も一般的な 4 つのユースケースは、ストリーミングの抽出変換ロード (ETL)、イベント駆動型アプリケーション、応答性の高いリアルタイム分析、データストリームのインタラクティブなクエリです。Amazon DynamoDB Streams から Apache Flink への書き込みの詳細については、「[Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)」を参照してください。