

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# DynamoDB Streams 的變更資料擷取
<a name="Streams"></a>

 DynamoDB Streams 可擷取任何 DynamoDB 資料表中依時間順序排序的項目層級修改，並將此資訊存放於日誌中長達 24 小時。應用程式可以存取此日誌，並近乎即時地檢視資料項目修改前後的顯示內容。

 靜態加密功能會加密 DynamoDB Streams 中的資料。如需詳細資訊，請參閱 [DynamoDB 靜態加密](EncryptionAtRest.md)。

*DynamoDB 串流*是 DynamoDB 資料表中項目變更資訊的排序流程。當您在資料表啟用串流時，DynamoDB 會擷取資料表中資料項目的每項修改資訊。

應用程式每次建立、更新或刪除資料表中的項目時，DynamoDB Streams 都會使用已修改項目的主索引鍵屬性寫入串流紀錄。*串流紀錄*包含 DynamoDB 資料表中單一項目資料修改的相關資訊。您可以設定串流，讓串流紀錄擷取其他資訊，例如修改項目的「之前」與「之後」影像。

DynamoDB Streams 可協助確保下列事項：
+ 每個串流紀錄只在串流中出現一次。
+ 對於 DynamoDB 資料表中的每個修改項目，串流紀錄的出現順序與項目的實際修改順序相同。

DynamoDB Streams 會近乎即時地寫入串流紀錄，讓您可以建立使用這些串流並根據內容採取動作的應用程式。

