

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Amazon Keyspaces 中处理变更数据捕获 (CDC) 流
<a name="cdc"></a>

Amazon Keyspaces 变更数据捕获 (CDC) 近乎实时地记录来自亚马逊密钥空间表的行级变更事件。

Amazon Keyspaces CDC 支持事件驱动的用例，例如工业物联网和欺诈检测，以及全文搜索和数据存档等数据处理用例。Amazon Keyspaces CDC 在流中捕获的变更事件可以由执行关键业务功能的下游应用程序使用，例如数据分析、文本搜索、机器学习训练/推理以及用于存档的持续数据备份。例如，您可以将流数据传输到诸如亚马逊服务、Amazon Redshift 和 Amazon S3 等 AWS 分析和存储 OpenSearch 服务进行进一步处理。

Amazon Keyspaces CDC 为表提供按时间排序和重复数据删除的更改记录，可自动扩展数据吞吐量，保留时间长达 24 小时。

Amazon Keyspaces CDC 流完全无服务器，您无需管理用于捕获变更事件的数据基础架构。此外，Amazon Keyspaces CDC 不消耗任何表容量来进行计算或存储。有关更多信息，请参阅 [变更数据捕获 (CDC) 流在 Amazon Keyspaces 中的工作原理](cdc_how-it-works.md)。

您可以使用 Amazon Keyspaces Streams API 来构建使用 Amazon Keyspaces CDC 流的应用程序，并根据内容采取行动。有关可用的终端节点，请参阅[如何在 Amazon Keyspaces 中访问 CDC 直播终端节点](CDC_access-endpoints.md)。

有关 Streams API 中可用于 Amazon Keyspaces 的所有操作的完整列表，请参阅 Amazon K [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html)。

**Topics**
+ [变更数据捕获 (CDC) 流在 Amazon Keyspaces 中的工作原理](cdc_how-it-works.md)
+ [如何在 Amazon Keyspaces 中使用变更数据捕获 (CDC) 流](cdc_how-to-use.md)

# 变更数据捕获 (CDC) 流在 Amazon Keyspaces 中的工作原理
<a name="cdc_how-it-works"></a>

本节概述了变更数据捕获 (CDC) 流在 Amazon Keyspaces 中的工作原理。

Amazon Keyspaces 更改数据捕获 (CDC) 在 Amazon Keyspaces 表中记录一系列有序的行级修改，并将这些信息存储在名为*流*的日志中长达 24 小时。每次行级修改都会生成一个新的 CDC 记录，其中包含主键列信息以及该行（包括所有列）的 “之前” 和 “之后” 状态。应用程序可以近乎实时地访问直播并查看突变。

当您在表格上启用 CDC 时，Amazon Keyspaces 会创建一个新的 CDC 流，并开始捕获有关表中每个修改的信息。CDC 直播的亚马逊资源名称 (ARN) 格式如下：

```
arn:${Partition}:cassandra:{Region}:${Account}:/keyspace/${keyspaceName}/table/${tableName}/stream/${streamLabel}
```

首次启用 CDC 直播时，您可以选择 CDC 数据流为每条记录收集的信息*类型或视图类型*。之后你无法更改直播的视图类型。Amazon Keyspaces 支持以下视图类型：
+ `NEW_AND_OLD_IMAGES`— 捕获突变之前和之后的行的版本。这是默认值。
+ `NEW_IMAGE`— 捕获突变之后的行的版本。
+ `OLD_IMAGE`— 捕获突变之前行的版本。
+ `KEYS_ONLY`— 捕获被突变的行的分区和群集键。

每个 CDC 直播都由记录组成。每条记录代表 Amazon Keyspaces 表中的单行修改。从逻辑上讲，记录被组织成称为*分片*的组。这些组按主键范围（分区键、集群键范围的组合）进行逻辑组织，是 Amazon Keyspaces 的内部结构。每个分片充当多条记录的容器，并包含访问和迭代这些记录所需的信息。

