

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Amazon Keyspaces 中使用變更資料擷取 (CDC) 串流
<a name="cdc"></a>

Amazon Keyspaces 變更資料擷取 (CDC) 會以近乎即時的方式記錄來自 Amazon Keyspaces 資料表的資料列層級變更事件。

Amazon Keyspaces CDC 可啟用事件驅動的使用案例，例如工業 IoT 和詐騙偵測，以及資料處理使用案例，例如全文搜尋和資料封存。Amazon Keyspaces CDC 在串流中擷取的變更事件可供執行關鍵業務功能的下游應用程式取用，例如資料分析、文字搜尋、ML 訓練/推論，以及封存的持續資料備份。例如，您可以將串流資料傳輸到 Amazon OpenSearch Service、Amazon Redshift 和 Amazon S3 等 AWS 分析和儲存服務，以進行進一步處理。

Amazon Keyspaces CDC 為資料表提供按時間排序和已取消重複的變更記錄，資料輸送量的自動擴展和保留時間最多可達 24 小時。

Amazon Keyspaces CDC 串流完全無伺服器，您不需要管理資料基礎設施來擷取變更事件。此外，Amazon Keyspaces CDC 不會使用運算或儲存體的任何資料表容量。如需詳細資訊，請參閱[變更資料擷取 (CDC) 串流如何在 Amazon Keyspaces 中運作](cdc_how-it-works.md)。

您可以使用 Amazon Keyspaces Streams API 來建置使用 Amazon Keyspaces CDC 串流的應用程式，並根據內容採取行動。如需可用的端點，請參閱 [如何在 Amazon Keyspaces 中存取 CDC 串流端點](CDC_access-endpoints.md)。

如需串流 API 中可用於 Amazon Keyspaces 的所有操作的完整清單，請參閱 [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html)。

**Topics**
+ [變更資料擷取 (CDC) 串流如何在 Amazon Keyspaces 中運作](cdc_how-it-works.md)
+ [如何在 Amazon Keyspaces 中使用變更資料擷取 (CDC) 串流](cdc_how-to-use.md)

# 變更資料擷取 (CDC) 串流如何在 Amazon Keyspaces 中運作
<a name="cdc_how-it-works"></a>

本節概述變更資料擷取 (CDC) 串流如何在 Amazon Keyspaces 中運作。

Amazon Keyspaces 變更資料擷取 (CDC) 會在 Amazon Keyspaces 資料表中記錄一系列列層級修改，並將此資訊存放在稱為*串流*的日誌中長達 24 小時。每個資料列層級修改都會產生新的 CDC 記錄，其中包含主索引鍵資料欄資訊，以及包含所有資料欄的資料列的「之前」和「之後」狀態。應用程式可以近乎即時地存取串流並檢視變動。

當您在資料表上啟用 CDC 時，Amazon Keyspaces 會建立新的 CDC 串流，並開始擷取資料表中每個修改的相關資訊。CDC 串流具有下列格式的 Amazon Resource Name (ARN)：

```
arn:${Partition}:cassandra:{Region}:${Account}:/keyspace/${keyspaceName}/table/${tableName}/stream/${streamLabel}
```

您可以在第一次啟用 CDC 串流時，選取 CDC 串流為每個記錄收集的資訊類型或*檢視類型*。您之後無法變更串流的檢視類型。Amazon Keyspaces 支援下列檢視類型：
+ `NEW_AND_OLD_IMAGES` – 擷取變動前後的資料列版本。這是預設值。
+ `NEW_IMAGE` – 擷取變動之後的資料列版本。
+ `OLD_IMAGE` – 擷取變動前的資料列版本。
+ `KEYS_ONLY` – 擷取已變動資料列的分割區和叢集索引鍵。

每個 CDC 串流都包含 記錄。每個記錄代表 Amazon Keyspaces 資料表中的單一資料列修改。記錄會以邏輯方式組織成稱為*碎片的*群組。這些群組會依主索引鍵的範圍 （分割區索引鍵的組合、叢集索引鍵範圍） 進行邏輯組織，並且是 Amazon Keyspaces 的內部建構。每個碎片都充當多個記錄的容器，並包含透過這些記錄存取和迭代所需的資訊。