**Topics**
+ [DynamoDB Streams 的端點](#Streams.Endpoints)
+ [啟用串流](#Streams.Enabling)
+ [讀取及處理串流](#Streams.Processing)
+ [DynamoDB Streams 和存留時間](time-to-live-ttl-streams.md)
+ [使用 DynamoDB Streams Kinesis 轉接器處理串流記錄](Streams.KCLAdapter.md)
+ [DynamoDB Streams 低階 API：Java 範例](Streams.LowLevel.Walkthrough.md)
+ [DynamoDB 串流和 AWS Lambda 觸發](Streams.Lambda.md)
+ [DynamoDB Streams 和 Apache Flink](StreamsApacheFlink.xml.md)

## DynamoDB Streams 的端點
<a name="Streams.Endpoints"></a>

AWS 會維護 DynamoDB 和 DynamoDB Streams 的個別端點。為了使用資料庫資料表與索引，應用程式必須能存取 DynamoDB 端點。為了讀取及處理 DynamoDB Streams 紀錄，應用程式必須能存取同一區域中的 DynamoDB Streams 端點。

DynamoDB Streams 提供兩組端點。這些類別為：
+ **IPv4-only 端點**：具有命名慣例的端點。 `streams.dynamodb.<region>.amazonaws.com`
+ **雙堆疊端點**：與 IPv4 和 IPv6 相容並遵循`streams-dynamodb.<region>.api.aws`命名慣例的新端點。

**注意**  
如需 DynamoDB 和 DynamoDB Streams 區域與端點的完整清單，請參閱 *AWS 一般參考* 中的[區域與端點](https://docs.aws.amazon.com/general/latest/gr/rande.html)。

 AWS SDKs 為 DynamoDB 和 DynamoDB Streams 提供單獨的用戶端。視您需求的不同，應用程式可以存取 DynamoDB 端點、DynamoDB Streams 端點，或同時存取這兩種端點。若要連線到這兩種端點，您的應用程式必須具體化兩個用戶端：一個用於 DynamoDB，另一個用於 DynamoDB Streams。

## 啟用串流
<a name="Streams.Enabling"></a>

您可以使用 AWS CLI 或其中一個 AWS SDKs 在新資料表上啟用串流。您也可以在現有的資料表上啟用或停用串流，或變更串流的設定。DynamoDB Streams 會以非同步方式運作，因此若您啟用串流，也不會影響資料表的效能。

管理 DynamoDB Streams 最簡單的方式就是使用 AWS 管理主控台。

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/) 開啟 DynamoDB 主控台。

1. 在 DynamoDB 主控台儀表板上，選擇 **Tables** (資料表)，然後選取現有的資料表。

1. 選擇 **Exports and streams** (匯出與串流) 索引標籤。

1. 在 **DynamoDB 串流詳細資訊**區段中，選擇**開啟**。

1. 在**開啟 DynamoDB 串流**頁面中，選擇每次修改資料表資料時會寫入串流的資訊：
   + **僅限金鑰屬性**：僅已修改項目的金鑰屬性。
   + **新映像**：在修改後出現的整個項目。
   + **舊映像**：在修改前出現的整個項目。
   + **新舊映像**：項目的新舊映像。

   當您滿意設定後，請選擇**開啟串流**。

1. (選用) 若要停用現有的串流，請選擇 **DynamoDB 串流詳細資訊**下的**關閉**。

您也可以使用 `CreateTable` 或 `UpdateTable` API 操作來啟用或修改串流。`StreamSpecification` 參數可決定串流的設定方式：
+ `StreamEnabled`：指定是否啟用 (`true`) 或停用 (`false`) 資料表的串流。
+ `StreamViewType`：指定每次修改資料表資料時，將會寫入串流的資訊：
  + `KEYS_ONLY`：僅已修改項目的索引鍵屬性。
  + `NEW_IMAGE`：在修改後出現的整個項目。
  + `OLD_IMAGE`：在修改前出現的整個項目。
  + `NEW_AND_OLD_IMAGES`：項目的新舊映像。

您可以隨時啟用或停用串流。但若嘗試在已有串流的資料表中啟用串流，就會收到 `ValidationException`。若嘗試在沒有串流的資料表中停用串流，也會收到 `ValidationException`。

當您將 `StreamEnabled` 設定為 `true` 時，DynamoDB 會建立新的串流，並對其指派唯一的串流描述項。如果您停用再重新啟用資料表串流，則會使用不同的串流描述項建立新的串流。

每個串流都可依 Amazon Resource Name (ARN) 唯一識別。以下是名為 `TestTable` 的 DynamoDB 資料表上串流的 ARN 範例。

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

若要判斷資料表的最新串流描述項，請發出 DynamoDB `DescribeTable` 請求，並尋找回應中的 `LatestStreamArn` 元素。

**注意**  
一旦串流設定完畢，就無法再編輯 `StreamViewType`。若您需要變更已設定完畢的串流，就必須停用目前的串流並建立一個新的串流。

## 讀取及處理串流
<a name="Streams.Processing"></a>

若要讀取及處理串流，您的應用程式必須連線到 DynamoDB Streams 端點，並發出 API 請求。

串流由*串流紀錄*所組成。每個串流紀錄均代表串流所屬之 DynamoDB 資料表中的一項資料修改。每個串流紀錄會獲派一個序號，其反映出紀錄發布至串流的順序。

串流紀錄會整理成群組，即*碎片*。每個碎片皆代表多個串流紀錄的容器，並含有存取及重複處理這些紀錄所需的資訊。碎片內的串流紀錄會在 24 小時後自動移除。

碎片為暫時性：系統會視需要自動建立及刪除這些碎片。任何碎片還可分割成多個新的碎片，這也會自動發生。(父碎片也可能只有一個子碎片。) 碎片可能會為了回應其父資料表上的上層寫入活動而分割，讓應用程式可同時處理多個碎片中的紀錄。

如果您停用串流，所有開啟的碎片都會關閉。串流中的資料在 24 小時內仍可供讀取。

由於碎片具有系屬關係 (父系與子系)，因此應用程式一律必須先處理父碎片，再處理子碎片。這有利於確保串流紀錄也按照正確順序處理。(如果您使用的是 DynamoDB Streams Kinesis 轉接器，則系統會為您處理這個問題。應用程式會依正確順序處理碎片和串流紀錄。除了當應用程式正在執行時分割的碎片，其還會自動處理新的或過期碎片。如需詳細資訊，請參閱 [使用 DynamoDB Streams Kinesis 轉接器處理串流記錄](Streams.KCLAdapter.md)。)

下圖說明串流、串流中的碎片及碎片中的串流紀錄之間的關係。

![\[DynamoDB Streams 結構。代表資料修改的串流記錄，會以碎片的形式組織。\]](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**注意**  
如果您執行的 `PutItem` 或 `UpdateItem` 操作不會變更項目中的任何資料，則 DynamoDB Streams *不會*寫入該操作的串流紀錄。

若要存取串流及處理其中的串流紀錄，您必須執行下列操作：
+ 判斷您要存取之串流的唯一 ARN。
+ 判斷串流中包含您感興趣之串流紀錄的碎片。
+ 存取碎片及擷取您要的串流紀錄。

**注意**  
最多不得同時從同一個串流碎片讀取 2 個以上的處理程序。每個碎片具有 2 個以上的讀取器會導致限流。

DynamoDB Streams API 提供下列可讓應用程式使用的動作：
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)`：傳回目前帳戶及端點的串流描述項清單。您可以只選擇性地請求特定資料表名稱的串流描述項。
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)` – 傳回串流相關資訊，包含目前串流狀態，其 Amazon Resource Name (ARN)、構成的碎片數量，以及其對應的 DynamoDB 資料表。您可以選擇性使用 `ShardFilter` 欄位擷取與父碎片相關的現有子碎片。
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)`：傳回*碎片疊代運算*，其描述了碎片內的位置。您可以請求疊代運算提供串流中最舊點、最新點或特定點的存取權。
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)`：傳回指定碎片內的串流紀錄。您必須提供從 `GetShardIterator` 請求傳回的碎片疊代運算。

如需這些 API 操作的完整說明 (包括範例請求與回應)，請參閱《[Amazon DynamoDB Streams API 參考](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html)》。

### 碎片探索
<a name="Streams.ShardDiscovery"></a>



使用兩項強大的方法來探索 DynamoDB 串流中的新碎片。Amazon DynamoDB Streams 使用者可以透過兩種有效的方式追蹤並識別新的碎片：

**輪詢整個串流拓撲**  
使用 `DescribeStream` API 定期輪詢串流。此步驟會傳回串流中的所有碎片，包括任何已建立的新碎片。您可以透過比較結果，偵測新增的碎片。

**探索子碎片**  
使用 `DescribeStream` API 搭配 `ShardFilter` 參數以尋找碎片的子集。透過在請求中指定父碎片，DynamoDB Streams 會傳回其直屬子碎片。此方法適用於僅追蹤碎片譜系，而不掃描整個串流的情況。  
從 DynamoDB Streams 取用資料的應用程式，可使用 `ShardFilter` 參數，有效地從讀取封閉碎片，轉換為讀取其子碎片，避免重複呼叫 `DescribeStream` API，以擷取和周遊所有封閉及開啟碎片的碎片對應。這有助於在父碎片關閉後快速探索子碎片，提高串流處理應用程式的回應性並更符合成本效益。

這兩項方法都有助於您掌握 DynamoDB Streams 不斷變化的結構，確保您不會錯過重要的資料更新或碎片修改。

### DynamoDB Streams 的資料保留限制
<a name="Streams.DataRetention"></a>

DynamoDB Streams 中所有資料的生命週期皆為 24 小時。您可以擷取並分析任何指定資料表過去 24 小時的活動。但超過 24 小時的資料容易隨時受到裁剪 (移除) 的影響。

如果您停用資料表的串流，則串流中的資料在 24 小時內仍可供讀取。過了這段時間後，資料就會過期，且串流紀錄也會自動刪除。沒有手動刪除現有串流的機制。您必須等到保留限制過期 (24 小時)，所有串流紀錄才會被刪除。

# DynamoDB Streams 和存留時間
<a name="time-to-live-ttl-streams"></a>

您可以備份或處理[存留時間](TTL.md) (TTL) 刪除的項目，方法是在資料表上啟用 Amazon DynamoDB Streams 並處理過期項目的串流紀錄。如需詳細資訊，請參閱[讀取及處理串流](Streams.md#Streams.Processing)。

串流紀錄包含使用者身分欄位 `Records[<index>].userIdentity`。

存留時間程序在過期後刪除的項目具有下列欄位：
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**注意**  
在全域資料表中使用 TTL 時，執行 TTL 的區域會設定 `userIdentity` 欄位。複寫刪除時，在其他區域中不會設定此欄位。

下列 JSON 顯示單一串流紀錄的相關部分。

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## 使用 DynamoDB Streams 和 Lambda 封存 TTL 已刪除的項目
<a name="streams-archive-ttl-deleted-items"></a>

合併 [DynamoDB 存留時間 (TTL)](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html)、[DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) 和 [AWS Lambda](https://aws.amazon.com/lambda/) 可協助簡化封存資料、降低 DynamoDB 儲存成本並降低程式碼複雜性。使用 Lambda 做為串流使用者具有許多優勢，最顯著的是與其他使用者 (例如 Kinesis Client Library (KCL)) 相比，成本降低。透過 Lambda 使用事件時，您不會因為 DynamoDB 串流上的 `GetRecords` API 呼叫而需要付費，而且 Lambda 可藉由識別串流事件中的 JSON 模式提供事件篩選功能。透過事件模式內容篩選，您最多可以定義五個不同的篩選條件來控制哪些事件要傳送到 Lambda 進行處理。這有助於減少對 Lambda 函式的調用、簡化程式碼並降低總體成本。

雖然 DynamoDB Streams 包含所有資料修改，例如 `Create`、`Modify` 和 `Remove` 動作，這可能會導致不必要的調用封存 Lambda 函式。例如，假設有一個資料表每小時有 200 萬個資料修改流入串流中，但其中不到 5% 的項目刪除將在 TTL 程序中過期且需要進行封存。搭配 [Lambda 事件來源篩選條件](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)，Lambda 函式每小時只會調用 100,000 次。使用事件篩選，您只需為所需的調用付費，而不是在沒有事件篩選的情況下為 200 萬次的調用付費。

事件篩選套用至 [Lambda 事件來源映射](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html)，它是一種資源，可從選定的事件 (DynamoDB 串流) 中讀取並調用 Lambda 函式。在下圖中，您可以看到 Lambda 函式如何透過串流和事件篩選條件使用存留時間已刪除的項目。

![\[透過 TTL 程序刪除的項目，會啟動使用串流和事件篩選條件的 Lambda 函式。\]](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### DynamoDB 存留時間事件篩選條件模式
<a name="ttl-event-filter-pattern"></a>

將以下 JSON 新增至事件來源映射[篩選條件](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html)，可僅對 TTL 刪除的項目調用 Lambda 函式：

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### 建立 AWS Lambda 事件來源映射
<a name="create-event-source-mapping"></a>

使用以下程式碼片段建立已篩選的事件來源映射，您可以將它連接到資料表的 DynamoDB 串流。每個程式碼區塊都包含事件篩選條件模式。

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# 使用 DynamoDB Streams Kinesis 轉接器處理串流記錄
<a name="Streams.KCLAdapter"></a>

建議透過 Amazon Kinesis 轉接器耗用來自 Amazon DynamoDB 的串流。DynamoDB Streams API 類似於 Kinesis Data Streams 的 API 是刻意為之。在這兩種服務中，資料串流由碎片組成，碎片是用於串流紀錄的容器。這兩種服務的 API 都包含 `ListStreams`、`DescribeStream`、`GetShards` 以及 `GetShardIterator` 操作。(雖然這些 DynamoDB Streams 動作與其在 Kinesis Data Streams 中的對應動作類似，但它們並非完全相同。)

身為 DynamoDB Streams 使用者，您可以使用在 KCL 內找到的設計模式來處理 DynamoDB Streams 碎片和串流紀錄。為此，您可以使用 DynamoDB Streams Kinesis 轉接器。Kinesis 轉接器會實作 Kinesis Data Streams 介面，以便您將 KCL 用於耗用和處理來自 DynamoDB Streams 的紀錄。如需如何設定和安裝 DynamoDB Streams Kinesis Adapter 的指示，請參閱 [GitHub 儲存庫](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)。

您可以使用 Kinesis Client Library (KCL) 為 Kinesis Data Streams 撰寫應用程式。KCL 會在低階 Kinesis Data Streams API 上提供有用的抽象，可以簡化程式碼。如需 KCL 的詳細資訊，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[使用 Kinesis Client Library 開發取用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

DynamoDB 建議使用 KCL 3.x 版搭配適用於 Java 的 AWS SDK v2.x。目前 DynamoDB Streams Kinesis Adapter 1.x 版搭配 適用於 Java 的 AWS SDK 適用於 v1.x 的 AWS SDK，在過渡期間會繼續如預期完全支援整個生命週期，以符合 [AWS SDKs和工具維護政策](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)。

**注意**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們強烈建議您，在 2026 年 1 月 30 日之前，將使用 1.x 版的 KCL 應用程式移轉至最新的 KCL 版本。如需尋找最新的 KCL 版本，請參閱 GitHub 上的 [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) 頁面。如需最新 Kinesis Client Library 版本的相關詳細資訊，請參閱[使用 Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)。如需有關從 KCL 1.x 移轉至 3.x 的詳細資訊，請參閱《從 KCL 1.x 移轉到 KCL 3.x》。

下圖顯示這些程式庫彼此如何互動。

![\[DynamoDB Streams、Kinesis Data Streams 和 KCL 之間的互動，用於處理 DynamoDB Streams 記錄。\]](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


有了 DynamoDB Streams Kinesis 轉接器，您就可以開始針對 KCL 介面進行開發，並將 API 呼叫順暢地導向 DynamoDB Streams 端點。

當應用程式啟動後，其會呼叫 KCL 以將工作者執行個體化。您必須向工作者提供應用程式的組態資訊，例如串流描述項和 AWS 登入資料，以及您提供的記錄處理器類別名稱。當其在紀錄處理器中執行程式碼時，工作者會執行下列任務：
+ 連線到串流
+ 列舉串流內的碎片
+ 檢查並列舉串流中已關閉父碎片的子碎片
+ 與其他工作者 (若有) 協調碎片關聯性
+ 為其所管理的每個碎片執行個體化記錄處理器
+ 從串流提取紀錄
+ 在高輸送量期間擴展 GetRecords API 呼叫速率 （如果已設定追趕模式）
+ 將記錄推送至對應的記錄處理器
+ 對已處理的記錄執行檢查點作業
+ 當工作者執行個體數目變更時，平衡碎片與工作者的關聯
+ 當碎片進行分割時，平衡碎片與工作者的關聯

KCL 轉接器支援追趕模式，這是一種自動呼叫速率調整功能，用於處理暫時輸送量增加。當串流處理延遲超過可設定的閾值 （預設一分鐘） 時，追趕模式會將 GetRecords API 呼叫頻率擴展為可設定的 值 （預設 3 倍），以更快地擷取記錄，然後在延遲下降時恢復正常。在高傳輸量期間，DynamoDB 寫入活動可能會使用預設輪詢率讓消費者負擔過重，這非常寶貴。可以透過`catchupEnabled`組態參數啟用追趕模式 （預設 false)。

**注意**  
如需此處所列 KCL 概念的描述，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[使用 Kinesis Client Library 開發取用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。  
如需搭配 使用串流的詳細資訊， AWS Lambda 請參閱 [DynamoDB 串流和 AWS Lambda 觸發](Streams.Lambda.md)

# 從 KCL 1.x 移轉到 KCL 3.x
<a name="streams-migrating-kcl"></a>

## 概觀
<a name="migrating-kcl-overview"></a>

本指南提供如何將取用者應用程式從 KCL 1.x 移轉到 KCL 3.x 的說明。由於 KCL 1.x 和 KCL 3.x 之間的架構差異，移轉需要更新多個元件以確保相容性。

KCL 1.x 使用的類別和介面與 KCL 3.x 不同。您必須先將記錄處理器、記錄處理器處理站及工作者類別移轉至 KCL 3.x 相容格式，並遵循 KCL 1.x 移轉至 KCL 3.x 的移轉步驟。

## 移轉步驟
<a name="migration-steps"></a>

**Topics**
+ [步驟 1：移轉記錄處理器](#step1-record-processor)
+ [步驟 2：移轉記錄處理器處理站](#step2-record-processor-factory)
+ [步驟 3：移轉工作者](#step3-worker-migration)
+ [步驟 4：KCL 3.x 組態概觀和建議](#step4-configuration-migration)
+ [步驟 5：從 KCL 2.x 移轉至 KCL 3.x](#step5-kcl2-to-kcl3)

### 步驟 1：移轉記錄處理器
<a name="step1-record-processor"></a>

以下範例顯示基於 KCL 1.x DynamoDB Streams Kinesis 轉接器所實作的記錄處理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**移轉 RecordProcessor 類別**

1. 將介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改為 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. 更新 `initialize` 和 `processRecords` 方法的匯入陳述式：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. 將 `shutdownRequested` 方法取代為以下的新方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

記錄處理器類別經更新後的版本如下：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注意**  
DynamoDB Streams Kinesis 轉接器現在使用 SDKv2 記錄模型。在 SDKv2 中，複雜 `AttributeValue` 物件 (`BS`、`NS`、`M`、`L`、`SS`) 永遠不會傳回空值。使用 `hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` 方法驗證這些值是否存在。

### 步驟 2：移轉記錄處理器處理站
<a name="step2-record-processor-factory"></a>

記錄處理器處理站負責在取得租用時建立記錄處理器。以下是 KCL 1.x 處理站的範例：

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**移轉 `RecordProcessorFactory`**
+ 將實作的介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改為 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示：

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

以下是 3.0 中記錄處理器工廠的範例：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 步驟 3：移轉工作者
<a name="step3-worker-migration"></a>

在 KCL 的版本 3.0，名為**排程器**​的新類別會取代**工作者**類別。以下是 KCL 1.x 工作者的範例：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**移轉至工作者**

1. 將 `Worker` 類別的 `import` 陳述式變更為 `Scheduler` 和 `ConfigsBuilder` 類別的匯入陳述式。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 匯入 `StreamTracker` 並將匯入 `StreamsWorkerFactory` 變更至 `StreamsSchedulerFactory`。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 選擇要啟動應用程式的位置。其可能是 `TRIM_HORIZON` 或 `LATEST`。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. 建立 `StreamTracker` 執行個體。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. 建立 `AmazonDynamoDBStreamsAdapterClient` 物件。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. 建立 `ConfigsBuilder` 物件。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 使用 `ConfigsBuilder` 建立 `Scheduler`，如下列範例所示：

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 設定可維持 KCL v3 與 KCL v1 之間的 DynamoDB Streams Kinesis Adapter 相容性，而非 KCL v2 與 v3 之間的相容性。

### 步驟 4：KCL 3.x 組態概觀和建議
<a name="step4-configuration-migration"></a>

如需詳細了解 KCL 3.x 中，在 KCL 1.x 之後推出的相關組態，請參閱 [KCL 組態](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html)和 [KCL 移轉用戶端組態](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)。

**重要**  
我們建議使用 `ConfigsBuilder` 在 KCL 3.x 及更新版本中設定組態，而非直接建立 `checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig` 及 `retrievalConfig` 的物件，以避免排程器初始化問題。`ConfigsBuilder` 則提供更靈活且跟更容易維護的 KCL 應用程式設定方式。

#### KCL 3.x 的更新預設值組態
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL 1.x 版中，`billingMode` 的預設值設定為 `PROVISIONED`。不過，KCL 3.x 版 `billingMode` 的預設值為 `PAY_PER_REQUEST` (隨需模式)。我們建議租用資料表使用隨需容量模式，根據用量自動調整容量。如需租用資料表使用佈建容量的指南，請參閱[具有佈建容量模式的租用資料表最佳實務](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)。

`idleTimeBetweenReadsInMillis`  
在 KCL 1.x 版中，`idleTimeBetweenReadsInMillis` 的預設值設定為 1,000 (或 1 秒)。KCL 3.x 版會將 i`dleTimeBetweenReadsInMillis` 的預設值設定為 1,500 (或 1.5 秒)，但 Amazon DynamoDB Streams Kinesis 轉接器會將預設值覆寫為 1,000 (或 1 秒)。

#### KCL 3.x 中的新組態
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
此組態會定義新發現碎片開始處理之前的時間間隔，計算方式為 1.5 x `leaseAssignmentIntervalMillis`。如果未明確配置此設定，則時間間隔預設為 1.5 x `failoverTimeMillis`。處理新碎片包含掃描租用資料表，並在租用資料表上查詢全域次要索引 (GSI)。降低 `leaseAssignmentIntervalMillis` 會增加掃描和查詢操作的頻率，進而產生更高的 DynamoDB 成本。我們建議將此值設定為 2000 (或 2 秒)，將處理新碎片的延遲降至最低。

`shardConsumerDispatchPollIntervalMillis`  
此組態定義了碎片取用者進行連續輪詢以觸發狀態轉換的間隔時間。在 KCL 1.x 版中，此行為由 `idleTimeInMillis` 參數控制，該參數並未公開為可配置設定。使用 KCL 3.x 版時，我們建議設定此組態，符合您在 KCL 1.x 版設定用於 ` idleTimeInMillis` 的值。

### 步驟 5：從 KCL 2.x 移轉至 KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

為了確保順利轉換至最新的 Kinesis Client Library (KCL) 版本及其相容性，請遵循移轉指南[從 KCL 2.x 升級到 KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) 中，步驟 5-8 的指示。

如需常見的 KCL 3.x 疑難排解問題，請參閱 [KCL 取用者應用程式疑難排解](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)。

# 轉返為先前的 KCL 版本
<a name="kcl-migration-rollback"></a>

本主題說明如何將取用者應用程式轉返為先前的 KCL 版本。轉返程序包含兩個步驟：

1. 執行 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 重新部署先前 KCL 版本的程式碼。

## 步驟 1：執行 KCL 移轉工具
<a name="kcl-migration-rollback-step1"></a>

當您需要轉返至先前的 KCL 版本時，您必須執行 KCL 移轉工具。此工具會執行兩項重要任務：
+ 移除 DynamoDB 租用資料表上，稱為工作者指標資料表和全域次要索引的中繼資料資料表。這些成品由 KCL 3.x 建立，但當您轉返至先前版本時，不需要這些成品。
+ 此工具可讓所有工作者在 KCL 1.x 相容模式下執行，並開始使用先前 KCL 版本的負載平衡演算法。如果您使用 KCL 3.x 新負載平衡演算法時遇到問題，此步驟會立即緩解該問題。

**重要**  
DynamoDB 中的協調器狀態資料表必須存在，且不得在移轉、轉返及向前復原過程中刪除。

**注意**  
請務必讓取用者應用程式中的所有工作者，在指定時間使用相同的負載平衡演算法。KCL 移轉工具可確保 KCL 3.x 取用者應用程式中的所有工作者全數切換到 KCL 1.x 相容模式，以便在應用程式轉返至先前的 KCL 版本期間，所有工作者都執行相同的負載平衡演算法。

您可以在 [KCL GitHub 儲存庫](https://github.com/awslabs/amazon-kinesis-client/tree/master)的指令碼目錄中，下載 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。從具備適當許可的工作者或主機執行指令碼，以寫入協調器狀態資料表、工作者指標資料表和租用資料表。確保已針對 KCL 取用者應用程式設定適當的 [IAM 許可](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)。使用指定的命令，每個 KCL 應用程式僅執行一次指令碼：

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
將*區域*取代為 AWS 區域。

`--application_name`  
如果您使用 DynamoDB 中繼資料資料表 (租用資料表、協調器狀態資料表和工作者指標資料表) 的預設名稱，則需要此參數。如果您已為這些資料表指定自訂名稱，可以省略此參數。使用您現有應用程式的名稱取代 *applicationName*。如果未提供自訂名稱，工具會使用此名稱衍生預設資料表名稱。

`--lease_table_name`  
當您已在 KCL 組態中自訂租用資料表的名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為租用資料表指定的自訂資料表名稱，取代 *leaseTableName*。

`--coordinator_state_table_name`  
當您在 KCL 組態中為自訂協調器狀態資料表名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為協調器狀態資料表指定的自訂資料表名稱，取代 *coordinatorStateTableName*。

`--worker_metrics_table_name`  
當您在 KCL 組態中自訂工作者指標資料表的名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為工作者指標資料表指定的自訂資料表名稱，取代 *workerMetricsTableName*。

## 步驟 2：使用先前的 KCL 版本重新部署程式碼
<a name="kcl-migration-rollback-step2"></a>

**重要**  
KCL 移轉工具產生的輸出中提及 2.x 版時，應解釋為提及 KCL 1.x 版。執行指令碼不會執行完整的轉返，只會切換到 KCL 1.x 版使用的負載平衡演算法。

執行轉返 KCL 移轉工具後，您會看到以下其中一則訊息：

訊息 1  
「轉返已完成。您的應用程式先前執行 2x 相容功能。請使用先前的 KCL 版本部署程式碼，轉返至先前的應用程式二進位檔。」  
**必要動作：**這表示您的工作者在 KCL 1.x 相容模式下執行。將具有先前 KCL 版本的程式碼，重新部署到您的工作者。

訊息 2  
「轉返已完成。您的 KCL 應用程式先前執行 3x 功能，並會轉返至 2x 相容功能。如果您在短時間內沒看到緩解措施，請使用先前的 KCL 版本部署程式碼，轉返到先前的應用程式二進位檔。」  
**必要動作：**這表示您的工作者是在 KCL 3.x 模式下執行，而 KCL 移轉工具會將所有工作者切換到 KCL 1.x 相容模式。將具有先前 KCL 版本的程式碼，重新部署到您的工作者。

訊息 3  
「應用程式已轉返。所有可刪除的 KCLv3 資源都已清除，以避免產生費用，直到應用程式可以透過移轉向前復原為止。」  
**必要動作：**這表示您的工作者已轉返，以在 KCL 1.x 相容模式下執行。將具有先前 KCL 版本的程式碼，重新部署到您的工作者。

# 轉返後向前復原至 KCL 3.x
<a name="kcl-migration-rollforward"></a>

本主題說明如何在轉返後將取用者應用程式向前復原至 KCL 3.x。當您需要向前復原時，您必須完成兩步驟的程序：

1. 執行 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 使用 KCL 3.x 部署程式碼。

## 步驟 1：執行 KCL 移轉工具
<a name="kcl-migration-rollforward-step1"></a>

使用以下命令執行 KCL 移轉工具，以向前復原至 KCL 3.x：

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
將*區域*取代為您的 AWS 區域。

`--application_name`  
如果您使用協調器狀態資料表的預設名稱，則需要此參數。如果您已指定協調器狀態資料表的自訂名稱，可以省略此參數。使用您現有應用程式的名稱取代 *applicationName*。如果未提供自訂名稱，工具會使用此名稱衍生預設資料表名稱。

`--coordinator_state_table_name`  
當您在 KCL 組態中為自訂協調器狀態資料表名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為協調器狀態資料表指定的自訂資料表名稱，取代 *coordinatorStateTableName*。

以向前復原模式執行移轉工具後，KCL 會建立 KCL 3.x 所需的下列 DynamoDB 資源：
+ 租用資料表上的全域次要索引
+ 工作者指標資料表

## 步驟 2：使用 KCL 3.x 部署程式碼
<a name="kcl-migration-rollforward-step2"></a>

執行向前復原的 KCL 移轉工具之後，請使用 KCL 3.x 將程式碼部署至工作者。若要完成移轉，請參閱[步驟 8：完成移轉](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)。

# 逐步解說：DynamoDB Streams Kinesis 轉接器
<a name="Streams.KCLAdapter.Walkthrough"></a>

本節會逐步解說使用 Amazon Kinesis Client Library 與 Amazon DynamoDB Streams Kinesis 轉接器的 Java 應用程式。此應用程式示範資料複寫的範例，將一份資料表的寫入活動套用至第二份資料表，讓兩份資料表的內容保持同步。如需來源碼，請參閱「[完整程式：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)」。

此程式執行下列操作：

1. 建立兩個 DynamoDB 資料表，並命名為 `KCL-Demo-src` 和 `KCL-Demo-dst`。這些資料表各會啟用串流。

1. 在來源資料表中新增、更新與刪除項目，以產生更新活動。這會導致將資料寫入資料表的串流。

1. 從串流讀取紀錄、將其重新建構為 DynamoDB 請求，再將請求套用至目標資料表。

1. 掃描來源與目標資料表，以確定其內容相同。

1. 刪除資料表以清除。

下列章節將說明這些步驟，並在演練結尾顯示完整的應用程式。

**Topics**
+ [步驟 1：建立 DynamoDB 資料表](#Streams.KCLAdapter.Walkthrough.Step1)
+ [步驟 2：在來源資料表中產生更新活動](#Streams.KCLAdapter.Walkthrough.Step2)
+ [步驟 3：處理串流](#Streams.KCLAdapter.Walkthrough.Step3)
+ [步驟 4：確定這兩個資料表的內容相同](#Streams.KCLAdapter.Walkthrough.Step4)
+ [步驟 5：清除](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完整程式：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 步驟 1：建立 DynamoDB 資料表
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

第一步是建立兩個 DynamoDB 資料表：一個來源資料表與一個目標資料表。來源資料表串流中的 `StreamViewType` 是 `NEW_IMAGE`。這表示每當在此資料表中修改項目時，項目的「修改後」影像就會寫入串流。串流可透過此方法追蹤資料表上的所有寫入活動。

下列範例顯示用來建立這兩份資料表的程式碼。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 步驟 2：在來源資料表中產生更新活動
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

下一步是在來源資料表上產生一些寫入活動。在此活動進行期間，來源資料表的串流也會近乎即時地更新。

此應用程式會使用呼叫 `PutItem`、`UpdateItem` 與 `DeleteItem` API 操作的方法，來定義小幫手類別，以寫入資料。下列程式碼範例示範如何使用這些方法。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 步驟 3：處理串流
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

程式現在會開始處理串流。DynamoDB Streams Kinesis 轉接器會作為 KCL 與 DynamoDB Streams 端點之間的透明層，讓程式碼可以充分利用 KCL，而不需要發出低階 DynamoDB Streams 呼叫。此程式會執行下列任務：
+ 它會以遵守 KCL 介面定義的方法 (`StreamsRecordProcessor`、`initialize` 與 `processRecords`) 來定義紀錄處理器類別 `shutdown`。`processRecords` 方法包含從來源資料表串流讀取與寫入目標資料表所需的邏輯。
+ 它會定義紀錄處理器類別的類別處理站 (`StreamsRecordProcessorFactory`)。這是使用 KCL 之 Java 程式的必要任務。
+ 它會執行個體化與類別處理站相關聯的新 KCL `Worker`。
+ 它會在紀錄處理完成時關閉 `Worker`。

或者，在 Streams KCL Adapter 組態中啟用追趕模式，以便在串流處理延遲超過一分鐘 （預設） 時自動將 GetRecords API 呼叫速率擴展 3 倍 （預設），協助您的串流取用者處理資料表中的高輸送量峰值。

若要進一步了解 KCL 介面定義，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[使用 Kinesis Client Library 開發取用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

下列程式碼範例顯示 `StreamsRecordProcessor` 中的主迴圈。`case` 陳述式會根據串流紀錄中顯示的 `OperationType` 來決定要執行的動作。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 步驟 4：確定這兩個資料表的內容相同
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

此時，來源與目標資料表的內容會同步。此應用程式會對這兩個資料表發出 `Scan` 請求，以確認其內容事實上相同。

`DemoHelper` 類別包含呼叫低階 `Scan` API 的 `ScanTable` 方法。下列範例示範其使用方法。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 步驟 5：清除
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

示範已完成，因此應用程式會刪除來源與目標資料表。請參閱以下程式碼範例。即使在刪除資料表後，其串流也會保持可用長達 24 小時，然後就自動刪除。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 完整程式：DynamoDB Streams Kinesis 轉接器
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

以下是執行此[逐步解說：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.md)所述之任務的完整 Java 程式。當您執行它時，應該會看到類似如下的輸出。

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**重要**  
 若要執行此程式，請確定用戶端應用程式可使用政策存取 DynamoDB 和 Amazon CloudWatch。如需詳細資訊，請參閱[適用於 DynamoDB 的以身分為基礎的政策](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies)。

原始程式碼包含四個`.java`檔案。若要建置此程式，請新增下列相依性，其中包括 Amazon Kinesis Client Library (KCL) 3.x 和適用於 Java v2 的 AWS SDK 做為暫時性相依性：

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

來源檔案為：
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# DynamoDB Streams 低階 API：Java 範例
<a name="Streams.LowLevel.Walkthrough"></a>

**注意**  
此頁面上的程式碼並不詳盡，並且無法應對使用 Amazon DynamoDB Streams 的所有案例。從 DynamoDB 使用串流紀錄的建議方式，是透過使用 Kinesis Client Library (KCL) 的 Amazon Kinesis 轉接器，如 [使用 DynamoDB Streams Kinesis 轉接器處理串流記錄](Streams.KCLAdapter.md) 中所述。

本節包含顯示作用中 DynamoDB Streams 的 Java 程式。此程式執行下列操作：

1. 建立啟用串流的 DynamoDB 資料表。

1. 說明此資料表的串流設定。

1. 修改資料表中的資料。

1. 描述串流中的碎片。

1. 讀取碎片中的串流紀錄。

1. 擷取子碎片並持續讀取記錄。

1. 清除。

當您執行此程式時，會看到與下面類似的輸出。

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example 範例**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB 串流和 AWS Lambda 觸發
<a name="Streams.Lambda"></a>

Amazon DynamoDB 與 整合， AWS Lambda 因此您可以建立*觸發，*也就是自動回應 DynamoDB Streams 中事件的程式碼片段。您可以利用觸發條件建立對 DynamoDB 資料表資料修改做出反應的應用程式。

**Topics**
+ [教學課程 \$11：使用篩選條件來處理 Amazon DynamoDB 的所有事件， AWS Lambda 並使用 AWS CLI](Streams.Lambda.Tutorial.md)
+ [教學課程 \$12：使用篩選條件來處理 DynamoDB 和 Lambda 的所有事件](Streams.Lambda.Tutorial2.md)
+ [搭配 Lambda 使用 DynamoDB Streams 的最佳實務](Streams.Lambda.BestPracticesWithDynamoDB.md)

如果您在資料表上啟用 DynamoDB Streams，您可以將串流 Amazon Resource Name (ARN) 與您寫入的 AWS Lambda 函數建立關聯。然後，即可將該 DynamoDB 資料表的所有變動動作擷取為串流上的項目。例如，您可以設定觸發條件，以便在修改資料表中的項目時，新記錄會立即顯示在該資料表的串流中。

**注意**  
如果您為單一 DynamoDB 串流訂閲兩個以上的 Lambda 函式，可能會發生讀取限流。

此 [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) 服務會以每秒四次的速度輪詢串流以偵測新記錄。當有新的串流記錄可用時，系統會同步調用 Lambda 函式。對於相同的 DynamoDB 串流，最多可以訂閱兩個 Lambda 函式。如果您為相同 DynamoDB 串流訂閲兩個以上的 Lambda 函式，可能會發生讀取限流。

Lambda 函式可以傳送通知、啟動工作流程，或執行您指定的其他許多動作。您可以撰寫 Lambda 函式僅將每筆串流紀錄複製到耐久性儲存，例如 Amazon S3 File Gateway (Amazon S3)，並在您的資料表中建立永久的寫入活動稽核軌跡。或者，假設您有一個會寫入 `GameScores` 表的手機遊戲應用程式。每當 `TopScore` 資料表的 `GameScores` 屬性更新時，對應的串流紀錄就會寫入資料表串流。然後，這個事件就會觸發在社交媒體網路張貼賀電的 Lambda 函式。(此函數也可以編寫為忽略所有非更新至 `GameScores` 或不修改 `TopScore` 屬性的串流紀錄)。

如果函數傳回錯誤，Lambda 會不斷重試批次直到處理成功或資料過期。您還可以設定 Lambda，使用較小批次重試、限制重試次數、在記錄太舊時捨棄，以及其他選項。

作為效能最佳實務，Lambda 函式必須為短期函數。為了避免產生不必要的處理延遲，此函數也不應執行複雜的邏輯。尤其對於高速串流而言，相較於同步長時間執行的 Lambda，觸發非同步後續處理 Step Function 工作流程是較佳的做法。

 您可以在 DynamoDB 串流上設定資源型政策，以授予 Lambda 函數的跨帳戶讀取存取權， AWS 藉此跨帳戶使用 Lambda 觸發。若要進一步了解如何設定串流以允許跨帳戶存取，請參閱《DynamoDB 開發人員指南》中的[使用跨帳戶 AWS Lambda 函數共用存取權](rbac-cross-account-access.md#shared-access-cross-acount-lambda)。

如需 的詳細資訊 AWS Lambda，請參閱 [AWS Lambda 開發人員指南](https://docs.aws.amazon.com/lambda/latest/dg/)。

# 教學課程 \$11：使用篩選條件來處理 Amazon DynamoDB 的所有事件， AWS Lambda 並使用 AWS CLI
<a name="Streams.Lambda.Tutorial"></a>

 

在本教學課程中，您將建立 AWS Lambda 觸發程序來處理來自 DynamoDB 資料表的串流。

**Topics**
+ [步驟 1：建立啟用串流的 DynamoDB 資料表](#Streams.Lambda.Tutorial.CreateTable)
+ [步驟 2：建立 Lambda 執行角色](#Streams.Lambda.Tutorial.CreateRole)
+ [步驟 3：建立 Amazon SNS 主題](#Streams.Lambda.Tutorial.SNSTopic)
+ [步驟 4：建立並測試 Lambda 函式](#Streams.Lambda.Tutorial.LambdaFunction)
+ [步驟 5：建立並測試觸發器](#Streams.Lambda.Tutorial.CreateTrigger)

本教學案例為 Woofer，簡易的社交網路。Woofer 使用者使用互相傳送的 *barks (簡短的文字訊息)* 進行通訊。下圖顯示此應用程式的元件和工作流程。

![\[DynamoDB 資料表的 Woofer 應用程式工作流程、串流記錄、Lambda 函式和 Amazon SNS 主題。\]](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. 使用者將某個項目寫入 DynamoDB 資料表 (`BarkTable`)。資料表中的每個項目代表一個 bark。

1. 寫入新串流紀錄即反映新的項目已新增至 `BarkTable`。

1. 新的串流記錄會觸發 AWS Lambda 函數 (`publishNewBark`)。

1. 如果該串流紀錄指出新的項目已新增至 `BarkTable`，則 Lambda 函式會從串流紀錄讀取資料，將訊息發布至 Amazon Simple Notification Service (Amazon SNS) 中的主題。

1. Amazon SNS 主題的訂閱用戶會收到此訊息。(在此教學中，唯一的訂閱用戶是電子郵件地址)。

**開始之前**  
本教學課程使用 AWS Command Line Interface AWS CLI。若您尚未執行此作業，請遵循《[AWS Command Line Interface 使用者指南](https://docs.aws.amazon.com/cli/latest/userguide/)》中的說明安裝及設定 AWS CLI。

## 步驟 1：建立啟用串流的 DynamoDB 資料表
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

在此步驟中，您會建立 DynamoDB 資料表 (`BarkTable`) 存放 Woofer 使用者的所有 bark。主索引鍵包含 `Username` (分割區索引鍵) 和 `Timestamp` (排序索引鍵)。這些屬性的類型皆為字串。

`BarkTable` 已啟用串流。在本教學課程稍後，您可以透過將 AWS Lambda 函數與串流建立關聯來建立觸發。

1. 輸入下列命令建立資料表。

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. 在輸出中，尋找 `LatestStreamArn`。

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   記下 `region` 和 `accountID`，因為您在本教學的其他步驟中會需要它們。

## 步驟 2：建立 Lambda 執行角色
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

在此步驟中，您會建立 AWS Identity and Access Management (IAM) 角色 (`WooferLambdaRole`) 並為其指派許可。這個角色會由您在 [步驟 4：建立並測試 Lambda 函式](#Streams.Lambda.Tutorial.LambdaFunction) 中建立的 Lambda 函式所使用。

您也要為該角色建立政策。此政策包含 Lambda 函式在執行時期所需要的全部許可。

1. 使用下列內容建立名為 `trust-relationship.json` 的檔案。

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. 輸入下列命令建立 `WooferLambdaRole`。

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. 使用下列內容建立名為 `role-policy.json` 的檔案。（將 `region`和 取代`accountID`為您的 AWS 區域和帳戶 ID。)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   政策有四個陳述式可讓 `WooferLambdaRole` 執行下列操作：
   + 執行 Lambda 函式 (`publishNewBark`)。您稍後要在本教學中建立此函數。
   + 存取 Amazon CloudWatch Logs。Lambda 函式會在執行時期將診斷寫入 CloudWatch Logs。
   + 從 `BarkTable` 的 DynamoDB 串流讀取資料。
   + 將訊息發布到 Amazon SNS。

1. 輸入下列命令將此政策連接至 `WooferLambdaRole`。

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## 步驟 3：建立 Amazon SNS 主題
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

在此步驟中，您要建立 Amazon SNS 主題 (`wooferTopic`)，並用電子郵件地址訂閱此主題。您的 Lambda 函式會使用此主題發布 Woofer 使用者的新 bark。

1. 輸入以下命令來建立新的 Amazon SNS 主題。

   ```
   aws sns create-topic --name wooferTopic
   ```

1. 輸入下列命令使用電子郵件地址訂閱 `wooferTopic`。(將 `region` 和 `accountID` 更換為 AWS 區域和帳戶 ID，將 `example@example.com` 更換為有效的電子郵件地址。)

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS 會將確認訊息傳送至電子郵件地址。選擇該郵件中的 **Confirm subscription (確認訂閱)** 連結，完成訂閱程序。

## 步驟 4：建立並測試 Lambda 函式
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

在此步驟中，您會建立 AWS Lambda 函數 (`publishNewBark`) 來處理來自 的串流記錄`BarkTable`。

`publishNewBark` 函數只處理對應至 `BarkTable` 新項目的串流事件。此函數會讀取此種事件的資料，然後調用 Amazon SNS 來進行發布。

1. 使用下列內容建立名為 `publishNewBark.js` 的檔案。將 `region`和 取代`accountID`為您的 AWS 區域和帳戶 ID。

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. 建立 zip 檔以包含 `publishNewBark.js`。如果您有 zip 命令列公用程式，即可輸入下列命令執行此作業。

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. 當您建立 Lambda 函式時，您要為自己在 [步驟 2：建立 Lambda 執行角色](#Streams.Lambda.Tutorial.CreateRole) 中建立的 `WooferLambdaRole` 指定 Amazon Resource Name (ARN)。輸入下列命令以擷取此 ARN。

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   在輸出中，尋找 `WooferLambdaRole` 的 ARN。

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   輸入下列命令建立 Lambda 函式。將 *roleARN* 取代為 `WooferLambdaRole` 的 ARN。

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. 現在，測試 `publishNewBark` 確認能否作用。若要執行此作業，您要提供類似 DynamoDB Streams 中真實紀錄的輸入。

   使用下列內容建立名為 `payload.json` 的檔案。將 `region` 和 `accountID` 更換為 AWS 區域 和帳戶 ID。

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   輸入下列命令測試 `publishNewBark` 函數。

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   如果測試成功，您就會看到以下輸出。

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   此外，`output.txt` 檔案還會包含以下文字。

   ```
   "Successfully processed 1 records."
   ```

   您也會在幾分鐘內收到新的電子郵件訊息。
**注意**  
AWS Lambda 會將診斷資訊寫入 Amazon CloudWatch Logs。如果 Lambda 函式發生問題，您可以使用這些診斷進行疑難排解：  
在 [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) 開啟 CloudWatch 主控台。
在導覽窗格中，選擇 **Logs** (日誌)。
選擇下列的日誌群組：`/aws/lambda/publishNewBark`
選擇最新的日誌串流，檢視函數的輸出 (和錯誤)。

## 步驟 5：建立並測試觸發器
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

在 [步驟 4：建立並測試 Lambda 函式](#Streams.Lambda.Tutorial.LambdaFunction) 中，您已測試過 Lambda 函式，確保其能正確執行。在此步驟中，透過將 Lambda 函式 (`publishNewBark`) 與事件來源 (`BarkTable` 串流) 建立關聯來建立*觸發條件*。

1. 當您建立觸發時，您需要指定 `BarkTable` 串流的 ARN。輸入下列命令以擷取此 ARN。

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   在輸出中，尋找 `LatestStreamArn`。

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. 輸入以下命令來建立觸發。將 `streamARN` 更換為實際的串流 ARN。

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. 測試觸發。輸入下列命令將項目新增至 `BarkTable`。

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   您應該會在幾分鐘內收到新的電子郵件訊息。

1. 開啟 DynamoDB 主控台，在 `BarkTable` 中多新增幾個項目。您必須指定 `Username` 和 `Timestamp` 屬性的值。(您也應該指定 `Message` 的值，雖然它不是必要的)。您應該會因為每個新增至 `BarkTable` 的項目而收到新的電子郵件訊息。

   Lambda 函式只處理您新增至 `BarkTable` 的新項目。如果您要更新或刪除資料表中的項目，此函數不會執行任何作業。

**注意**  
AWS Lambda 會將診斷資訊寫入 Amazon CloudWatch Logs。如果 Lambda 函式發生問題，您可以使用這些診斷進行疑難排解。  
在 [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) 開啟 CloudWatch 主控台。
在導覽窗格中，選擇 **Logs** (日誌)。
選擇下列的日誌群組：`/aws/lambda/publishNewBark`
選擇最新的日誌串流，檢視函數的輸出 (和錯誤)。

# 教學課程 \$12：使用篩選條件來處理 DynamoDB 和 Lambda 的所有事件
<a name="Streams.Lambda.Tutorial2"></a>

在本教學課程中，您將建立 AWS Lambda 觸發程序，以僅處理來自 DynamoDB 資料表之串流中的某些事件。

**Topics**
+ [全部整合 - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [整合練習 - CDK](#Streams.Lambda.Tutorial2.CDK)

透過 [Lambda 事件篩選](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)，您可以使用篩選條件表達式來控制 Lambda 要傳送哪些事件給函數進行處理。對於每個 DynamoDB 串流，最多可以設定 5 個不同的篩選條件。如果您使用批次處理間隔，Lambda 會將篩選條件標準套用至每個新事件，以查看是否應將其納入目前的批次中。

篩選條件會透過稱為 `FilterCriteria` 的結構進行套用。`FilterCriteria` 的 3 項主要屬性是 `metadata properties`、`data properties` 和 `filter patterns`。

以下是 DynamoDB Streams 事件的範例結構：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` 是事件物件的欄位。若是 DynamoDB Streams，`metadata properties` 是類似 `dynamodb` 或 `eventName` 的欄位。

`data properties` 是事件主體的欄位。若要篩選 `data properties`，請務必將資料屬性包含在適當金鑰內的 `FilterCriteria` 之中。對於 DynamoDB 事件來源，資料金鑰為 `NewImage` 或 `OldImage`。

最後，篩選條件規則將會定義您要套用至特定屬性的篩選條件表達式。以下是一些範例：


| 比較運算子 | 範例 | 規則語法 (部分) | 
| --- | --- | --- | 
|  Null  |  產品類型為 Null  |  `{ "product_type": { "S": null } } `  | 
|  空白  |  產品名稱為空  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Equals  |  州等於佛羅里達州  |  `{ "state": { "S": ["FL"] } } `  | 
|  及  |  產品原產州等於佛羅里達州，且產品類別為巧克力  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  或  |  產品原產州是佛羅里達州或加州  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  產品原產州不是佛羅里達州  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  存在  |  自製產品存在  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  不存在  |  自製產品不存在  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Begins with  |  PK 以 COMPANY 開頭  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

您可以為一個 Lambda 函式最多指定 5 個事件篩選模式。請注意，這 5 個事件中的每一個都將評估為邏輯 OR。因此，如果您設定兩個分別名為 `Filter_One` 和 `Filter_Two` 的篩選條件，Lambda 函式將會執行 `Filter_One` OR `Filter_Two`。

**注意**  
[Lambda 事件篩選](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)頁面中有部分選項可用於篩選和比較數值，但不適用於 DynamoDB 篩選條件事件，因為 DynamoDB 中的數字會儲存為字串。例如 ` "quantity": { "N": "50" }`，從 `"N"` 屬性可以得知這是一個數字。

## 全部整合 - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

若要在實務中顯示事件篩選功能，請參考以下 CloudFormation 範本的範例。此範本將產生一個簡單的 DynamoDB 資料表，其中含有分割區索引鍵 PK 和排序索引鍵 SK，且已啟用 Amazon DynamoDB Streams。它會建立一個 Lambda 函式和一個簡單的 Lambda 執行角色，以允許將日誌寫入 Amazon CloudWatch，並從 Amazon DynamoDB Streams 讀取事件。它也會在 DynamoDB Streams 和 Lambda 函式之間新增事件來源映射，以便每次在 Amazon DynamoDB Streams 中發生事件時，都可以執行該函數。

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

部署此雲端編組範本之後，即可插入下列 Amazon DynamoDB 項目：

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

由於此雲端編組範本內嵌簡單的 Lambda 函式，因此可以在 Amazon CloudWatch 日誌群組中查看 Lambda 函式事件，如下所示：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**篩選條件範例**
+ **僅限符合指定州的產品**

此範例修改了 CloudFormation 範本，使其納入符合所有來自佛羅里達州 (縮寫為 "FL") 之產品的篩選條件。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

重新部署堆疊後，即可將下列 DynamoDB 項目新增至資料表。請注意，它不會出現在 Lambda 函式日誌中，因為此範例中的產品來自加州。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **僅限以 PK 和 SK 中部分值開頭的項目**

此範例修改了 CloudFormation 範本，使其納入下列條件：

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

請注意，AND 條件要求條件位於模式內部，且其中的索引鍵 PK 和 SK 位於相同表達式內並以逗號分隔。

以 PK 和 SK 中的部分值開頭或來自特定州。

此範例修改了 CloudFormation 範本，使其納入下列條件：

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

請注意，新增 OR 條件的方法是在篩選條件區段中引入新的模式。

## 整合練習 - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

以下範例 CDK 專案編組範本逐步解說事件篩選功能。在使用此 CDK 專案之前，必須先[安裝必要條件](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)，包括[執行準備指令碼](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)。

**建立 CDK 專案**

首先，`cdk init`在空目錄中叫用 來建立新的 AWS CDK 專案。

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

`cdk init` 命令使用專案資料夾的名稱來命名專案的各種元素，包括類別、子資料夾和檔案。資料夾名稱中的任何連字號都會轉換為底線。否則，該名稱應遵循 Python 識別符的形式。例如，不應以數字開頭或包含空格。

若要使用新專案，請啟用其虛擬環境。如此可將專案的相依性安裝在本機專案資料夾中，而不是全域安裝。

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**注意**  
您可以將其識別為用來啟用虛擬環境的 Mac/Linux 命令。Python 範本包含一個批次檔案 `source.bat`，允許在 Windows 上使用相同的命令。傳統的 Windows 命令 `.venv\Scripts\activate.bat` 也同樣適用。如果您使用 AWS CDK Toolkit v1.70.0 或更早版本初始化 AWS CDK 專案，您的虛擬環境位於 `.env`目錄中，而不是 `.venv`。

**基本基礎架構**

在您偏好的文字編輯器中開啟檔案 `./ddb_filters/ddb_filters_stack.py`。此檔案會在您建立 AWS CDK 專案時自動產生。

接下來，新增函數 `_create_ddb_table` 和 `_set_ddb_trigger_function`。這些函數將會建立佈建模式或隨需模式的 DynamoDB 資料表，其中含有分割區索引鍵 PK 和排序索引鍵 SK，且依預設會啟用 Amazon DynamoDB Streams 以顯示新舊映像。

Lambda 函式會儲存在資料夾 `lambda` 內的檔案 `app.py` 中。此檔案會在稍後建立。它會包含環境變數 `APP_TABLE_NAME` (將成為此堆疊所建立 Amazon DynamoDB 資料表的名稱)。在相同的函數中，我們將對 Lambda 函式授予串流讀取權限。最後，它會訂閱 DynamoDB Streams 做為 Lambda 函式的事件來源。

在 `__init__` 方法中的檔案尾端，您將會呼叫相應的建構以在堆疊中對其初始化。對於需要其他元件和服務的較大專案，最好在基本堆疊之外定義這些建構。

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

現在，我們將建立一個非常簡單的 Lambda 函式，以將日誌列印至 Amazon CloudWatch。若要執行此操作，請建立名為 `lambda` 的新資料夾。

```
mkdir lambda
touch app.py
```

使用您偏好的文字編輯器，將下列內容新增至 `app.py` 檔案：

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

確定您位於 `/ddb_filters/` 資料夾中，並輸入下列命令以建立範例應用程式：

```
cdk deploy
```

在特定時間點，系統會要求您確認是否要部署解決方案。輸入 `Y` 以接受變更。

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

部署變更後，請開啟 AWS 主控台，並將一個項目新增至資料表。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

此時，CloudWatch 日誌應已包含此項目中的所有資訊。

**篩選條件範例**
+ **僅限符合指定州的產品**

開啟檔案 `ddb_filters/ddb_filters/ddb_filters_stack.py` 加以修改，使其納入符合所有等於 "FL" 之產品的篩選條件。這可以在第 45 行中 `event_subscription` 的正下方進行修改。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **僅限以 PK 和 SK 中部分值開頭的項目**

修改 Python 指令碼以納入下列條件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **以位於 PK 和 SK 中的部分值開頭或來自特定州。**

修改 python 指令碼以納入下列條件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

請注意，新增 OR 條件的方法是在篩選條件陣列中新增更多元素。

**清除**

請在工作目錄的底部找到篩選條件堆疊，然後執行 `cdk destroy`。系統會要求您確認刪除此資源：

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# 搭配 Lambda 使用 DynamoDB Streams 的最佳實務
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

 AWS Lambda 函數會在*容器中*執行，這是與其他函數隔離的執行環境。當您第一次執行函數時， 會 AWS Lambda 建立新的容器，並開始執行函數的程式碼。

Lambda 函式有一個*處理常式*，每次調用只執行一次。此處理常式包含函數的主要商業邏輯。例如，[步驟 4：建立並測試 Lambda 函式](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) 中顯示的 Lambda 函式有一個可處理 DynamoDB Streams 紀錄的處理常式。

您也可以在建立容器之後，但在第一次 AWS Lambda 執行處理常式之前，提供僅執行一次的初始化程式碼。[步驟 4：建立並測試 Lambda 函式](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) 中顯示的 Lambda 函式有初始程式碼，可匯入適用於 Node.js 中 JavaScript 的開發套件，並建立 Amazon SNS 的用戶端。這些物件應該只在處理常式外部定義一次。

函數執行後， AWS Lambda 可能會選擇重複使用容器來後續叫用函數。在這種情況下，您的函數處理常式或許能夠重複使用您在初始化程式碼中定義的資源。(您完全無法控制 AWS Lambda 保留容器多長時間，或容器是否得以重複使用。)

對於使用 的 DynamoDB 觸發程序 AWS Lambda，我們建議下列事項：
+ AWS 服務用戶端應該在初始化程式碼中執行個體化，而不是在處理常式中。這可讓 AWS Lambda 在容器生命週期內重複使用現有的連線。
+ 一般而言，您不需要明確管理連線或實作連線集區， 會為您 AWS Lambda 管理。

DynamoDB 串流的 Lambda 取用者無法保證僅交付一次，並可能偶爾導致出現重複項目。請確定您的 Lambda 函式程式碼具有冪等性，以防止因重複處理而產生意外問題。

如需詳細資訊，請參閱《 *AWS Lambda 開發人員指南*》中的[使用 AWS Lambda 函數的最佳實務](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html)。

# DynamoDB Streams 和 Apache Flink
<a name="StreamsApacheFlink.xml"></a>

您可以使用 Apache Flink 取用 Amazon DynamoDB Streams 記錄。透過 [Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/)，您可以使用 Apache Flink 即時轉換並分析串流資料。Apache Flink 是開放原始碼串流處理架構，用於處理即時資料。Apache Flink 專用的 Amazon DynamoDB Streams 連接器可簡化 Apache Flink 工作負載的建置和管理，方便您將應用程式與其他 AWS 服務整合。

Amazon Managed Service for Apache Flink 可協助您快速建置端對端串流處理應用程式，以進行日誌分析、點擊流分析、物聯網 (IoT)、廣告技術、遊戲等。四個最常見的使用案例是串流擷取、轉換和載入 (ETL)、事件驅動型應用程式、回應式即時分析，以及資料串流的互動式查詢。如需從 Amazon DynamoDB Streams 寫入 Apache Flink 的詳細資訊，請參閱 [Amazon DynamoDB Streams 連接器](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)。