

# Working with change data capture (CDC) streams in Amazon Keyspaces
<a name="cdc"></a>

Amazon Keyspaces change data capture (CDC) records row-level change events from an Amazon Keyspaces table in near-real time. 

Amazon Keyspaces CDC enables event-driven use cases such as industrial IoT and fraud detection as well as data processing use cases like full-text search and data archival. The change events that Amazon Keyspaces CDC captures in streams can be consumed by downstream applications that perform business-critical functions such as data analytics, text search, ML training/inference, and continuous data backups for archival. For example, you can transfer stream data to AWS analytics and storage services like Amazon OpenSearch Service, Amazon Redshift, and Amazon S3 for further processing.

Amazon Keyspaces CDC offers time-ordered and de-duplicated change records for tables, with automatic scaling of data throughput and retention time of up to 24 hours. 

Amazon Keyspaces CDC streams are completely serverless, and you don't need to manage the data infrastructure for capturing change events. In addition, Amazon Keyspaces CDC doesn't consume any table capacity for either compute or storage. For more information, see [How change data capture (CDC) streams work in Amazon Keyspaces](cdc_how-it-works.md).

You can use the Amazon Keyspaces Streams API to build applications that consume Amazon Keyspaces CDC streams and take action based on the contents. For available endpoints, see [How to access CDC stream endpoints in Amazon Keyspaces](CDC_access-endpoints.md).

For a complete listing of all operations available for Amazon Keyspaces in the Streams API, see [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html).

**Topics**
+ [How change data capture (CDC) streams work in Amazon Keyspaces](cdc_how-it-works.md)
+ [How to use change data capture (CDC) streams in Amazon Keyspaces](cdc_how-to-use.md)

# How change data capture (CDC) streams work in Amazon Keyspaces
<a name="cdc_how-it-works"></a>

This section provides an overview of how change data capture (CDC) streams work in Amazon Keyspaces. 

Amazon Keyspaces change data capture (CDC) records an ordered sequence of row-level modifications in Amazon Keyspaces tables and stores this information in a log called a *stream* for up to 24 hours. Every row-level modification generates a new CDC record that holds the primary key column information as well as the “before” and “after” states of the row, including all the columns. Applications can access the stream and view the mutations in near-real time.

When you enable CDC on your table, Amazon Keyspaces creates a new CDC stream and starts to capture information about every modification in the table. The CDC stream has an Amazon Resource Name (ARN) with the following format: 

```
arn:${Partition}:cassandra:${Region}:${Account}:/keyspace/${keyspaceName}/table/${tableName}/stream/${streamLabel}
```
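As an illustration, the pieces of this ARN can be assembled with a small helper. This is a hypothetical convenience function, not part of any AWS SDK:

```python
# Hypothetical helper that assembles a CDC stream ARN from its components,
# mirroring the ARN format shown above.
def build_stream_arn(partition, region, account, keyspace, table, stream_label):
    return (
        f"arn:{partition}:cassandra:{region}:{account}"
        f":/keyspace/{keyspace}/table/{table}/stream/{stream_label}"
    )

print(build_stream_arn("aws", "us-east-1", "111122223333",
                       "mykeyspace", "mytable", "2025-03-07T21:44:53.783"))
```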

You can select the type of information or the *view type* that the CDC stream collects for each record when you first enable the CDC stream. You can't change the view type of the stream afterward. Amazon Keyspaces supports the following view types:
+ `NEW_AND_OLD_IMAGES` – Captures the versions of the row before as well as after the mutation. This is the default.
+ `NEW_IMAGE` – Captures the version of the row after the mutation.
+ `OLD_IMAGE` – Captures the version of the row before the mutation.
+ `KEYS_ONLY` – Captures the partition and clustering keys of the row that was mutated.
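As a sketch of what this implies for a consumer, the view type determines which row images a record can carry. The `oldImage`/`newImage` field names below are assumptions for illustration, not the documented stream record schema:

```python
# Sketch only: which row images a consumer can expect in a CDC record for
# each view type. The image field names are illustrative assumptions.
IMAGES_BY_VIEW_TYPE = {
    "NEW_AND_OLD_IMAGES": ("oldImage", "newImage"),
    "NEW_IMAGE": ("newImage",),
    "OLD_IMAGE": ("oldImage",),
    "KEYS_ONLY": (),  # only partition and clustering key values are present
}

def expected_images(view_type):
    """Return the image fields a record carries under the given view type."""
    return IMAGES_BY_VIEW_TYPE[view_type]
```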

Every CDC stream consists of records. Each record represents a single row modification in an Amazon Keyspaces table. Records are logically organized into groups known as *shards*. These groups are logically organized by ranges of the primary key (combinations of partition key and clustering key ranges) and are an internal construct of Amazon Keyspaces. Each shard acts as a container for multiple records and contains the information required for accessing and iterating through these records.

![\[An Amazon Keyspaces CDC stream consists of shards that represent a CDC record of a collection of row mutations.\]](http://docs.aws.amazon.com/keyspaces/latest/devguide/images/keyspaces_cdc.png)


Each CDC record is assigned a sequence number, reflecting the order in which the record was published within the shard. The sequence number is guaranteed to be increasing and unique within each shard.
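Because sequence numbers are increasing and unique within a shard, a consumer can recover publication order within a shard by sorting on that field. The record shape below is illustrative only, not the actual stream record schema:

```python
# Illustration only: sort a shard's records by sequence number to recover
# publication order. The record dictionaries are hypothetical.
def in_publication_order(records):
    return sorted(records, key=lambda r: int(r["sequenceNumber"]))

records = [
    {"sequenceNumber": "300", "op": "UPDATE"},
    {"sequenceNumber": "100", "op": "INSERT"},
    {"sequenceNumber": "200", "op": "UPDATE"},
]
ordered = in_publication_order(records)
print([r["op"] for r in ordered])  # ['INSERT', 'UPDATE', 'UPDATE']
```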

Amazon Keyspaces creates and deletes shards automatically. Based on traffic loads, Amazon Keyspaces can also split or merge shards over time. For example, Amazon Keyspaces can split one shard into multiple new shards, or merge multiple shards into a single new shard. The Amazon Keyspaces APIs publish the shard and CDC stream information so that consuming applications can process records in the correct order by accessing the entire lineage graph of a shard. 
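The lineage rule can be sketched as a topological ordering: a shard produced by a split or merge should only be consumed after all of its parent shards are finished. The `shardId`/`parentIds` field names below are assumptions for illustration:

```python
from collections import deque

# Sketch: order shards so parents always come before children, which lets a
# consumer process records in the right order across splits and merges.
def lineage_order(shards):
    """Topologically order shard IDs so parents come before children."""
    pending = {s["shardId"]: set(s.get("parentIds", [])) for s in shards}
    ready = deque(sid for sid, parents in pending.items() if not parents)
    order = []
    while ready:
        sid = ready.popleft()
        order.append(sid)
        for other, parents in pending.items():
            if sid in parents:
                parents.remove(sid)
                if not parents and other not in order and other not in ready:
                    ready.append(other)
    return order

# Shard D was created by merging B and C, which were split from A.
shards = [
    {"shardId": "D", "parentIds": ["B", "C"]},
    {"shardId": "A"},
    {"shardId": "B", "parentIds": ["A"]},
    {"shardId": "C", "parentIds": ["A"]},
]
print(lineage_order(shards))
```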

Amazon Keyspaces CDC is based on the following principles that you can rely on when building your application:
+ Each row-level mutation record appears exactly once in the CDC stream.
+ When you consume shards in order of lineage, each row-level mutation record appears in the same sequence as the actual mutation order on the primary key.

**Topics**
+ [Data retention](#CDC_how-it-works-data-retention)
+ [TTL data expiration](#CDC_how-it-works-ttl)
+ [Batch operations](#CDC_how-it-works-batch-operations)
+ [Static columns](#CDC_how-it-works-static)
+ [Encryption at rest](#CDC_how-it-works-encryption)
+ [Multi-Region replication](#CDC_how-it-works-mrr)
+ [Integration with AWS services](#howitworks_integration)

## How data retention works for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-data-retention"></a>

Amazon Keyspaces retains the records in the CDC stream for a period of 24 hours. You can't change the retention period. If you disable CDC on a table, the data in the stream continues to be readable for 24 hours. After this time, the data expires and the records are automatically deleted. 

## How Time to Live (TTL) data expiration works with CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-ttl"></a>

Amazon Keyspaces shows the expiration time at the column/cell level as well as the row level in a metadata field called `expirationTime` in the CDC change records. When Amazon Keyspaces TTL detects expiration of a cell, CDC creates a new change record that shows TTL as the origin of the change. For more information about TTL, see [Expire data with Time to Live (TTL) for Amazon Keyspaces (for Apache Cassandra)](TTL.md).

## How batch operations work for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-batch-operations"></a>

Batch operations are internally divided into individual row-level modifications. Amazon Keyspaces retains all records within CDC streams at the row level, even if the modification occurred in a batch operation. Amazon Keyspaces maintains the order of records within the CDC stream in the same sequence as the mutation order that occurred at the row level, per primary key.
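The rule above can be illustrated with a minimal sketch: a multi-statement batch surfaces in the stream as one row-level record per modified row, not as a single batch event. The record shape here is hypothetical:

```python
# Illustration only: each (primary key, operation) pair applied in a batch
# becomes its own row-level CDC record.
def records_from_batch(batch):
    """batch: list of (primary_key, operation) pairs applied together."""
    return [{"primaryKey": pk, "operation": op} for pk, op in batch]

recs = records_from_batch([(("pk1", "ck1"), "UPDATE"), (("pk1", "ck2"), "INSERT")])
print(len(recs))  # 2
```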

## How static columns work in CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-static"></a>

Static column values are shared among all rows in a partition in Cassandra. Due to this behavior, Amazon Keyspaces captures any updates to a static column as a separate record in the CDC stream. The following examples summarize the behavior of static column mutations: 
+ When only the static column is updated, the CDC stream contains a row-modification for the static column as the only column in the row.
+ When a row is updated without any change to the static column, the CDC stream contains a row-modification that contains all columns except the static column.
+ When a row is updated along with the static column, the CDC stream contains two separate row-modifications, one for the static column and the other for the rest of the row. 
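The static-column rules above can be summarized in a short sketch: a single write that touches both static and regular columns yields two separate records. The column and record names are illustrative, not the actual stream schema:

```python
# Sketch of the static-column behavior: static and regular column changes
# are surfaced as separate row-modification records.
STATIC_COLUMNS = {"s"}  # assume column "s" is static in this example

def records_for_write(changed_columns):
    static = sorted(c for c in changed_columns if c in STATIC_COLUMNS)
    regular = sorted(c for c in changed_columns if c not in STATIC_COLUMNS)
    records = []
    if static:
        records.append({"columns": static})   # static-column modification
    if regular:
        records.append({"columns": regular})  # regular row modification
    return records

print(records_for_write({"a", "s"}))  # two records: one static, one regular
```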

## How encryption at rest works for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-encryption"></a>

To encrypt the data at rest in the CDC ordered log, Amazon Keyspaces uses the same encryption key that is already used for the table. For more information about encryption at rest, see [Encryption at rest in Amazon Keyspaces](EncryptionAtRest.md).

## How multi-Region replication works for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-mrr"></a>

You can enable and disable CDC streams for individual replicas of a multi-Region table by using either the `update-table` API or the `ALTER TABLE` CQL command. Due to asynchronous replication and conflict resolution, CDC streams for multi-Region tables are not consistent across AWS Regions. Therefore, the records that Amazon Keyspaces captures in the stream might appear in a different order in different Regions.

For more information about multi-Region replication, see [Multi-Region replication for Amazon Keyspaces (for Apache Cassandra)](multiRegion-replication.md).

## CDC streams and integration with AWS services
<a name="howitworks_integration"></a>

### How to work with VPC endpoints for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-vpc"></a>

You can use VPC endpoints to access Amazon Keyspaces CDC streams. For information about how to create and access VPC endpoints for streams, see [Using Amazon Keyspaces CDC streams with interface VPC endpoints](vpc-endpoints-streams.md).

### How monitoring with CloudWatch works for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-monitoring"></a>

You can use Amazon CloudWatch to monitor API calls made to the Amazon Keyspaces CDC endpoint. For more information about the available metrics, see [Metrics for Amazon Keyspaces change data capture (CDC)](metrics-dimensions.md#keyspaces-cdc-metrics).

### How logging with CloudTrail works for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-logging"></a>

Amazon Keyspaces CDC is integrated with AWS CloudTrail, a service that provides a record of actions taken by a user, role, or an AWS service in Amazon Keyspaces. CloudTrail captures Data Definition Language (DDL) API calls and Data Manipulation Language (DML) API calls for Amazon Keyspaces as events. The calls that are captured include calls from the Amazon Keyspaces console and programmatic calls to the Amazon Keyspaces API operations.

For more information about the CDC events captured by CloudTrail, see [Logging Amazon Keyspaces API calls with AWS CloudTrail](logging-using-cloudtrail.md).

### How tagging works for CDC streams in Amazon Keyspaces
<a name="CDC_how-it-works-tagging"></a>

Amazon Keyspaces CDC streams are a taggable resource. You can tag a stream when you create a table programmatically using CQL, the AWS SDK, or the AWS CLI. You can also tag existing streams, delete tags, or view tags of a stream. For more information, see [Tag keyspaces, tables, and streams in Amazon Keyspaces](Tagging.Operations.md).

# How to use change data capture (CDC) streams in Amazon Keyspaces
<a name="cdc_how-to-use"></a>

**Topics**
+ [Configure permissions](configure-cdc-permissions.md)
+ [Access CDC stream endpoints](CDC_access-endpoints.md)
+ [Enable a CDC stream for a new table](keyspaces-enable-cdc-new-table.md)
+ [Enable a CDC stream for an existing table](keyspaces-enable-cdc-alter-table.md)
+ [Disable a CDC stream](keyspaces-delete-cdc.md)
+ [View CDC streams](keyspaces-view-cdc.md)
+ [Access CDC streams](keyspaces-records-cdc.md)
+ [Use KCL for processing streams](cdc_how-to-use-kcl.md)

# Configure permissions to work with CDC streams in Amazon Keyspaces
<a name="configure-cdc-permissions"></a>

To enable CDC streams, the principal (for example, an IAM user or role) needs the following permissions.

For more information about AWS Identity and Access Management, see [AWS Identity and Access Management for Amazon Keyspaces](security-iam.md).

## Permissions to enable a CDC stream for a table
<a name="cdc-permissions-enable"></a>

To enable a CDC stream for an Amazon Keyspaces table, the principal needs permissions to create or alter a table, as well as permission to create the service-linked role [AWSServiceRoleForAmazonKeyspacesCDC](using-service-linked-roles-CDC-streams.md#service-linked-role-permissions-CDC-streams). Amazon Keyspaces uses the service-linked role to publish CloudWatch metrics to your account on your behalf.

The following IAM policy is an example of this.

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Effect":"Allow",
            "Action":[
                "cassandra:Create",
                "cassandra:CreateMultiRegionResource",
                "cassandra:Alter",
                "cassandra:AlterMultiRegionResource"
            ],
            "Resource":[
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/*",
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
        },
        {
            "Sid": "KeyspacesCDCServiceLinkedRole",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/cassandra-streams.amazonaws.com/AWSServiceRoleForAmazonKeyspacesCDC",
            "Condition": {
              "StringLike": {
                "iam:AWSServiceName": "cassandra-streams.amazonaws.com"
              }
            }
        }
    ]
}
```

To disable a stream, only `ALTER TABLE` permissions are required.

## Permissions to view a CDC stream
<a name="cdc-permissions-view"></a>

To view or list CDC streams, the principal needs read permissions for the system keyspace. For more information, see [`system_schema_mcs`](working-with-keyspaces.md#keyspace_system_schema_mcs).

The following IAM policy is an example of this.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":"cassandra:Select",
         "Resource":[
             "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
         ]
      }
   ]
}
```

To view or list CDC streams with the AWS CLI or the Amazon Keyspaces API, the principal needs additional permissions for the actions `cassandra:ListStreams` and `cassandra:GetStream`.

The following IAM policy is an example of this.

```
{
  "Version": "2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cassandra:Select",
        "cassandra:ListStreams",
        "cassandra:GetStream"
      ],
      "Resource": "*"
    }
  ]
}
```

## Permissions to read a CDC stream
<a name="cdc-permissions-read"></a>

To read CDC streams, the principal needs the following permissions.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      }
   ]
}
```

## Permissions to process Amazon Keyspaces CDC streams with the Kinesis Client Library (KCL)
<a name="cdc-permissions-kcl"></a>

To process Amazon Keyspaces CDC streams with KCL, the IAM principal needs the following permissions. 
+ `Amazon Keyspaces` – Read-only access to a specified Amazon Keyspaces CDC stream.
+ `DynamoDB` – Permissions to create `shard lease` tables, read and write access to those tables, and read access to the index, as required for KCL stream processing.
+ `CloudWatch` – Permissions to publish metric data from Amazon Keyspaces CDC stream processing with KCL into the namespace of your KCL client application in CloudWatch. For more information about monitoring, see [Monitor the Kinesis Client Library with Amazon CloudWatch](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html).

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:UpdateTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-WorkerMetricStats",
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-CoordinatorState"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:Query"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME/index/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "cloudwatch:PutMetricData"
         ],
         "Resource":"*"
      }
   ]
}
```

# How to access CDC stream endpoints in Amazon Keyspaces
<a name="CDC_access-endpoints"></a>

Amazon Keyspaces maintains separate [endpoints](programmatic.endpoints.md#global_endpoints) for keyspaces/tables and for CDC streams in each AWS Region where Amazon Keyspaces is available. To access a CDC stream, select the Region of the table and replace the `cassandra` prefix with `cassandra-streams` in the endpoint name. For example, the table endpoint `cassandra.us-east-1.amazonaws.com` becomes the stream endpoint `cassandra-streams.us-east-1.amazonaws.com`.

The following table contains a complete list of available public endpoints for Amazon Keyspaces change data capture streams. Amazon Keyspaces CDC streams support both IPv4 and IPv6. All public endpoints, for example `cassandra-streams.us-east-1.api.aws`, are dual-stack endpoints that can be configured for IPv4 and IPv6. 

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/keyspaces/latest/devguide/CDC_access-endpoints.html)
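The prefix swap described in this section can be expressed as a one-line helper. This is a sketch assuming the `cassandra.<region>.<domain>` endpoint shape shown in the examples above:

```python
# Sketch: derive the CDC stream endpoint from a table endpoint by swapping
# the service prefix, as described in this section.
def stream_endpoint(table_endpoint):
    return table_endpoint.replace("cassandra.", "cassandra-streams.", 1)

print(stream_endpoint("cassandra.us-east-1.amazonaws.com"))
```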

# Enable a CDC stream when creating a new table in Amazon Keyspaces
<a name="keyspaces-enable-cdc-new-table"></a>

To enable a CDC stream when you create a table, you can use the `CREATE TABLE` statement in CQL or the `create-table` command with the AWS CLI. 

For each changed row in the table, Amazon Keyspaces can capture the following changes based on the `view_type` of the `cdc_specification` you select:
+ `NEW_AND_OLD_IMAGES` – both versions of the row, before and after the change. This is the default.
+ `NEW_IMAGE` – the version of the row after the change.
+ `OLD_IMAGE` – the version of the row before the change.
+ `KEYS_ONLY` – the partition and clustering keys of the row that was changed.

For information about how to tag a stream, see [Add tags to a new stream when creating a table](Tagging.Operations.new.table.stream.md).

**Note**  
Amazon Keyspaces CDC requires the presence of a service-linked role (`AWSServiceRoleForAmazonKeyspacesCDC`) that publishes metric data from Amazon Keyspaces CDC streams into the `AWS/Cassandra` CloudWatch namespace in your account on your behalf. This role is created automatically for you. For more information, see [Using roles for Amazon Keyspaces CDC streams](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Enable a CDC stream when you create a table with CQL**

1. 

   ```
   CREATE TABLE mykeyspace.mytable (a text, b text, PRIMARY KEY(a)) 
   WITH CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'NEW_IMAGE'}} AND CDC = TRUE;
   ```

1. To confirm the stream settings, you can use the following statement.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   The output of that statement should look similar to this.

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    mykeyspace |   mytable  | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741383893782', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T21:44:53.783', 'status': 'ENABLED', 'view_type': 'NEW_IMAGE'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Enable a CDC stream when you create a table with the AWS CLI**

1. To create a stream you can use the following syntax. 

   ```
   aws keyspaces create-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --schema-definition 'allColumns=[{name=a,type=text},{name=b,type=text}],partitionKeys=[{name=a}]' \
   --cdc-specification status=ENABLED,viewType=NEW_IMAGE
   ```

1. The output of that command shows the standard `create-table` response and looks similar to this example. 

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------

# Enable a CDC stream for an existing table in Amazon Keyspaces
<a name="keyspaces-enable-cdc-alter-table"></a>

To enable a CDC stream for an existing table, you can use the `ALTER TABLE` statement in CQL, the `update-table` command with the AWS CLI, or you can use the console.

For each changed row in the table, Amazon Keyspaces can capture the following changes based on the `view_type` of the `cdc_specification` you select:
+ `NEW_AND_OLD_IMAGES` – both versions of the row, before and after the change. This is the default.
+ `NEW_IMAGE` – the version of the row after the change.
+ `OLD_IMAGE` – the version of the row before the change.
+ `KEYS_ONLY` – the partition and clustering keys of the row that was changed.

For information about how to tag a stream, see [Add new tags to a stream](Tagging.Operations.existing.stream.md).

**Note**  
Amazon Keyspaces CDC requires the presence of a service-linked role (`AWSServiceRoleForAmazonKeyspacesCDC`) that publishes metric data from Amazon Keyspaces CDC streams into the `AWS/Cassandra` CloudWatch namespace in your account on your behalf. This role is created automatically for you. For more information, see [Using roles for Amazon Keyspaces CDC streams](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Enable a stream (CDC stream) with CQL**

You can use `ALTER TABLE` to enable a stream for an existing table.

1. The following example creates a stream that only captures changes to partition and clustering keys of a changed row.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = TRUE
   AND CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'KEYS_ONLY'}};
   ```

1. To verify the stream settings, you can use the following statement.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   The output of the statement looks similar to this.

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385897045', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T22:20:10.454', 'status': 'ENABLED', 'view_type': 'KEYS_ONLY'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Create a CDC stream with the AWS CLI**

1. To create a stream for an existing table you can use the following syntax.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=ENABLED,viewType=NEW_AND_OLD_IMAGES
   ```

1. The output of that command shows the standard `update-table` response and looks similar to this example.

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------
#### [ Console ]

**Enable a CDC stream with the Amazon Keyspaces console**

1. Sign in to the AWS Management Console, and open the Amazon Keyspaces console at [https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home).

1. In the navigation pane, choose **Tables**, and then choose a table from the list.

1. Choose the **Streams** tab.

1. Choose **Edit** to enable a stream.

1. Select **Turn on streams**.

1. Choose the **View type** of the stream. The following options are available. Note that you can't change the view type of a stream after it's been created.
   + **New and old images** – Amazon Keyspaces captures both versions of the row, before and after the change. This is the default.
   + **New image** – Amazon Keyspaces captures only the version of the row after the change.
   + **Old image** – Amazon Keyspaces captures only the version of the row before the change.
   + **Primary key only** – Amazon Keyspaces captures only the partition and clustering key columns of the row that was changed.

1. To finish, choose **Save changes**.

------

# Disable a CDC stream in Amazon Keyspaces
<a name="keyspaces-delete-cdc"></a>

To disable a CDC stream for a table, you can use the `ALTER TABLE` statement in CQL, the `update-table` command with the AWS CLI, or the console.

------
#### [ Cassandra Query Language (CQL) ]

**Disable a stream (CDC stream) with CQL**

1. To disable a stream, you can use the following statement.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = FALSE;
   ```

1. To confirm that the stream is disabled, you can use the following statement.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   The output of that statement looks similar to this.

   ```
    keyspace_name | table_name | cdc   | custom_properties
   ---------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      mykeyspace  |   mytable  | False | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385668642', 'throughput_mode': 'PAY_PER_REQUEST'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Disable a stream (CDC stream) with the AWS CLI**

1. To disable a stream, you can use the following command.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=DISABLED
   ```

1. The output of the command looks similar to this example.

   ```
   {
       "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable"
   }
   ```

------
#### [ Console ]

**Disable a stream (CDC stream) with the Amazon Keyspaces console**

1. Sign in to the AWS Management Console, and open the Amazon Keyspaces console at [https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home).

1. In the navigation pane, choose **Tables**, and then choose a table from the list.

1. Choose the **Streams** tab.

1. Choose **Edit**.

1. Unselect **Turn on streams**. 

1. Choose **Save changes** to disable the stream.

------

# View CDC streams in Amazon Keyspaces
<a name="keyspaces-view-cdc"></a>

To view or list all streams in a keyspace, you can query the `system_schema_mcs.tables` table in the system keyspace using a CQL statement, use the `get-stream` and `list-streams` commands with the AWS CLI, or use the console.

For the required permissions, see [Configure permissions to work with CDC streams in Amazon Keyspaces](configure-cdc-permissions.md).

------
#### [ Cassandra Query Language (CQL) ]

**View CDC streams with CQL**
+ To monitor the CDC status of your table, you can use the following statement.

  ```
  SELECT custom_properties
  FROM system_schema_mcs.tables 
  WHERE keyspace_name='my_keyspace' and table_name='my_table';
  ```

  The output of the command looks similar to this.

  ```
  ...
  custom_properties
  ----------------------------------------------------------------------------------
  {'cdc_specification':{'status': 'Enabled', 'view_type': 'NEW_IMAGE', 'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label'}}
  ...
  ```

------
#### [ CLI ]

**View CDC streams with the AWS CLI**

1. This example shows how to see the stream information for a table.

   ```
   aws keyspaces get-table \
   --keyspace-name 'my_keyspace' \
   --table-name 'my_table'
   ```

   The output of the command looks like this.

   ```
   {
       "keyspaceName": "my_keyspace",
       "tableName": "my_table",
       ... Other fields ...,
       "latestStreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label",
       "cdcSpecification": {
           "status": "ENABLED",
           "viewType": "NEW_AND_OLD_IMAGES"    
       }
   }
   ```

1. You can list all streams in your account in a specified AWS Region. The following command is an example of this.

   ```
   aws keyspacesstreams list-streams --region us-east-1
   ```

   The output of the command could look similar to this.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
                "KeyspaceName": "ks_1",
                "TableName": "t1"
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
                "KeyspaceName": "ks_1",
                "TableName": "t2"
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_2/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
                "KeyspaceName": "ks_2",
                "TableName": "t1"
           }
       ]
   }
   ```

1. You can also list the CDC streams for a given keyspace using the following parameters. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --region us-east-1
   ```

   The output of the command looks similar to this.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
                "KeyspaceName": "ks_1",
                "TableName": "t1"
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
                "KeyspaceName": "ks_1",
                "TableName": "t2"
           }
       ]
   }
   ```

1. You can also list the CDC streams for a given table using the following parameters. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --table-name t2 --region us-east-1
   ```

   The output of the command looks similar to this.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
                "KeyspaceName": "ks_1",
                "TableName": "t2"
           }
       ]
   }
   ```

------
#### [ Console ]

**View CDC streams in the Amazon Keyspaces console**

1. Sign in to the AWS Management Console, and open the Amazon Keyspaces console at [https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home).

1. In the navigation pane, choose **Tables**, and then choose a table from the list.

1. Choose the **Streams** tab to review the stream details.

------

# Access records in CDC streams in Amazon Keyspaces
<a name="keyspaces-records-cdc"></a>

To access the records in a stream, you use the [Amazon Keyspaces Streams API](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html). The following section contains examples of how to access records using the AWS CLI.

For the required permissions, see [Configure permissions to work with CDC streams in Amazon Keyspaces](configure-cdc-permissions.md).

**Access records in a stream using the AWS CLI**

1. You can use the Amazon Keyspaces Streams API to access the change records of the stream. For more information, see [https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html). To retrieve the shards within the stream, you can use the `get-stream` API as shown in the following example.

   ```
   aws keyspacesstreams get-stream \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL'
   ```

   The following is an example of the output.

   ```
   {
       "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2023-05-11T21:21:33.291",
       "StreamStatus": "ENABLED",
       "StreamViewType": "NEW_AND_OLD_IMAGES",
       "CreationRequestDateTime": "<CREATION_TIME>",
       "KeyspaceName": "mykeyspace",
       "TableName": "mytable",
       "StreamLabel": "2023-05-11T21:21:33.291",
       "Shards": [
           {
               "SequenceNumberRange": {
                   "EndingSequenceNumber": "<END_SEQUENCE_NUMBER>",
                   "StartingSequenceNumber": "<START_SEQUENCE_NUMBER>"
               },
               "ShardId": "<SHARD_ID>"
           }
       ]
   }
   ```

1. To retrieve records from the stream, you start by getting an iterator that provides the starting point for accessing records. To do this, you use the shards within the CDC stream returned by the API in the previous step. To obtain the iterator, you use the `get-shard-iterator` API. This example uses an iterator of type `TRIM_HORIZON`, which starts reading at the last trimmed point (or the beginning) of the shard.

   ```
   aws keyspacesstreams get-shard-iterator \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL' \
   --shard-id 'SHARD_ID' \
   --shard-iterator-type 'TRIM_HORIZON'
   ```

   The output of the command looks like the following example.

   ```
   {
       "ShardIterator": "<SHARD_ITERATOR>" 
   }
   ```

1. To retrieve the CDC records with the `get-records` API, use the shard iterator returned in the previous step. The following command is an example of this.

   ```
   aws keyspacesstreams get-records \
   --shard-iterator 'SHARD_ITERATOR' \
   --limit 100
   ```
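
   A successful call returns the change records for that shard along with a next shard iterator that you can pass to subsequent `get-records` calls to continue reading. The following output is an illustrative sketch only, with most record fields elided; for the authoritative response shape, see the `GetRecords` operation in the Amazon Keyspaces Streams API Reference.

   ```
   {
       "ChangeRecords": [
           {
               "SequenceNumber": "<SEQUENCE_NUMBER>",
               ... Other fields ...
           }
       ],
       "NextShardIterator": "<NEXT_SHARD_ITERATOR>"
   }
   ```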

# Use the Kinesis Client Library (KCL) to process Amazon Keyspaces streams
<a name="cdc_how-to-use-kcl"></a>

This topic describes how to use the Kinesis Client Library (KCL) to consume and process data from Amazon Keyspaces change data capture (CDC) streams.

Working with the Kinesis Client Library (KCL) instead of calling the Amazon Keyspaces Streams API directly provides many benefits, for example:
+ Built-in shard lineage tracking and iterator handling.
+ Automatic load balancing across workers.
+ Fault tolerance and recovery from worker failures.
+ Checkpointing to track processing progress.
+ Adaptation to changes in stream capacity.
+ Simplified distributed computing for processing CDC records.

The following section outlines why and how to use the Kinesis Client Library (KCL) to process streams and provides an example for processing an Amazon Keyspaces CDC stream with the KCL.

For information about pricing, see [Amazon Keyspaces (for Apache Cassandra) pricing](https://aws.amazon.com/keyspaces/pricing).

## What is the Kinesis Client Library?
<a name="cdc-kcl-what-is"></a>

The Kinesis Client Library (KCL) is a standalone Java software library designed to simplify the process of consuming and processing data from streams. KCL handles many of the complex tasks associated with distributed computing, letting you focus on implementing your business logic when processing stream data. KCL manages activities such as load balancing across multiple workers, responding to worker failures, checkpointing processed records, and responding to changes in the number of shards in the stream.

To process Amazon Keyspaces CDC streams, you can use the design patterns found in the KCL for working with stream shards and stream records. The KCL simplifies coding by providing useful abstractions above the low-level Kinesis Data Streams API. For more information about the KCL, see [Develop consumers with KCL](https://docs.aws.amazon.com/kinesis/latest/dev/develop-kcl-consumers.html) in the *Amazon Kinesis Data Streams Developer Guide*.

To write applications using the KCL, you use the Amazon Keyspaces Streams Kinesis Adapter. The Kinesis Adapter implements the Kinesis Data Streams interface so that you can use the KCL to consume and process records from Amazon Keyspaces streams. For instructions on how to set up and install the Amazon Keyspaces Streams Kinesis Adapter, see the [GitHub](https://github.com/aws/keyspaces-streams-kinesis-adapter) repository.

The following diagram shows how these libraries interact with each other.

![\[Interaction between a client applications and Kinesis Data Streams, KCL, the Amazon Keyspaces Streams Kinesis Adapter, and Amazon Keyspaces APIs when processing Amazon Keyspaces CDC stream records.\]](http://docs.aws.amazon.com/keyspaces/latest/devguide/images/keyspaces-streams-kinesis-adapter.png)


KCL is frequently updated to incorporate newer versions of underlying libraries, security improvements, and bug fixes. We recommend that you use the latest version of KCL to avoid known issues and benefit from the latest improvements. To find the latest KCL version, see the [KCL GitHub repository](https://github.com/awslabs/amazon-kinesis-client).

## KCL concepts
<a name="cdc-kcl-concepts"></a>

Before you implement a consumer application using KCL, you should understand the following concepts:

**KCL consumer application**  
A KCL consumer application is a program that processes data from an Amazon Keyspaces CDC stream. The KCL acts as an intermediary between your consumer application code and the Amazon Keyspaces CDC stream.

**Worker**  
A worker is an execution unit of your KCL consumer application that processes data from the Amazon Keyspaces CDC stream. Your application can run multiple workers distributed across multiple instances.

**Record processor**  
A record processor is the logic in your application that processes data from a shard in the Amazon Keyspaces CDC stream. A record processor is instantiated by a worker for each shard it manages.

**Lease**  
A lease represents the processing responsibility for a shard. Workers use leases to coordinate which worker is processing which shard. KCL stores lease data in a table in Amazon DynamoDB.

**Checkpoint**  
A checkpoint is a record of the position in the shard up to which the record processor has successfully processed records. Checkpointing enables your application to resume processing from where it left off if a worker fails.

With the Amazon Keyspaces Kinesis adapter in place, you can begin developing against the KCL interface, with the API calls seamlessly directed at the Amazon Keyspaces stream endpoint. For a list of available endpoints, see [How to access CDC stream endpoints in Amazon Keyspaces](CDC_access-endpoints.md).

When your application starts, it calls the KCL to instantiate a worker. You must provide the worker with configuration information for the application, such as the stream descriptor and AWS credentials, and the name of a record processor class that you provide. As it runs the code in the record processor, the worker performs the following tasks:
+ Connects to the stream
+ Enumerates the shards within the stream
+ Coordinates shard associations with other workers (if any)
+ Instantiates a record processor for every shard it manages
+ Pulls records from the stream
+ Pushes the records to the corresponding record processor
+ Checkpoints processed records
+ Balances shard-worker associations when the worker instance count changes
+ Balances shard-worker associations when shards are split

# Implement a KCL consumer application for Amazon Keyspaces CDC streams
<a name="cdc-kcl-implementation"></a>

This topic provides a step-by-step guide to implementing a KCL consumer application to process Amazon Keyspaces CDC streams.

1. Prerequisites: Before you begin, ensure you have:
   + An Amazon Keyspaces table with a CDC stream
   + Required IAM permissions for the IAM principal to access the Amazon Keyspaces CDC stream, create and access DynamoDB tables for KCL stream processing, and permissions to publish metrics to CloudWatch. For more information and a policy example, see [Permissions to process Amazon Keyspaces CDC streams with the Kinesis Client Library (KCL)](configure-cdc-permissions.md#cdc-permissions-kcl).
   + Valid AWS credentials set up in your local configuration. For more information, see [Store access keys for programmatic access](aws.credentials.manage.md).
   + Java Development Kit (JDK) 8 or later
   + Requirements listed in the [Readme](https://github.com/aws/keyspaces-streams-kinesis-adapter) on GitHub.

1. <a name="cdc-kcl-add-dependencies"></a>In this step, you add the KCL and Amazon Keyspaces Streams Kinesis Adapter dependencies to your project. For Maven, add the following to your `pom.xml` file:

   ```
   <dependencies>
       <dependency>
           <groupId>software.amazon.kinesis</groupId>
           <artifactId>amazon-kinesis-client</artifactId>
           <version>3.1.0</version>
       </dependency>
       <dependency>
           <groupId>software.amazon.keyspaces</groupId>
           <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
           <version>1.0.0</version>
       </dependency>
   </dependencies>
   ```
**Note**  
Always check for the latest version of KCL at the [KCL GitHub repository](https://github.com/awslabs/amazon-kinesis-client).

1. <a name="cdc-kcl-factory"></a>Create a record processor class that implements `KeyspacesStreamsShardRecordProcessor`:

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>Create a record processor factory that produces record processor instances, as shown in the following example.

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>In this step, you create a base class that configures KCL 3.x and the Amazon Keyspaces Streams Kinesis Adapter.

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>In this step, you implement the main application class that creates and starts the scheduler so your application begins processing change events.

   ```
   import software.amazon.kinesis.coordinator.Scheduler;
   
   public class KCLTest {
   
       private static final int APP_RUNTIME_SECONDS = 1800;
    private static final int SLEEP_INTERVAL_MS = 60 * 1000;
   
       public static void main(String[] args) {
           KCLTestBase kclTestBase;
   
           kclTestBase = new KCLTestBase();
           kclTestBase.baseSetUp();
   
           // Create and start scheduler
           String leaseTableName = generateUniqueApplicationName();
   
           // Update below to your Stream ARN
        String streamArn = "arn:aws:cassandra:us-east-1:111122223333:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
           Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
           kclTestBase.startScheduler(scheduler);
   
           // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this
           // example we will shut it down after APP_RUNTIME_SECONDS
           long startTime = System.currentTimeMillis();
           long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
           while (System.currentTimeMillis() < endTime) {
               try {
                   // Print and sleep every minute
                    Thread.sleep(SLEEP_INTERVAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## Best practices
<a name="cdc-kcl-best-practices"></a>

Follow these best practices when using KCL with Amazon Keyspaces CDC streams:

**Error handling**  
Implement robust error handling in your record processor to handle exceptions gracefully. Consider implementing retry logic for transient failures.
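
As one illustration of retry logic for transient failures, the following standalone sketch retries a task with exponential backoff. The `RetryHelper` class, its parameters, and the delay values are illustrative, not part of the KCL; you would call something like this from within your record processing logic.

```
import java.util.concurrent.Callable;

public class RetryHelper {
    // Runs the task, retrying on failure with exponential backoff.
    // Delays grow as baseDelayMillis * 2^attempt (e.g. 100 ms, 200 ms, 400 ms).
    public static <T> T withRetries(Callable<T> task, int maxAttempts, long baseDelayMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(baseDelayMillis << attempt);
            }
        }
        throw last; // all attempts failed; surface the last error
    }
}
```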

**Checkpointing frequency**  
Balance checkpointing frequency to minimize duplicate processing while ensuring reasonable progress tracking. Too frequent checkpointing can impact performance, while too infrequent checkpointing can lead to more reprocessing if a worker fails.
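
One common pattern is to checkpoint when either a time or a record-count threshold is reached, rather than after every batch. The following standalone sketch shows the idea; the `CheckpointThrottle` class and its thresholds are illustrative, not a KCL API. In your record processor, you would call `shouldCheckpoint` from `processRecords` and checkpoint only when it returns true.

```
public class CheckpointThrottle {
    private final long intervalMillis;  // checkpoint at least this often...
    private final int maxRecords;       // ...or after this many records
    private long lastCheckpointMillis;
    private int recordsSinceCheckpoint;

    public CheckpointThrottle(long intervalMillis, int maxRecords) {
        this.intervalMillis = intervalMillis;
        this.maxRecords = maxRecords;
    }

    // Returns true when either threshold is reached, and resets the counters.
    public boolean shouldCheckpoint(long nowMillis, int newRecords) {
        recordsSinceCheckpoint += newRecords;
        if (nowMillis - lastCheckpointMillis >= intervalMillis
                || recordsSinceCheckpoint >= maxRecords) {
            lastCheckpointMillis = nowMillis;
            recordsSinceCheckpoint = 0;
            return true;
        }
        return false;
    }
}
```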

**Worker scaling**  
Scale the number of workers based on the number of shards in your CDC stream. A good starting point is to have one worker per shard, but you may need to adjust based on your processing requirements.

**Monitoring**  
Use CloudWatch metrics provided by KCL to monitor the health and performance of your consumer application. Key metrics include processing latency, checkpoint age, and lease counts.

**Testing**  
Test your consumer application thoroughly, including scenarios like worker failures, stream resharding, and varying load conditions.

## Using KCL with non-Java languages
<a name="cdc-kcl-non-java"></a>

While KCL is primarily a Java library, you can use it with other programming languages through the MultiLangDaemon. The MultiLangDaemon is a Java-based daemon that manages the interaction between your non-Java record processor and the KCL.

KCL provides support for the following languages:
+ Python
+ Ruby
+ Node.js
+ .NET

For more information about using KCL with non-Java languages, see the [KCL MultiLangDaemon documentation](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang).

## Troubleshooting
<a name="cdc-kcl-troubleshooting"></a>

This section provides solutions to common issues you might encounter when using KCL with Amazon Keyspaces CDC streams.

**Slow processing**  
If your consumer application is processing records slowly, consider:  
+ Increasing the number of worker instances
+ Optimizing your record processing logic
+ Checking for bottlenecks in downstream systems

**Duplicate processing**  
If you're seeing duplicate processing of records, check your checkpointing logic. Ensure you're checkpointing after successfully processing records.
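
Because KCL provides at-least-once processing, records can also be redelivered after a worker fails over before a checkpoint. If your downstream processing is not naturally idempotent, you can skip replayed records by tracking the highest sequence number processed per shard, as in the following standalone sketch (the `DedupTracker` class is illustrative, not part of the KCL).

```
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class DedupTracker {
    // Highest sequence number successfully processed, per shard.
    private final Map<String, BigInteger> lastSeen = new HashMap<>();

    // Returns true if this record was already processed and should be skipped.
    public boolean isDuplicate(String shardId, String sequenceNumber) {
        BigInteger seq = new BigInteger(sequenceNumber);
        BigInteger prev = lastSeen.get(shardId);
        if (prev != null && seq.compareTo(prev) <= 0) {
            return true;
        }
        lastSeen.put(shardId, seq);
        return false;
    }
}
```

Note that this in-memory approach resets when a worker restarts; for stronger guarantees, persist the de-duplication state in a durable store.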

**Worker failures**  
If workers are failing frequently, check:  
+ Resource constraints (CPU, memory)
+ Network connectivity issues
+ Permissions issues

**Lease table issues**  
If you're experiencing issues with the KCL lease table:  
+ Check that your application has appropriate permissions to access the DynamoDB lease table
+ Verify that the table has sufficient provisioned throughput