![\[Amazon Keyspaces CDC 流由表示行突变集合的 CDC 记录的分片组成。\]](http://docs.aws.amazon.com/zh_cn/keyspaces/latest/devguide/images/keyspaces_cdc.png)


每个 CDC 记录都被分配一个序列号，该序列号反映了该记录在分片中发布的顺序。保证序列号在每个分片中不断增加且是唯一的。

Amazon Keyspaces 会自动创建和删除分片。根据流量负载，Amazon Keyspaces 还可以随着时间的推移拆分或合并分片。例如，Amazon Keyspaces 可以将一个分片拆分为多个新分片或将分片合并为一个新的分片。Amazon Keyspaces APIs 发布分片和 CDC 流信息，允许使用应用程序通过访问分片的整个谱系图以正确的顺序处理记录。

Amazon Keyspaces CDC 基于以下原则，您可以在构建应用程序时依靠这些原则：
+ 每条行级突变记录在 CDC 数据流中只出现一次。
+ 当你按世系顺序消耗分片时，每条行级突变记录的显示顺序与主键上的实际突变顺序相同。

**Topics**
+ [数据留存](#CDC_how-it-works-data-retention)
+ [TTL 数据过期](#CDC_how-it-works-ttl)
+ [分批操作](#CDC_how-it-works-batch-operations)
+ [静态列](#CDC_how-it-works-static)
+ [静态加密](#CDC_how-it-works-encryption)
+ [多区域复制](#CDC_how-it-works-mrr)
+ [与 AWS 服务集成](#howitworks_integration)

## 如何在 Amazon Keyspaces 中为 CDC 数据流保留数据
<a name="CDC_how-it-works-data-retention"></a>

Amazon Keyspaces 在 CDC 直播中将记录保留 24 小时。您无法更改保留期。如果您在表上禁用 CDC，则可以在 24 小时内继续读取流中的数据。在此时间之后，数据将过期，记录将自动删除。

## 存活时间 (TTL) 数据过期如何与 Amazon Keyspaces 中的 CDC 数据流配合使用
<a name="CDC_how-it-works-ttl"></a>

Amazon Keyspaces 在 CDC 变更记录中名`expirationTime`为的元数据字段中显示该级别的过期时间以及行级别。 column/cell 当 Amazon Keyspaces TTL 检测到单元格已过期时，CDC 会创建一个新的更改记录，显示 TTL 是更改的来源。有关 TTL 的更多信息，请参阅 [使用 Amazon Keyspaces（Apache Cassandra 兼容）的生存时间（TTL）功能让数据过期](TTL.md)。

## 如何对 Amazon Keyspaces 中的 CDC 直播进行批量操作
<a name="CDC_how-it-works-batch-operations"></a>

Batch 操作在内部分为单独的行级修改。Amazon Keyspaces 在行级别保留 CDC 流中的所有记录，即使修改是在批量操作中发生的。Amazon Keyspaces 按与行级或主键上发生的突变顺序相同的顺序维护 CDC 流中的记录顺序。

## 静态列在 Amazon Keyspaces 的 CDC 流中是如何工作的
<a name="CDC_how-it-works-static"></a>

在 Cassandra 中，静态列值在分区中的所有行之间共享。由于这种行为，Amazon Keyspaces 会将静态列的任何更新作为单独的记录捕获到 CDC 流中。以下示例总结了静态列突变的行为：
+ 当仅更新静态列时，CDC 流会包含对静态列的行修改，作为该行中唯一的列。
+ 当更新某行而不对静态列进行任何更改时，CDC 流将包含行修改，其中包含除静态列之外的所有列。
+ 当一行与静态列一起更新时，CDC 流包含两个单独的行修改，一个用于静态列，另一个用于该行的其余部分。

## 对于 Amazon Keyspaces 中的 CDC 直播进行静态加密的工作原理
<a name="CDC_how-it-works-encryption"></a>

为了对 CDC 排序日志中的静态数据进行加密，Amazon Keyspaces 使用的加密密钥与表中已使用的加密密钥相同。有关静态加密的更多信息，请参阅 [Amazon Keyspaces 中的静态加密](EncryptionAtRest.md)。

## 多区域复制如何适用于 Amazon Keyspaces 中的 CDC 流
<a name="CDC_how-it-works-mrr"></a>

您可以使用 `update-table` API 或 CQL 命令为多区域表的各个副本启用和禁用 `ALTER TABLE` CDC 流。由于异步复制和冲突解决，多区域表的 CDC 数据流不一致 AWS 区域。因此，Amazon Keyspaces 在流中捕获的记录在不同的区域中可能以不同的顺序出现。

有关多区域复制的更多信息，请参阅[Amazon Keyspaces 的多区域复制（适用于 Apache Cassandra）](multiRegion-replication.md)。

## CDC 直播并与 AWS 服务集成
<a name="howitworks_integration"></a>

### 如何在 Amazon Keyspaces 中使用用于 CDC 流的 VPC 终端节点
<a name="CDC_how-it-works-vpc"></a>

您可以使用 VPC 终端节点访问 Amazon Keyspaces CDC 直播。有关如何创建和访问直播的 VPC 终端节点的信息，请参阅[将 Amazon Keyspaces CDC 流与接口 VPC 终端节点一起使用](vpc-endpoints-streams.md)。

### 使用 Amazon Key CloudWatch spaces 中的 CDC 直播进行监控的工作原理
<a name="CDC_how-it-works-monitoring"></a>

您可以使用亚马逊监控对 Amazon CloudWatch Keyspaces CDC 终端节点进行的 API 调用。有关可用指标的更多信息，请参阅[Amazon Keyspaces 变更数据采集 (CDC) 的指标](metrics-dimensions.md#keyspaces-cdc-metrics)。

### 在 Amazon Key CloudTrail spaces 中使用 CDC 直播进行日志记录的工作原理
<a name="CDC_how-it-works-logging"></a>

Amazon Keyspaces CDC 与 AWS CloudTrail一项服务集成，可记录用户、角色或 AWS 服务在 Amazon Keyspaces 中采取的操作。 CloudTrail 将 Amazon Keyspaces 的数据定义语言 (DDL) API 调用和数据操纵语言 (DML) API 调用作为事件捕获。捕获的调用包括来自 Amazon Keyspaces 控制台的调用和对 Amazon Keyspaces API 操作的编程调用。

有关捕获的 CDC 事件的更多信息 CloudTrail，请参阅[使用记录 Amazon Keyspaces API 调用 AWS CloudTrail](logging-using-cloudtrail.md)。

### 在 Amazon Keyspaces 中为 CDC 直播添加标签的工作原理
<a name="CDC_how-it-works-tagging"></a>

Amazon Keyspaces CDC 流是一种可标记的资源。当你使用 CQL、 AWS SDK 或，以编程方式创建表时，你可以为直播添加标签。 AWS CLI您还可以为现有直播添加标签、删除标签或查看直播的标签。有关更多信息，请参阅 [在 Amazon Keyspaces 中标记密钥空间、表和流](Tagging.Operations.md)。

# 如何在 Amazon Keyspaces 中使用变更数据捕获 (CDC) 流
<a name="cdc_how-to-use"></a>

**Topics**
+ [配置 权限](configure-cdc-permissions.md)
+ [访问 CDC 直播端点](CDC_access-endpoints.md)
+ [为新表启用 CDC 流](keyspaces-enable-cdc-new-table.md)
+ [为现有表启用 CDC 流](keyspaces-enable-cdc-alter-table.md)
+ [禁用 CDC 直播](keyspaces-delete-cdc.md)
+ [查看 CDC 直播](keyspaces-view-cdc.md)
+ [访问 CDC 直播](keyspaces-records-cdc.md)
+ [使用 KCL 处理流](cdc_how-to-use-kcl.md)

# 在 Amazon Keyspaces 中配置使用 CDC 流的权限
<a name="configure-cdc-permissions"></a>

要启用 CDC 直播，委托人（例如 IAM 用户或角色）需要以下权限。

有关的更多信息 AWS Identity and Access Management，请参阅[AWS Identity and Access Management 适用于 Amazon Keyspaces](security-iam.md)。

## 为表启用 CDC 直播的权限
<a name="cdc-permissions-enable"></a>

[要为 Amazon Keyspaces 表启用 CDC 流，委托人首先需要创建或更改表的权限，其次需要创建服务关联角色 AWSService RoleForAmazonKeyspaces CDC 的权限。](using-service-linked-roles-CDC-streams.md#service-linked-role-permissions-CDC-streams)Amazon Keyspaces 使用服务关联角色代表您向您的账户发布 CloudWatch 指标

以下 IAM 策略就是一个例子。

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Effect":"Allow",
            "Action":[
                "cassandra:Create",
                "cassandra:CreateMultiRegionResource",
                "cassandra:Alter",
                "cassandra:AlterMultiRegionResource"
            ],
            "Resource":[
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/*",
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
        },
        {
            "Sid": "KeyspacesCDCServiceLinkedRole",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/cassandra-streams.amazonaws.com/AWSServiceRoleForAmazonKeyspacesCDC",
            "Condition": {
              "StringLike": {
                "iam:AWSServiceName": "cassandra-streams.amazonaws.com"
              }
            }
        }
    ]
}
```

要禁用直播，只需要`ALTER TABLE`权限。

## 查看 CDC 直播的权限
<a name="cdc-permissions-view"></a>

要查看或列出 CDC 流，主体需要系统密钥空间的读取权限。有关更多信息，请参阅 [`system_schema_mcs`](working-with-keyspaces.md#keyspace_system_schema_mcs)。

以下 IAM 策略就是一个例子。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":"cassandra:Select",
         "Resource":[
             "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
         ]
      }
   ]
}
```

要使用或 Amazon Keyspaces API 查看 AWS CLI 或列出 CDC 直播，委托人需要额外的操作`cassandra:ListStreams`权限和。`cassandra:GetStream`

以下 IAM 策略就是一个例子。

```
{
  "Version": "2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cassandra:Select",
        "cassandra:ListStreams",
        "cassandra:GetStream"
      ],
      "Resource": "*"
    }
  ]
}
```

## 读取 CDC 直播的权限
<a name="cdc-permissions-read"></a>

要读取 CDC 直播，主体需要以下权限。

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      }
   ]
}
```

## 使用 Kinesis 客户端库 (KCL) 处理 Amazon Keyspaces CDC 直播的权限
<a name="cdc-permissions-kcl"></a>

要使用 KCL 处理 Amazon Keyspaces CDC 流，IAM 委托人需要以下权限。
+ `Amazon Keyspaces`— 对指定的 Amazon Keyspaces CDC 流的只读访问权限。
+ `DynamoDB`— KCL 流处理所需的创建`shard lease`表、对表的读写访问权限以及对索引的读取访问权限。
+ `CloudWatch`— 有权发布来自 Amazon Keyspaces CDC 的指标数据，将使用 KCL 处理的数据流式传输到账户中 KCL 客户端应用程序的命名空间。 CloudWatch 有关监控的更多信息，请参阅使用[亚马逊监控 Kinesis 客户端库](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)。 CloudWatch

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:UpdateTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-WorkerMetricStats",
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-CoordinatorState"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:Query"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME/index/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "cloudwatch:PutMetricData"
         ],
         "Resource":"*"
      }
   ]
}
```

# 如何在 Amazon Keyspaces 中访问 CDC 直播终端节点
<a name="CDC_access-endpoints"></a>

在每个提供 keyspaces/tables 亚马逊密钥空间的地方，A AWS 区域 mazon Keyspaces 都为 CDC 流维护单独的[终端节点](programmatic.endpoints.md#global_endpoints)。要访问 CDC 流，请选择表的区域，并将终端节点名称`cassandra-streams`中的`cassandra`前缀替换为以下示例所示：

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/keyspaces/latest/devguide/CDC_access-endpoints.html)

下表包含的可用公共终端节点的完整列表 Amazon Keyspaces change data capture streams。 Amazon Keyspaces CDC streams 同时支持 IPv4 和 IPv6。例如`cassandra-streams.us-east-1.api.aws`，所有公共端点都是双堆栈端点，可以为 IPv4 和进行配置 IPv6。

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/keyspaces/latest/devguide/CDC_access-endpoints.html)

# 在 Amazon Keyspaces 中创建新表时启用 CDC 流
<a name="keyspaces-enable-cdc-new-table"></a>

要在创建表时启用 CDC 流，可以使用 CQL 中的`CREATE TABLE`语句或带有的`create-table` AWS CLI命令。

对于表中每更改一行，Amazon Keyspaces 都可以根据`cdc_specification`您选择`view_type`的内容捕获以下更改：
+ `NEW_AND_OLD_IMAGES`— 该行的两个版本，在更改之前和之后。这是默认值。
+ `NEW_IMAGE`— 更改后的行的版本。
+ `OLD_IMAGE`— 更改前行的版本。
+ `KEYS_ONLY`— 已更改行的分区和群集键。

有关如何为直播添加标签的信息，请参阅[创建表时向新直播添加标签](Tagging.Operations.new.table.stream.md)。

**注意**  
Amazon Keyspaces CDC 要求服务相关角色 (`AWSServiceRoleForAmazonKeyspacesCDC`) 代表您 CloudWatch 发布来自亚马逊密钥空间 CDC 流`"cloudwatch:namespace": "AWS/Cassandra"`的指标数据。将自动为您创建此角色。有关更多信息，请参阅 [在 Amazon Keyspaces CDC 直播中使用角色](using-service-linked-roles-CDC-streams.md)。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 创建表时启用 CDC 流**

1. 

   ```
   CREATE TABLE mykeyspace.mytable (a text, b text, PRIMARY KEY(a)) 
   WITH CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'NEW_IMAGE'}} AND CDC = TRUE;
   ```

1. 要确认直播设置，您可以使用以下语句。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   该语句的输出应类似于此。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |   mytable  | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741383893782', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T21:44:53.783', 'status': 'ENABLED', 'view_type': 'NEW_IMAGE'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}>
   ```

------
#### [ CLI ]

**使用创建表时启用 CDC 流 AWS CLI**

1. 要创建直播，您可以使用以下语法。

   ```
   aws keyspaces create-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --schema-definition 'allColumns=[{name=a,type=text},{name=b,type=text}],partitionKeys=[{name=a}]' \
   --cdc-specification status=ENABLED,viewType=NEW_IMAGE
   ```

1. 该命令的输出显示了标准`create-table`响应，看起来与本示例类似。

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------

# 为 Amazon Keyspaces 中的现有表启用 CDC 流
<a name="keyspaces-enable-cdc-alter-table"></a>

要为现有表启用 CDC 流，可以使用 CQL 中的`ALTER TABLE`语句、带的`update-table`命令 AWS CLI，也可以使用控制台。

对于表中每更改一行，Amazon Keyspaces 都可以根据`cdc_specification`您选择`view_type`的内容捕获以下更改：
+ `NEW_AND_OLD_IMAGES`— 该行的两个版本，在更改之前和之后。这是默认值。
+ `NEW_IMAGE`— 更改后的行的版本。
+ `OLD_IMAGE`— 更改前行的版本。
+ `KEYS_ONLY`— 已更改行的分区和群集键。

有关如何为直播添加标签的信息，请参阅[向直播添加新标签](Tagging.Operations.existing.stream.md)。

**注意**  
Amazon Keyspaces CDC 要求服务相关角色 (`AWSServiceRoleForAmazonKeyspacesCDC`) 代表您 CloudWatch 发布来自亚马逊密钥空间 CDC 流`"cloudwatch:namespace": "AWS/Cassandra"`的指标数据。将自动为您创建此角色。有关更多信息，请参阅 [在 Amazon Keyspaces CDC 直播中使用角色](using-service-linked-roles-CDC-streams.md)。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 启用直播（CDC 直播）**

您可以使用`ALTER TABLE`为现有表启用数据流。

1. 以下示例创建了一个流，该流仅捕获已更改行的分区和集群键的更改。

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = TRUE
   AND CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'KEYS_ONLY'}};
   ```

1. 要验证直播设置，您可以使用以下语句。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   该语句的输出类似于此。

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385897045', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T22:20:10.454', 'status': 'ENABLED', 'view_type': 'KEYS_ONLY'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**使用创建 CDC 直播 AWS CLI**

1. 要为现有表创建流，可以使用以下语法。

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=ENABLED,viewType=NEW_AND_OLD_IMAGES
   ```

1. 该命令的输出显示了标准`create-table`响应，看起来与本示例类似。

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------
#### [ Console ]

**使用 Amazon Keyspaces 控制台启用 CDC 直播**

1. [登录并在家中打开 Amazon Keyspaces 控制台。 AWS 管理控制台 https://console.aws.amazon.com/keyspaces/](https://console.aws.amazon.com/keyspaces/home)

1. 在导航窗格中，选择 “**表**”，然后从列表中选择一个表。

1. 选择 “**直播**” 选项卡。

1. 选择 **“编辑”** 以启用直播。

1. 选择 “**开启直播**”。

1. 选择直播的**视图类型**。可用选项如下：请注意，在创建直播后，您无法更改其视图类型。
   + **新图像和旧图像** — Amazon Keyspaces 会捕获该行的两个版本，包括更改之前和之后的版本。这是默认值。
   + **新图片** — Amazon Keyspaces 仅捕获更改后的行的版本。
   + **旧图片** — Amazon Keyspaces 仅捕获更改前行的版本。
   + **仅限主键** — Amazon Keyspaces 仅捕获已更改行的分区和群集键列。

1. 要完成操作，请选择**保存更改**。

------

# 在 Amazon Keyspaces 中禁用 CDC 直播
<a name="keyspaces-delete-cdc"></a>

要在密钥空间中禁用 CDC 流，可以使用 CQL 中的`ALTER TABLE`语句、带的`update-table` AWS CLI命令或控制台。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 禁用直播（CDC 数据流）**

1. 要禁用直播，您可以使用以下语句。

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = FALSE;
   ```

1. 要确认直播已禁用，您可以使用以下语句。

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   该语句的输出类似于此。

   ```
    keyspace_name | table_name | cdc   | custom_properties
   ---------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      mykeyspace  |   mytable  | False | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385668642', 'throughput_mode': 'PAY_PER_REQUEST'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**使用禁用直播（CDC 直播） AWS CLI**

1. 要禁用直播，您可以使用以下命令。

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=DISABLED
   ```

1. 该命令的输出类似于此示例。

   ```
   {
       "keyspaceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/",
       "streamName": "my_stream"
   }
   ```

------
#### [ Console ]

**使用 Amazon Keyspaces 控制台禁用直播（CDC 直播）**

1. [登录并在家中打开 Amazon Keyspaces 控制台。 AWS 管理控制台 https://console.aws.amazon.com/keyspaces/](https://console.aws.amazon.com/keyspaces/home)

1. 在导航窗格中，选择 “**表**”，然后从列表中选择一个表。

1. 选择 “**直播**” 选项卡。

1. 选择**编辑**。

1. 取消选择 “**开启直播**”。

1. 选择 “**保存更改**” 以禁用直播。

------

# 在 Amazon Keyspaces 中查看 CDC 直播
<a name="keyspaces-view-cdc"></a>

要查看或列出密钥空间中的所有流，可以使用 CQL `system_schema_mcs.streams` 中的语句查询系统密钥空间中的表，或者在、或控制台中 AWS CLI使用`get-stream`和`list-stream`命令。

有关所需的权限，请参阅[在 Amazon Keyspaces 中配置使用 CDC 流的权限](configure-cdc-permissions.md)。

------
#### [ Cassandra Query Language (CQL) ]

**使用 CQL 查看 CDC 直播**
+ 要监控表的 CDC 状态，您可以使用以下语句。

  ```
  SELECT custom_properties
  FROM system_schema_mcs.tables 
  WHERE keyspace_name='my_keyspace' and table_name='my_table';
  ```

  该命令的输出类似于此。

  ```
  ...
  custom_properties
  ----------------------------------------------------------------------------------
  {'cdc_specification':{'status': 'Enabled', 'view_type': 'NEW_IMAGE', 'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label''}}
  ...
  ```

------
#### [ CLI ]

**使用查看 CDC 直播 AWS CLI**

1. 此示例说明如何查看表的流信息。

   ```
   aws keyspaces get-table \
   --keyspace-name 'my_keyspace' \
   --table-name 'my_table'
   ```

   命令的输出如下所示。

   ```
   {
       "keyspaceName": "my_keyspace",
       "tableName": "my_table",
       ... Other fields ...,
       "latestStreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label",
       "cdcSpecification": {
           "status": "ENABLED",
           "viewType": "NEW_AND_OLD_IMAGES"    
       }
   }
   ```

1. 你可以在指定的列表中列出你账户中的所有直播 AWS 区域。下面是一个命令示例。

   ```
   aws keyspacesstreams list-streams --region us-east-1
   ```

   该命令的输出可能与此类似。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"Create a keyspace with the name catalog. Note
                                   that streams are not supported in multi-Region keyspaces.
               "TableName": "t2",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_2/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_3"
               "TableName": "t1",
           }
       ]
   }
   ```

1. 您还可以使用以下参数列出给定密钥空间的 CDC 流。

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --region us-east-1
   ```

   该命令的输出类似于此。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t1",
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

1. 您还可以使用以下参数列出给定表的 CDC 流。

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --table-name t2 --region us-east-1
   ```

   该命令的输出类似于此。

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1"
               "TableName": "t2",
           }
       ]
   }
   ```

------
#### [ Console ]

**在 Amazon Keyspaces 控制台中查看 CDC 直播**

1. [登录并在家中打开 Amazon Keyspaces 控制台。 AWS 管理控制台 https://console.aws.amazon.com/keyspaces/](https://console.aws.amazon.com/keyspaces/home)

1. 在导航窗格中，选择 “**表**”，然后从列表中选择一个表。

1. 选择 “**直播**” 选项卡以查看直播详情。

------

# 在 Amazon Keyspaces 中访问 CDC 直播中的记录
<a name="keyspaces-records-cdc"></a>

要访问直播中的记录，您可以使用 [Amazon Keyspaces St](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html) reams API。以下部分包含有关如何使用访问记录的示例 AWS CLI。

有关所需的权限，请参阅[在 Amazon Keyspaces 中配置使用 CDC 流的权限](configure-cdc-permissions.md)。

**使用访问直播中的记录 AWS CLI**

1. 您可以使用 Amazon Keyspaces Streams API 访问直播的更改记录。有关更多信息，请参阅 [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html) 参考。要检索流中的分片，你可以使用 `get-stream` API，如以下示例所示。

   ```
   aws keyspacesstreams get-stream \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL'
   ```

   下面是输出的一个示例。

   ```
   {
      "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2023-05-11T21:21:33.291",
      "StreamStatus": "ENABLED",
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "CreationRequestDateTime": "<CREATION_TIME>",
      "KeyspaceName": "mykeyspace",
      "TableName": "mytable",
      "StreamLabel": "2023-05-11T21:21:33.291",
       "Shards": [
           {
               "SequenceNumberRange": {
                   "EndingSequenceNumber": "<END_SEQUENCE_NUMBER>",
                   "StartingSequenceNumber": "<START_SEQUENCE_NUMBER>"
               },
               "ShardId": "<SHARD_ID>"
           },
       ]
   }
   ```

1. 要从流中检索记录，首先要获取一个迭代器，该迭代器为你提供了访问记录的起点。为此，您可以使用上一步中 API 返回的 CDC 流中的分片。要收集迭代器，可以使用 `get-shard-iterator` API。在此示例中，您使用类型的迭代器，`TRIM_HORIZON`该迭代器从分片的最后一个修剪点（或起点）进行检索。

   ```
   aws keyspacesstreams get-shard-iterator \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL' \
   --shard-id 'SHARD_ID' \
   --shard-iterator-type 'TRIM_HORIZON'
   ```

   该命令的输出如下例所示。

   ```
   {
       "ShardIterator": "<SHARD_ITERATOR>" 
   }
   ```

1. 要使用 `get-records` API 检索 CDC 记录，您可以使用上一步返回的迭代器。下面是一个命令示例。

   ```
   aws keyspacesstreams get-records \
   --shard-iterator 'SHARD_ITERATOR' \
   --limit 100
   ```

# 使用 Kinesis 客户端库 (KCL) 处理 Amazon Keyspaces 直播
<a name="cdc_how-to-use-kcl"></a>

本主题介绍如何使用 Kinesis 客户端库 (KCL) 来使用和处理来自 Amazon Keyspaces 变更数据捕获 (CDC) 流的数据。

使用 Kinesis 客户端库 (KCL) 可以提供许多好处，而不是直接使用 Amazon Keyspaces Streams Streams API，例如：
+ 内置分片血统跟踪和迭代器处理。
+ 在工作人员之间自动进行负载平衡。
+ 容错能力和从工作器故障中恢复。
+ 使用检查点来跟踪处理进度。
+ 适应直播容量的变化。
+ 简化了用于处理 CDC 记录的分布式计算。

以下部分概述了使用 Kinesis 客户端库 (KCL) 处理流的原因和方法，并提供了使用 KCL 处理 Amazon Keyspaces CDC 流的示例。

有关定价的信息，请参阅 [Amazon Keyspaces（Apache Cassandra 兼容）定价](https://aws.amazon.com/keyspaces/pricing)。

## 什么是 Kinesis Client Library？
<a name="cdc-kcl-what-is"></a>

Kinesis 客户端库 (KCL) 是一个独立的 Java 软件库，旨在简化使用和处理来自流的数据的过程。KCL 可以处理许多与分布式计算相关的复杂任务，让您在处理流数据时专注于实现业务逻辑。KCL 管理多个 worker 之间的负载平衡、响应工作器故障、检查已处理记录以及响应流中分片数量的变化等活动。

要处理 Amazon Keyspaces CDC 流，您可以使用 KCL 中的设计模式来处理流分片和流记录。KCL 提供低级 Kinesis Data Streams API 之上的有用抽象来简化编码。有关 KCL 的更多信息，请参阅 *Amazon Kinesis Data Streams* [开发者指南中的使用 KCL 开发消费者](https://docs.aws.amazon.com/kinesis/latest/dev/develop-kcl-consumers.html)。

 要使用 KCL 编写应用程序，请使用 Amazon Keyspaces Streams Kinesis 适配器。Kinesis 适配器实现了 Kinesis Data Streams 接口，因此您可以使用 KCL 来使用和处理来自亚马逊密钥空间流的记录。有关如何设置和安装 Amazon Keyspaces Streams Kinesis 适配器的说明，请访问存储库。[GitHub](https://github.com/aws/keyspaces-streams-kinesis-adapter)

下图显示了这些库是如何相互交互的。

![\[处理亚马逊密钥空间 CDC 流记录 APIs 时，客户端应用程序与 Kinesis Data Streams、KCL、Amazon Keyspaces Streams Kinesis 适配器和亚马逊密钥空间之间的交互。\]](http://docs.aws.amazon.com/zh_cn/keyspaces/latest/devguide/images/keyspaces-streams-kinesis-adapter.png)


KCL 经常更新，以纳入新版底层库、安全改进和错误修复。建议使用最新版本的 KCL，以避免出现已知问题，并从所有最新的改进中受益。要查找最新的 KCL 版本，请参阅 [KCL 存储库 GitHub ](https://github.com/awslabs/amazon-kinesis-client)。

## KCL 概念
<a name="cdc-kcl-concepts"></a>

在使用 KCL 实现消费者应用程序之前，您应该了解以下概念：

**KCL 消费者应用程序**  
KCL 消费者应用程序是一个处理来自 Amazon Keyspaces CDC 流的数据的程序。KCL 充当您的消费者应用程序代码和 Amazon Keyspaces CDC 流之间的中介。

**工人**  
工作程序是 KCL 使用者应用程序的执行单元，用于处理来自 Amazon Keyspaces CDC 流的数据。您的应用程序可以运行分布在多个实例上的多个工作程序。

**记录处理器**  
记录处理器是应用程序中处理来自 Amazon Keyspaces CDC 流中分片的数据的逻辑。记录处理器由工作程序为其管理的每个分片实例化。

**租赁**  
租赁代表分片的处理责任。工作人员使用租约来协调哪个工作人员正在处理哪个分片。KCL 将租赁数据存储在亚马逊 DynamoDB 的表中。

**检查点**  
检查点是记录处理器在分片中成功处理记录的位置的记录。Checkpointing 使您的应用程序能够在工作程序失败时从中断的位置恢复处理。

Amazon Keyspaces Kinesis 适配器到位后，您可以开始基于 KCL 接口进行开发，API 调用无缝定向到亚马逊密钥空间流终端节点。有关可用端点的列表，请参阅[如何在 Amazon Keyspaces 中访问 CDC 直播终端节点](CDC_access-endpoints.md)。

应用程序启动后，调用 KCL 来实例化工作进程。您必须向工作人员提供应用程序的配置信息，例如流描述符和AWS凭证，以及您提供的记录处理器类的名称。在记录处理器中运行代码时，工作进程执行以下任务：
+ 连接到流
+ 枚举流中的分片
+ 协调与其他工作程序的分片关联（如果有）
+ 为其管理的每个分片实例化记录处理器
+ 从流中提取记录
+ 将记录推送到对应的记录处理器
+ 对已处理记录进行检查点操作
+ 在工作程序实例计数更改时均衡分片与工作程序的关联
+ 在分片被拆分时平衡分片与工作程序的关联

# 为 Amazon Keyspaces CDC 直播实施 KCL 消费者应用程序
<a name="cdc-kcl-implementation"></a>

本主题提供了实施 KCL 使用者应用程序来处理 Amazon Keyspaces CDC 流的 step-by-step指南。

1. 先决条件：在开始之前，请确保您已具备以下条件：
   + 带有 CDC 直播的 Amazon Keyspaces 表
   + IAM 委托人必须拥有 IAM 权限，才能访问 Amazon Keyspaces CDC 流、创建和访问用于 KCL 流处理的 DynamoDB 表以及向其发布指标的权限。 CloudWatch有关更多信息和策略示例，请参阅[使用 Kinesis 客户端库 (KCL) 处理 Amazon Keyspaces CDC 直播的权限](configure-cdc-permissions.md#cdc-permissions-kcl)。
   + 确保在本地配置中设置了有效的AWS凭证。有关更多信息，请参阅 [存储用于通过编程方式进行访问的访问密钥](aws.credentials.manage.md)。
   + Java Development Kit（JDK）8 或更高版本
   + 要求列在 Github 上的[自述文件](https://github.com/aws/keyspaces-streams-kinesis-adapter)中。

1. <a name="cdc-kcl-add-dependencies"></a>在此步骤中，您将 KCL 依赖项添加到您的项目中。对于 Maven，请将以下内容添加到你的 pom.xml 中：

   ```
   <dependencies>
           <dependency>
               <groupId>software.amazon.kinesis</groupId>
               <artifactId>amazon-kinesis-client</artifactId>
               <version>3.1.0</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.keyspaces</groupId>
               <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
               <version>1.0.0</version>
           </dependency>
       </dependencies>
   ```
**注意**  
请务必在 KCL 存储库中查看最新版本的 [KCL GitHub ](https://github.com/awslabs/amazon-kinesis-client)。

1. <a name="cdc-kcl-factory"></a>创建一个生成记录处理器实例的工厂类：

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>创建唱片工厂，如以下示例所示。

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>在此步骤中，您将创建要配置的基类 KCLv3 和 Amazon Keyspaces 适配器。

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>在此步骤中，您将为应用程序实现记录处理器类，以开始处理更改事件。

   ```
    import software.amazon.kinesis.coordinator.Scheduler;
   
   public class KCLTest {
   
       private static final int APP_RUNTIME_SECONDS = 1800;
       private static final int SLEEP_INTERNAL_MS = 60*1000;
   
       public static void main(String[] args) {
           KCLTestBase kclTestBase;
   
           kclTestBase = new KCLTestBase();
           kclTestBase.baseSetUp();
   
           // Create and start scheduler
           String leaseTableName = generateUniqueApplicationName();
   
           // Update below to your Stream ARN
           String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
           Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
           kclTestBase.startScheduler(scheduler);
   
           // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this
           // example we will shut it down after APP_RUNTIME_SECONDS
           long startTime = System.currentTimeMillis();
           long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
           while (System.currentTimeMillis() < endTime) {
               try {
                   // Print and sleep every minute
                   Thread.sleep(SLEEP_INTERNAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## 最佳实践
<a name="cdc-kcl-best-practices"></a>

在 Amazon Keyspaces CDC 直播中使用 KCL 时，请遵循以下最佳实践：

**错误处理**  
在记录处理器中实现强大的错误处理，以优雅地处理异常。考虑为暂时失败实现重试逻辑。

**检查点频率**  
平衡检查点频率，最大限度地减少重复处理，同时确保合理的进度跟踪。过于频繁的检查点操作会影响性能，而如果工作人员出现故障，检查点操作频率过低可能会导致更多的重新处理。

**工作人员扩展**  
根据您的 CDC 数据流中的分片数量扩展工作人员的数量。一个好的起点是每个分片配置一个工作程序，但您可能需要根据自己的处理要求进行调整。

**监控**  
使用 KCL 提供的 CloudWatch 指标来监控您的消费者应用程序的运行状况和性能。关键指标包括处理延迟、检查点时间和租赁计数。

**测试**  
彻底测试您的消费者应用程序，包括工作程序故障、流重新分片和不同的负载条件等场景。

## 在非 Java 语言中使用 KCL
<a name="cdc-kcl-non-java"></a>

虽然 KCL 主要是一个 Java 库，但您可以通过将其与其他编程语言一起使用。 MultiLangDaemon MultiLangDaemon 是一个基于 Java 的守护程序，用于管理您的非 Java 记录处理器与 KCL 之间的交互。

KCL 为以下语言提供支持：
+ Python
+ Ruby
+ Node.js
+ .NET

有关在非 Java 语言中使用 KCL 的更多信息，请参阅 [K MultiLangDaemon ](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang) CL 文档。

## 问题排查
<a name="cdc-kcl-troubleshooting"></a>

本节提供了在将 KCL 与 Amazon Keyspaces CDC 流配合使用时可能遇到的常见问题的解决方案。

**处理速度慢**  
如果您的消费者应用程序处理记录的速度很慢，请考虑：  
+ 增加工作器实例的数量
+ 优化您的记录处理逻辑
+ 检查下游系统中是否存在瓶颈

**重复处理**  
如果您看到重复处理记录，请检查检查点逻辑。确保在成功处理记录后进行检查点检查。

**工作人员失败**  
如果员工经常失败，请检查：  
+ 资源限制（CPU、内存）
+ 网络连接问题
+ 权限问题

**租赁表问题**  
如果您在使用 KCL 租赁表时遇到问题：  
+ 检查您的应用程序是否具有访问 Amazon Keyspaces 表的相应权限
+ 验证表是否有足够的预配置吞吐量