![\[Amazon Keyspaces CDC 串流包含碎片，代表資料列變動集合的 CDC 記錄。\]](http://docs.aws.amazon.com/zh_tw/keyspaces/latest/devguide/images/keyspaces_cdc.png)


每個 CDC 記錄都會獲指派序號，以反映記錄在碎片中發佈的順序。序列號保證在每個碎片中增加且是唯一的。

Amazon Keyspaces 會自動建立和刪除碎片。根據流量負載，Amazon Keyspaces 也可以隨著時間分割或合併碎片。例如，Amazon Keyspaces 可以將一個碎片分割成多個新碎片，或將碎片合併成新的單一碎片。Amazon Keyspaces APIs 會發佈碎片和 CDC 串流資訊，以允許取用應用程式存取碎片的整個譜系圖，以正確順序處理記錄。

Amazon Keyspaces CDC 是以下列原則為基礎，您可以在建置應用程式時倚賴這些原則：
+ 每個資料列層級變動記錄只會在 CDC 串流中出現一次。
+ 當您依歷程順序使用碎片時，每個資料列層級變動記錄會以與主索引鍵上實際變動順序相同的順序顯示。

**Topics**
+ [資料保留](#CDC_how-it-works-data-retention)
+ [TTL 資料過期](#CDC_how-it-works-ttl)
+ [批次操作](#CDC_how-it-works-batch-operations)
+ [靜態資料欄](#CDC_how-it-works-static)
+ [靜態加密](#CDC_how-it-works-encryption)
+ [多區域複製](#CDC_how-it-works-mrr)
+ [與 AWS 服務整合](#howitworks_integration)

## Amazon Keyspaces 中 CDC 串流的資料保留運作方式
<a name="CDC_how-it-works-data-retention"></a>

Amazon Keyspaces 會保留 CDC 串流中的記錄 24 小時。您無法變更保留期間。如果您在資料表上停用 CDC，串流中的資料會繼續讀取 24 小時。在此期間之後，資料會過期，並自動刪除記錄。

## 存留時間 (TTL) 資料過期如何與 Amazon Keyspaces 中的 CDC 串流搭配使用
<a name="CDC_how-it-works-ttl"></a>

Amazon Keyspaces 會顯示資料欄/儲存格層級的過期時間，以及 CDC 變更記錄中稱為 `expirationTime`的中繼資料欄位中的資料列層級。當 Amazon Keyspaces TTL 偵測到儲存格過期時，CDC 會建立新的變更記錄，顯示 TTL 為變更的來源。如需 TTL 的詳細資訊，請參閱 [使用 Amazon Keyspaces 的存留時間 (TTL) 過期資料 （適用於 Apache Cassandra)](TTL.md)。

## Amazon Keyspaces 中 CDC 串流的批次操作運作方式
<a name="CDC_how-it-works-batch-operations"></a>

批次操作在內部分為個別的資料列層級修改。Amazon Keyspaces 會在資料列層級保留 CDC 串流中的所有記錄，即使修改發生在批次操作中。Amazon Keyspaces 會以與資料列層級或主索引鍵上發生的變動順序相同的順序，維護 CDC 串流內的記錄順序。

## 靜態資料欄如何在 Amazon Keyspaces 的 CDC 串流中運作
<a name="CDC_how-it-works-static"></a>

靜態資料欄值會在 Cassandra 中分割區中的所有資料列之間共用。由於此行為，Amazon Keyspaces 會將靜態資料欄的任何更新擷取為 CDC 串流中的個別記錄。下列範例摘要說明靜態資料欄變動的行為：
+ 當僅更新靜態資料欄時，CDC 串流包含靜態資料欄的資料列修改作為資料列中的唯一資料欄。
+ 更新資料列而不變更靜態資料欄時，CDC 串流會包含資料列修改，其中包含靜態資料欄以外的所有資料欄。
+ 當資料列與靜態資料欄一起更新時，CDC 串流包含兩個不同的資料列修改，一個用於靜態資料欄，另一個用於其餘資料列。

## 靜態加密對 Amazon Keyspaces 中 CDC 串流的運作方式
<a name="CDC_how-it-works-encryption"></a>

若要加密 CDC 排序日誌中的靜態資料，Amazon Keyspaces 會使用已用於資料表的相同加密金鑰。如需靜態加密的詳細資訊，請參閱 [Amazon Keyspaces 中的靜態加密](EncryptionAtRest.md)。

## 多區域複寫如何適用於 Amazon Keyspaces 中的 CDC 串流
<a name="CDC_how-it-works-mrr"></a>

您可以使用 `update-table` API 或 `ALTER TABLE` CQL 命令，為多區域資料表的個別複本啟用和停用 CDC 串流。由於非同步複寫和衝突解決，多區域資料表的 CDC 串流跨區域不一致 AWS 區域。因此，Amazon Keyspaces 在串流中擷取的記錄可能會在不同的區域中以不同的順序顯示。

如需多區域複寫的詳細資訊，請參閱 [Amazon Keyspaces 的多區域複寫 （適用於 Apache Cassandra)](multiRegion-replication.md)。

## CDC 串流和與 AWS 服務的整合
<a name="howitworks_integration"></a>

### 如何在 Amazon Keyspaces 中使用 CDC 串流的 VPC 端點
<a name="CDC_how-it-works-vpc"></a>

您可以使用 VPC 端點來存取 Amazon Keyspaces CDC 串流。如需如何建立和存取串流 VPC 端點的詳細資訊，請參閱 [搭配介面 VPC 端點使用 Amazon Keyspaces CDC 串流](vpc-endpoints-streams.md)。

### 使用 CloudWatch 監控 如何在 Amazon Keyspaces 中用於 CDC 串流
<a name="CDC_how-it-works-monitoring"></a>

您可以使用 Amazon CloudWatch 來監控對 Amazon Keyspaces CDC 端點發出的 API 呼叫。如需可用指標的詳細資訊，請參閱 [Amazon Keyspaces 變更資料擷取 (CDC) 的指標](metrics-dimensions.md#keyspaces-cdc-metrics)。

### 使用 CloudTrail 記錄日誌如何在 Amazon Keyspaces 中用於 CDC 串流
<a name="CDC_how-it-works-logging"></a>

Amazon Keyspaces CDC 已與 服務整合 AWS CloudTrail，此服務提供由 Amazon Keyspaces AWS 中的使用者、角色或服務所採取之動作的記錄。CloudTrail 會將 Amazon Keyspaces 的 Data Definition Language (DDL) API 呼叫和 Data Manipulation Language (DML) API 呼叫擷取為事件。擷取的呼叫包括來自 Amazon Keyspaces 主控台的呼叫，以及對 Amazon Keyspaces API 操作的程式設計呼叫。

如需 CloudTrail 擷取之 CDC 事件的詳細資訊，請參閱 [使用 記錄 Amazon Keyspaces API 呼叫 AWS CloudTrail](logging-using-cloudtrail.md)。

### 標記在 Amazon Keyspaces 中如何適用於 CDC 串流
<a name="CDC_how-it-works-tagging"></a>

Amazon Keyspaces CDC 串流是可標記的資源。您可以在使用 CQL、 AWS SDK 或 以程式設計方式建立資料表時標記串流 AWS CLI。您也可以標記現有的串流、刪除標籤或檢視串流的標籤。如需詳細資訊，請參閱[在 Amazon Keyspaces 中標記金鑰空間、資料表和串流](Tagging.Operations.md)。

# 如何在 Amazon Keyspaces 中使用變更資料擷取 (CDC) 串流
<a name="cdc_how-to-use"></a>

**Topics**
+ [設定 許可](configure-cdc-permissions.md)
+ [存取 CDC 串流端點](CDC_access-endpoints.md)
+ [為新資料表啟用 CDC 串流](keyspaces-enable-cdc-new-table.md)
+ [為現有資料表啟用 CDC 串流](keyspaces-enable-cdc-alter-table.md)
+ [停用 CDC 串流](keyspaces-delete-cdc.md)
+ [檢視 CDC 串流](keyspaces-view-cdc.md)
+ [存取 CDC 串流](keyspaces-records-cdc.md)
+ [使用 KCL 處理串流](cdc_how-to-use-kcl.md)

# 設定許可以在 Amazon Keyspaces 中使用 CDC 串流
<a name="configure-cdc-permissions"></a>

若要啟用 CDC 串流，委託人，例如 IAM 使用者或角色，需要下列許可。

如需 的詳細資訊 AWS Identity and Access Management，請參閱 [AWS Identity and Access Management 適用於 Amazon Keyspaces](security-iam.md)。

## 為資料表啟用 CDC 串流的許可
<a name="cdc-permissions-enable"></a>

若要啟用 Amazon Keyspaces 資料表的 CDC 串流，主體首先需要建立或修改資料表的許可，然後需要第二個許可來建立服務連結角色 [AWSServiceRoleForAmazonKeyspacesCDC](using-service-linked-roles-CDC-streams.md#service-linked-role-permissions-CDC-streams)。Amazon Keyspaces 會使用服務連結角色，代表您將 CloudWatch 指標發佈至您的帳戶

下列 IAM 政策為範例。

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Effect":"Allow",
            "Action":[
                "cassandra:Create",
                "cassandra:CreateMultiRegionResource",
                "cassandra:Alter",
                "cassandra:AlterMultiRegionResource"
            ],
            "Resource":[
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/*",
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
        },
        {
            "Sid": "KeyspacesCDCServiceLinkedRole",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/cassandra-streams.amazonaws.com/AWSServiceRoleForAmazonKeyspacesCDC",
            "Condition": {
              "StringLike": {
                "iam:AWSServiceName": "cassandra-streams.amazonaws.com"
              }
            }
        }
    ]
}
```

若要停用串流，只需要`ALTER TABLE`許可。

## 檢視 CDC 串流的許可
<a name="cdc-permissions-view"></a>

若要檢視或列出 CDC 串流，主體需要系統金鑰空間的讀取許可。如需詳細資訊，請參閱[`system_schema_mcs`](working-with-keyspaces.md#keyspace_system_schema_mcs)。

下列 IAM 政策為範例。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":"cassandra:Select",
         "Resource":[
             "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
         ]
      }
   ]
}
```

若要使用 或 Amazon Keyspaces API 檢視 AWS CLI 或列出 CDC 串流，委託人需要動作 `cassandra:ListStreams`和 的額外許可`cassandra:GetStream`。

下列 IAM 政策為範例。

```
{
  "Version": "2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cassandra:Select",
        "cassandra:ListStreams",
        "cassandra:GetStream"
      ],
      "Resource": "*"
    }
  ]
}
```

## 讀取 CDC 串流的許可
<a name="cdc-permissions-read"></a>

若要讀取 CDC 串流，委託人需要下列許可。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      }
   ]
}
```

## 使用 Kinesis Client Library (KCL) 處理 Amazon Keyspaces CDC 串流的許可
<a name="cdc-permissions-kcl"></a>

若要使用 KCL 處理 Amazon Keyspaces CDC 串流，IAM 主體需要下列許可。
+ `Amazon Keyspaces` – 對指定 Amazon Keyspaces CDC 串流的唯讀存取。
+ `DynamoDB` – 建立`shard lease`資料表、讀取和寫入資料表存取權，以及視需要讀取索引以進行 KCL 串流處理的許可。
+ `CloudWatch` – 將 Amazon Keyspaces CDC 串流處理中的指標資料發佈到 CloudWatch 帳戶中 KCL 用戶端應用程式命名空間的許可。如需監控的詳細資訊，請參閱[使用 Amazon CloudWatch 監控 Kinesis 用戶端程式庫](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:UpdateTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-WorkerMetricStats",
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-CoordinatorState"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:Query"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME/index/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "cloudwatch:PutMetricData"
         ],
         "Resource":"*"
      }
   ]
}
```

# 如何在 Amazon Keyspaces 中存取 CDC 串流端點
<a name="CDC_access-endpoints"></a>

Amazon Keyspaces 會在提供 Amazon Keyspaces 的每個 AWS 區域 中維護金鑰空間/資料表和 CDC 串流的個別[端點](programmatic.endpoints.md#global_endpoints)。若要存取 CDC 串流，請選取資料表的區域，並在端點名稱`cassandra-streams`中將`cassandra`字首取代為 ，如下列範例所示：

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/keyspaces/latest/devguide/CDC_access-endpoints.html)

下表包含適用於 的可用公有端點的完整清單 Amazon Keyspaces change data capture streams。 同時 Amazon Keyspaces CDC streams 支援 IPv4 和 IPv6。所有公有端點，例如 `cassandra-streams.us-east-1.api.aws`，都是可針對 IPv4 和 IPv6 設定的雙堆疊端點。

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/keyspaces/latest/devguide/CDC_access-endpoints.html)

# 在 Amazon Keyspaces 中建立新資料表時啟用 CDC 串流
<a name="keyspaces-enable-cdc-new-table"></a>

若要在建立資料表時啟用 CDC 串流，您可以在 CQL 或 `create-table`命令中使用 `CREATE TABLE`陳述式與 AWS CLI。

對於資料表中每個變更的資料列，Amazon Keyspaces 可以根據您`cdc_specification`選取的 `view_type`的 擷取下列變更：
+ `NEW_AND_OLD_IMAGES` – 變更前後的兩個資料列版本。這是預設值。
+ `NEW_IMAGE` – 變更後的列版本。
+ `OLD_IMAGE` – 變更前的資料列版本。
+ `KEYS_ONLY` – 已變更之資料列的分割區和叢集索引鍵。

如需如何標記串流的詳細資訊，請參閱 [建立資料表時，將標籤新增至新串流](Tagging.Operations.new.table.stream.md)。

**注意**  
Amazon Keyspaces CDC 需要存在服務連結角色 (`AWSServiceRoleForAmazonKeyspacesCDC`)，以代表您將指標資料從 Amazon Keyspaces CDC 串流發佈到 CloudWatch 帳戶中`"cloudwatch:namespace": "AWS/Cassandra"`的 。系統會自動建立此角色。如需詳細資訊，請參閱[使用 Amazon Keyspaces CDC 串流的角色](using-service-linked-roles-CDC-streams.md)。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 建立資料表時啟用 CDC 串流**

1. 

   ```
   CREATE TABLE mykeyspace.mytable (a text, b text, PRIMARY KEY(a)) 
   WITH CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'NEW_IMAGE'}} AND CDC = TRUE;
   ```

1. 若要確認串流設定，您可以使用下列陳述式。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   該陳述式的輸出看起來應該與此類似。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |   mytable  | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741383893782', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T21:44:53.783', 'status': 'ENABLED', 'view_type': 'NEW_IMAGE'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}>
   ```

------
#### [ CLI ]

**當您使用 建立資料表時，啟用 CDC 串流 AWS CLI**

1. 若要建立串流，您可以使用下列語法。

   ```
   aws keyspaces create-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --schema-definition 'allColumns=[{name=a,type=text},{name=b,type=text}],partitionKeys=[{name=a}]' \
   --cdc-specification status=ENABLED,viewType=NEW_IMAGE
   ```

1. 該命令的輸出會顯示標準`create-table`回應，看起來與此範例類似。

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------

# 為 Amazon Keyspaces 中的現有資料表啟用 CDC 串流
<a name="keyspaces-enable-cdc-alter-table"></a>

若要為現有資料表啟用 CDC 串流，您可以在 CQL 中使用 `ALTER TABLE`陳述式、將 `update-table`命令與 搭配使用 AWS CLI，也可以使用 主控台。

對於資料表中每個變更的資料列，Amazon Keyspaces 可以根據您`cdc_specification`選取的 `view_type`的 擷取下列變更：
+ `NEW_AND_OLD_IMAGES` – 變更前後的兩個資料列版本。這是預設值。
+ `NEW_IMAGE` – 變更後的列版本。
+ `OLD_IMAGE` – 變更前的資料列版本。
+ `KEYS_ONLY` – 已變更之資料列的分割區和叢集索引鍵。

如需如何標記串流的詳細資訊，請參閱 [將新標籤新增至串流](Tagging.Operations.existing.stream.md)。

**注意**  
Amazon Keyspaces CDC 需要存在服務連結角色 (`AWSServiceRoleForAmazonKeyspacesCDC`)，以代表您將指標資料從 Amazon Keyspaces CDC 串流發佈到 CloudWatch 帳戶中`"cloudwatch:namespace": "AWS/Cassandra"`的 。系統會自動建立此角色。如需詳細資訊，請參閱[使用 Amazon Keyspaces CDC 串流的角色](using-service-linked-roles-CDC-streams.md)。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 啟用串流 (CDC 串流）**

您可以使用 `ALTER TABLE`為現有資料表啟用串流。

1. 下列範例會建立僅擷取變更資料列之分割區和叢集索引鍵變更的串流。

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = TRUE
   AND CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'KEYS_ONLY'}};
   ```

1. 若要驗證串流設定，您可以使用下列陳述式。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   陳述式的輸出看起來與此類似。

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385897045', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T22:20:10.454', 'status': 'ENABLED', 'view_type': 'KEYS_ONLY'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**使用 建立 CDC 串流 AWS CLI**

1. 若要為現有資料表建立串流，您可以使用下列語法。

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=ENABLED,viewType=NEW_AND_OLD_IMAGES
   ```

1. 該命令的輸出會顯示標準`create-table`回應，看起來與此範例類似。

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------
#### [ Console ]

**使用 Amazon Keyspaces 主控台啟用 CDC 串流**

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home) 開啟 Amazon Keyspaces 主控台。

1. 在導覽窗格中，選擇**資料表**，然後從清單中選擇資料表。

1. 選擇**串流**索引標籤。

1. 選擇**編輯**以啟用串流。

1. 選取**開啟串流**。

1. 選擇串流的**檢視類型**。以下是可用的選項。請注意，您無法在建立串流之後變更串流的檢視類型。
   + **新舊映像** – Amazon Keyspaces 會擷取變更前後的兩個資料列版本。這是預設值。
   + **新映像** – Amazon Keyspaces 只會擷取變更後的資料列版本。
   + **舊映像** – Amazon Keyspaces 只會擷取變更前的資料列版本。
   + **僅限主索引鍵** – Amazon Keyspaces 只會擷取已變更資料列的分割區和叢集索引鍵資料欄。

1. 若要完成，請選擇**儲存變更**。

------

# 在 Amazon Keyspaces 中停用 CDC 串流
<a name="keyspaces-delete-cdc"></a>

若要停用金鑰空間中的 CDC 串流，您可以在 CQL、 `update-table`命令與 AWS CLI或 主控台中使用 `ALTER TABLE`陳述式。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 停用串流 (CDC 串流）**

1. 若要停用串流，您可以使用下列陳述式。

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = FALSE;
   ```

1. 若要確認串流已停用，您可以使用下列陳述式。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   該陳述式的輸出看起來與此類似。

   ```
    keyspace_name | table_name | cdc   | custom_properties
   ---------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      mykeyspace  |   mytable  | False | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385668642', 'throughput_mode': 'PAY_PER_REQUEST'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**使用 停用串流 (CDC 串流） AWS CLI**

1. 若要停用串流，您可以使用下列命令。

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=DISABLED
   ```

1. 命令的輸出看起來與此範例類似。

   ```
   {
       "keyspaceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/",
       "streamName": "my_stream"
   }
   ```

------
#### [ Console ]

**使用 Amazon Keyspaces 主控台停用串流 (CDC 串流）**

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home) 開啟 Amazon Keyspaces 主控台。

1. 在導覽窗格中，選擇**資料表**，然後從清單中選擇資料表。

1. 選擇**串流**索引標籤。

1. 選擇**編輯**。

1. 取消選取**開啟串流**。

1. 選擇**儲存變更**以停用串流。

------

# 在 Amazon Keyspaces 中檢視 CDC 串流
<a name="keyspaces-view-cdc"></a>

若要檢視或列出金鑰空間中的所有串流，您可以使用 CQL `system_schema_mcs.streams`中的陳述式查詢系統金鑰空間中的資料表，或使用 `get-stream`和 `list-stream`命令搭配 AWS CLI、 或 主控台。

如要了解必要的許可，請參閱 [設定許可以在 Amazon Keyspaces 中使用 CDC 串流](configure-cdc-permissions.md)。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 檢視 CDC 串流**
+ 若要監控資料表的 CDC 狀態，您可以使用下列陳述式。

  ```
  SELECT custom_properties
  FROM system_schema_mcs.tables 
  WHERE keyspace_name='my_keyspace' and table_name='my_table';
  ```

  命令的輸出看起來與此類似。

  ```
  ...
  custom_properties
  ----------------------------------------------------------------------------------
  {'cdc_specification':{'status': 'Enabled', 'view_type': 'NEW_IMAGE', 'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label''}}
  ...
  ```

------
#### [ CLI ]

**使用 檢視 CDC 串流 AWS CLI**

1. 此範例說明如何查看資料表的串流資訊。

   ```
   aws keyspaces get-table \
   --keyspace-name 'my_keyspace' \
   --table-name 'my_table'
   ```

   命令的輸出如下所示。

   ```
   {
       "keyspaceName": "my_keyspace",
       "tableName": "my_table",
       ... Other fields ...,
       "latestStreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label",
       "cdcSpecification": {
           "status": "ENABLED",
           "viewType": "NEW_AND_OLD_IMAGES"    
       }
   }
   ```

1. 您可以在指定的 中列出帳戶中的所有串流 AWS 區域。下列命令是此範例。

   ```
   aws keyspacesstreams list-streams --region us-east-1
   ```

   命令的輸出可能看起來像這樣。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"Create a keyspace with the name catalog. Note
                                   that streams are not supported in multi-Region keyspaces.
               "TableName": "t2",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_2/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_3"
               "TableName": "t1",
           }
       ]
   }
   ```

1. 您也可以使用下列參數列出指定金鑰空間的 CDC 串流。

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --region us-east-1
   ```

   命令的輸出看起來與此類似。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

1. 您也可以使用下列參數列出指定資料表的 CDC 串流。

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --table-name t2 --region us-east-1
   ```

   命令的輸出看起來與此類似。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

------
#### [ Console ]

**在 Amazon Keyspaces 主控台中檢視 CDC 串流**

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home) 開啟 Amazon Keyspaces 主控台。

1. 在導覽窗格中，選擇**資料表**，然後從清單中選擇資料表。

1. 選擇**串流**索引標籤以檢閱串流詳細資訊。

------

# 在 Amazon Keyspaces 中存取 CDC 串流中的記錄
<a name="keyspaces-records-cdc"></a>

若要存取串流中的記錄，請使用 [Amazon Keyspaces Streams API](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html)。下一節包含如何使用 存取記錄的範例 AWS CLI。

如要了解必要的許可，請參閱 [設定許可以在 Amazon Keyspaces 中使用 CDC 串流](configure-cdc-permissions.md)。

**使用 存取串流中的記錄 AWS CLI**

1. 您可以使用 Amazon Keyspaces Streams API 來存取串流的變更記錄。如需詳細資訊，請參閱 [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html)。若要擷取串流中的碎片，您可以使用 `get-stream` API，如下列範例所示。

   ```
   aws keyspacesstreams get-stream \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL'
   ```

   以下為輸出範例。

   ```
   {
      "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2023-05-11T21:21:33.291",
      "StreamStatus": "ENABLED",
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "CreationRequestDateTime": "<CREATION_TIME>",
      "KeyspaceName": "mykeyspace",
      "TableName": "mytable",
      "StreamLabel": "2023-05-11T21:21:33.291",
       "Shards": [
           {
               "SequenceNumberRange": {
                   "EndingSequenceNumber": "<END_SEQUENCE_NUMBER>",
                   "StartingSequenceNumber": "<START_SEQUENCE_NUMBER>"
               },
               "ShardId": "<SHARD_ID>"
           },
       ]
   }
   ```

1. 若要從串流擷取記錄，請先取得迭代器，以提供您存取記錄的起點。若要這樣做，您可以使用上一個步驟中 API 傳回的 CDC 串流中的碎片。若要收集迭代器，您可以使用 `get-shard-iterator` API。在此範例中，您會使用從碎片的最後一個裁剪點或開頭`TRIM_HORIZON`擷取的類型反覆運算器）。

   ```
   aws keyspacesstreams get-shard-iterator \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL' \
   --shard-id 'SHARD_ID' \
   --shard-iterator-type 'TRIM_HORIZON'
   ```

   命令的輸出看起來與下列範例類似。

   ```
   {
       "ShardIterator": "<SHARD_ITERATOR>" 
   }
   ```

1. 若要使用 `get-records` API 擷取 CDC 記錄，您可以使用最後一個步驟中傳回的迭代器。下列命令是此範例。

   ```
   aws keyspacesstreams get-records \
   --shard-iterator 'SHARD_ITERATOR' \
   --limit 100
   ```

# 使用 Kinesis Client Library (KCL) 來處理 Amazon Keyspaces 串流
<a name="cdc_how-to-use-kcl"></a>

本主題說明如何使用 Kinesis Client Library (KCL) 來取用和處理來自 Amazon Keyspaces 變更資料擷取 (CDC) 串流的資料。

使用 Kinesis Client Library (KCL) 提供許多好處，而不是直接使用 Amazon Keyspaces Streams API，例如：
+ 內建碎片歷程追蹤和迭代器處理。
+ 跨工作者的自動負載平衡。
+ 工作者故障的容錯能力和復原能力。
+ 檢查點以追蹤處理進度。
+ 調整串流容量的變更。
+ 簡化處理 CDC 記錄的分散式運算。

下一節概述為什麼和如何使用 Kinesis Client Library (KCL) 來處理串流，並提供使用 KCL 處理 Amazon Keyspaces CDC 串流的範例。

如需定價的相關資訊，請參閱 [Amazon Keyspaces （適用於 Apache Cassandra) 定價](https://aws.amazon.com/keyspaces/pricing)。

## 什麼是 Kinesis Client Library？
<a name="cdc-kcl-what-is"></a>

Kinesis Client Library (KCL) 是獨立的 Java 軟體程式庫，旨在簡化從串流取用和處理資料的程序。KCL 會處理許多與分散式運算相關的複雜任務，讓您專注於在處理串流資料時實作商業邏輯。KCL 會管理各種活動，例如跨多個工作者的負載平衡、回應工作者失敗、檢查點處理過的記錄，以及回應串流中碎片數量的變更。

若要處理 Amazon Keyspaces CDC 串流，您可以使用 KCL 中找到的設計模式來處理串流碎片和串流記錄。KCL 會在低階 Kinesis Data Streams API 上提供有用的抽象，可以簡化程式碼。如需 KCL 的詳細資訊，請參閱《*Amazon Kinesis Data Streams 開發人員指南》中的*[使用 KCL 開發消費者](https://docs.aws.amazon.com/kinesis/latest/dev/develop-kcl-consumers.html)。

 若要使用 KCL 撰寫應用程式，請使用 Amazon Keyspaces Streams Kinesis Adapter。Kinesis Adapter 實作 Kinesis Data Streams 介面，讓您可以使用 KCL 來取用和處理來自 Amazon Keyspaces 串流的記錄。如需如何設定和安裝 Amazon Keyspaces 串流 Kinesis 轉接器的指示，請造訪 [GitHub](https://github.com/aws/keyspaces-streams-kinesis-adapter) 儲存庫。

下圖顯示這些程式庫如何彼此互動。

![\[處理 Amazon Keyspaces CDC 串流記錄時，用戶端應用程式與 Kinesis Data Streams、KCL、Amazon Keyspaces Streams Kinesis Adapter 和 Amazon Keyspaces APIs 之間的互動。\]](http://docs.aws.amazon.com/zh_tw/keyspaces/latest/devguide/images/keyspaces-streams-kinesis-adapter.png)


KCL 經常更新，以納入基礎程式庫的較新版本、安全性改善和錯誤修正。我們建議您使用最新版本的 KCL，以避免已知問題並從所有最新的改進中獲益。若要尋找最新的 KCL 版本，請參閱 [KCL GitHub 儲存庫](https://github.com/awslabs/amazon-kinesis-client)。

## KCL 概念
<a name="cdc-kcl-concepts"></a>

在使用 KCL 實作消費者應用程式之前，您應該了解下列概念：

**KCL 消費者應用程式**  
KCL 取用者應用程式是處理來自 Amazon Keyspaces CDC 串流資料的程式。KCL 可做為取用者應用程式程式碼與 Amazon Keyspaces CDC 串流之間的媒介。

**工作者**  
工作者是 KCL 取用者應用程式的執行單位，可處理來自 Amazon Keyspaces CDC 串流的資料。您的應用程式可以執行分散在多個執行個體的多個工作者。

**記錄處理器**  
記錄處理器是應用程式中的邏輯，可處理 Amazon Keyspaces CDC 串流中碎片的資料。記錄處理器會由工作者針對其管理的每個碎片進行執行個體化。

**租用**  
租用代表碎片的處理責任。工作者使用租用來協調哪個工作者正在處理哪個碎片。KCL 會將租用資料存放在 Amazon DynamoDB 的資料表中。

**檢查點**  
檢查點是碎片中位置的記錄，直到記錄處理器成功處理記錄為止。檢查點可讓您的應用程式在工作者失敗時，從停止的地方繼續處理。

使用 Amazon Keyspaces Kinesis 轉接器，您可以開始針對 KCL 介面進行開發，並將 API 呼叫無縫導向 Amazon Keyspaces 串流端點。如需可用端點的清單，請參閱 [如何在 Amazon Keyspaces 中存取 CDC 串流端點](CDC_access-endpoints.md)。

當應用程式啟動後，其會呼叫 KCL 以將工作者執行個體化。您必須向工作者提供應用程式的組態資訊，例如串流描述項和AWS登入資料，以及您提供的記錄處理器類別名稱。當其在紀錄處理器中執行程式碼時，工作者會執行下列任務：
+ 連線到串流
+ 列舉串流內的碎片
+ 與其他工作者 (若有) 協調碎片關聯性
+ 為其所管理的每個碎片執行個體化記錄處理器
+ 從串流提取紀錄
+ 將記錄推送至對應的記錄處理器
+ 對已處理的記錄執行檢查點作業
+ 當工作者執行個體數目變更時，平衡碎片與工作者的關聯
+ 當碎片進行分割時，平衡碎片與工作者的關聯

# 實作 Amazon Keyspaces CDC 串流的 KCL 取用者應用程式
<a name="cdc-kcl-implementation"></a>

本主題提供step-by-step指南，以處理 Amazon Keyspaces CDC 串流。

1. 先決條件：開始之前，請確定您已：
   + 具有 CDC 串流的 Amazon Keyspaces 資料表
   + IAM 主體需要 IAM 許可才能存取 Amazon Keyspaces CDC 串流、建立和存取 DynamoDB 資料表以進行 KCL 串流處理，以及將指標發佈至 CloudWatch 的許可。如需詳細資訊和政策範例，請參閱 [使用 Kinesis Client Library (KCL) 處理 Amazon Keyspaces CDC 串流的許可](configure-cdc-permissions.md#cdc-permissions-kcl)。
   + 請確定已在本機組態中設定有效的AWS登入資料。如需詳細資訊，請參閱[存放用於程式設計存取的存取金鑰](aws.credentials.manage.md)。
   + Java 開發套件 (JDK) 8 或更新版本
   + Github 上的 [Readme](https://github.com/aws/keyspaces-streams-kinesis-adapter) 中列出的需求。

1. <a name="cdc-kcl-add-dependencies"></a>在此步驟中，您將 KCL 相依性新增至您的專案。對於 Maven，請將下列項目新增至您的 pom.xml：

   ```
   <dependencies>
           <dependency>
               <groupId>software.amazon.kinesis</groupId>
               <artifactId>amazon-kinesis-client</artifactId>
               <version>3.1.0</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.keyspaces</groupId>
               <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
               <version>1.0.0</version>
           </dependency>
       </dependencies>
   ```
**注意**  
一律在 KCL [ GitHub 儲存庫檢查 KCL ](https://github.com/awslabs/amazon-kinesis-client)的最新版本。

1. <a name="cdc-kcl-factory"></a>建立產生記錄處理器執行個體的工廠類別：

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>建立記錄工廠，如下列範例所示。

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>在此步驟中，您會建立基本類別來設定 KCLv3 和 Amazon Keyspaces 轉接器。

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>在此步驟中，您會為應用程式實作記錄處理器類別，以開始處理變更事件。

   ```
    import software.amazon.kinesis.coordinator.Scheduler;
   
   public class KCLTest {
   
       private static final int APP_RUNTIME_SECONDS = 1800;
       private static final int SLEEP_INTERNAL_MS = 60*1000;
   
       public static void main(String[] args) {
           KCLTestBase kclTestBase;
   
           kclTestBase = new KCLTestBase();
           kclTestBase.baseSetUp();
   
           // Create and start scheduler
           String leaseTableName = generateUniqueApplicationName();
   
           // Update below to your Stream ARN
           String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
           Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
           kclTestBase.startScheduler(scheduler);
   
           // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this
           // example we will shut it down after APP_RUNTIME_SECONDS
           long startTime = System.currentTimeMillis();
           long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
           while (System.currentTimeMillis() < endTime) {
               try {
                   // Print and sleep every minute
                   Thread.sleep(SLEEP_INTERNAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## 最佳實務
<a name="cdc-kcl-best-practices"></a>

搭配 Amazon Keyspaces CDC 串流使用 KCL 時，請遵循下列最佳實務：

**錯誤處理**  
在記錄處理器中實作強大的錯誤處理，以正常處理例外狀況。考慮實作暫時性故障的重試邏輯。

**檢查點頻率**  
平衡檢查點頻率，將重複處理降至最低，同時確保合理的進度追蹤。過於頻繁的檢查點可能會影響效能，而過於頻繁的檢查點可能會在工作者失敗時導致更多的重新處理。

**工作者擴展**  
根據 CDC 串流中的碎片數量來擴展工作者數量。一個很好的起點是每個碎片有一個工作者，但您可能需要根據您的處理需求進行調整。

**監控**  
使用 KCL 提供的 CloudWatch 指標來監控消費者應用程式的運作狀態和效能。關鍵指標包括處理延遲、檢查點存留期和租用計數。

**測試**  
徹底測試您的消費者應用程式，包括工作者失敗、串流重新分片和各種載入條件等案例。

## 使用 KCL 搭配非 Java 語言
<a name="cdc-kcl-non-java"></a>

雖然 KCL 主要是 Java 程式庫，但您可以透過 MultiLangDaemon 將其與其他程式設計語言搭配使用。MultiLangDaemon 是以 Java 為基礎的協助程式，可管理非 Java 記錄處理器與 KCL 之間的互動。

KCL 支援下列語言：
+ Python
+ Ruby
+ Node.js
+ .NET

如需使用 KCL 搭配非 Java 語言的詳細資訊，請參閱 [KCL MultiLangDaemon 文件](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang)。

## 疑難排解
<a name="cdc-kcl-troubleshooting"></a>

本節提供將 KCL 與 Amazon Keyspaces CDC 串流搭配使用時可能遇到的常見問題的解決方案。

**處理緩慢**  
如果您的消費者應用程式處理記錄的速度緩慢，請考慮：  
+ 增加工作者執行個體的數量
+ 最佳化您的記錄處理邏輯
+ 檢查下游系統中是否有瓶頸

**重複處理**  
如果您看到重複處理記錄，請檢查您的檢查點邏輯。確定您在成功處理記錄後進行檢查點。

**工作者故障**  
如果工作者經常失敗，請檢查：  
+ 資源限制 (CPU、記憶體）
+ 網路連線問題
+ 許可問題

**租用資料表問題**  
如果您遇到 KCL 租用資料表的問題：  
+ 檢查您的應用程式是否具有存取 Amazon Keyspaces 資料表的適當許可
+ 確認資料表具有足夠的佈建輸送量