

# How to use change data capture (CDC) streams in Amazon Keyspaces
<a name="cdc_how-to-use"></a>

**Topics**
+ [Configure permissions](configure-cdc-permissions.md)
+ [Access CDC stream endpoints](CDC_access-endpoints.md)
+ [Enable a CDC stream for a new table](keyspaces-enable-cdc-new-table.md)
+ [Enable a CDC stream for an existing table](keyspaces-enable-cdc-alter-table.md)
+ [Disable a CDC stream](keyspaces-delete-cdc.md)
+ [View CDC streams](keyspaces-view-cdc.md)
+ [Access CDC streams](keyspaces-records-cdc.md)
+ [Use KCL for processing streams](cdc_how-to-use-kcl.md)

# Configure permissions to work with CDC streams in Amazon Keyspaces
<a name="configure-cdc-permissions"></a>

To enable CDC streams, the principal (for example, an IAM user or role) needs the following permissions.

For more information about AWS Identity and Access Management, see [AWS Identity and Access Management for Amazon Keyspaces](security-iam.md).

## Permissions to enable a CDC stream for a table
<a name="cdc-permissions-enable"></a>

To enable a CDC stream for an Amazon Keyspaces table, the principal needs permissions to create or alter a table, as well as permissions to create the service-linked role [AWSServiceRoleForAmazonKeyspacesCDC](using-service-linked-roles-CDC-streams.md#service-linked-role-permissions-CDC-streams). Amazon Keyspaces uses the service-linked role to publish CloudWatch metrics to your account on your behalf.

The following IAM policy is an example of this.

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Effect":"Allow",
            "Action":[
                "cassandra:Create",
                "cassandra:CreateMultiRegionResource",
                "cassandra:Alter",
                "cassandra:AlterMultiRegionResource"
            ],
            "Resource":[
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/*",
                "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
            ]
        },
        {
            "Sid": "KeyspacesCDCServiceLinkedRole",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/cassandra-streams.amazonaws.com/AWSServiceRoleForAmazonKeyspacesCDC",
            "Condition": {
              "StringLike": {
                "iam:AWSServiceName": "cassandra-streams.amazonaws.com"
              }
            }
        }
    ]
}
```

To disable a stream, only `ALTER TABLE` permissions are required.

## Permissions to view a CDC stream
<a name="cdc-permissions-view"></a>

To view or list CDC streams, the principal needs read permissions for the system keyspace. For more information, see [`system_schema_mcs`](working-with-keyspaces.md#keyspace_system_schema_mcs).

The following IAM policy is an example of this.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":"cassandra:Select",
         "Resource":[
             "arn:aws:cassandra:us-east-1:111122223333:/keyspace/system*"
         ]
      }
   ]
}
```

To view or list CDC streams with the AWS CLI or the Amazon Keyspaces API, the principal needs additional permissions for the actions `cassandra:ListStreams` and `cassandra:GetStream`.

The following IAM policy is an example of this.

```
{
  "Version": "2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cassandra:Select",
        "cassandra:ListStreams",
        "cassandra:GetStream"
      ],
      "Resource": "*"
    }
  ]
}
```

## Permissions to read a CDC stream
<a name="cdc-permissions-read"></a>

To read CDC streams, the principal needs the following permissions.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      }
   ]
}
```

## Permissions to process Amazon Keyspaces CDC streams with the Kinesis Client Library (KCL)
<a name="cdc-permissions-kcl"></a>

To process Amazon Keyspaces CDC streams with KCL, the IAM principal needs the following permissions. 
+ `Amazon Keyspaces` – Read-only access to a specified Amazon Keyspaces CDC stream.
+ `DynamoDB` – Permissions to create shard lease tables, read and write access to those tables, and read access to the index, as required for KCL stream processing.
+ `CloudWatch` – Permissions to publish metric data from KCL processing of Amazon Keyspaces CDC streams into the namespace of your KCL client application in your CloudWatch account. For more information about monitoring, see [Monitor the Kinesis Client Library with Amazon CloudWatch](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html).

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "cassandra:GetStream",
            "cassandra:GetShardIterator",
            "cassandra:GetRecords"
         ],
         "Resource":[
            "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:UpdateTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:CreateTable",
            "dynamodb:DescribeTable",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:Scan"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-WorkerMetricStats",
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME-CoordinatorState"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "dynamodb:Query"
         ],
         "Resource":[
            "arn:aws:dynamodb:us-east-1:111122223333:table/KCL_APPLICATION_NAME/index/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "cloudwatch:PutMetricData"
         ],
         "Resource":"*"
      }
   ]
}
```

# How to access CDC stream endpoints in Amazon Keyspaces
<a name="CDC_access-endpoints"></a>

Amazon Keyspaces maintains separate [endpoints](programmatic.endpoints.md#global_endpoints) for keyspaces/tables and for CDC streams in each AWS Region where Amazon Keyspaces is available. To access a CDC stream, select the Region of the table and replace the `cassandra` prefix with `cassandra-streams` in the endpoint name as shown in the following example:

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/keyspaces/latest/devguide/CDC_access-endpoints.html)

The following table contains a complete list of available public endpoints for Amazon Keyspaces change data capture streams. Amazon Keyspaces CDC streams support both IPv4 and IPv6. All public endpoints, for example `cassandra-streams.us-east-1.api.aws`, are dual-stack endpoints that can be configured for IPv4 and IPv6.

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/keyspaces/latest/devguide/CDC_access-endpoints.html)
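As a quick illustration, the prefix substitution described above can be expressed in a few lines of Python. This is a sketch based only on the naming pattern shown here; the helper name is ours, not part of any AWS SDK.

```python
def to_streams_endpoint(keyspaces_endpoint: str) -> str:
    """Turn a regular Amazon Keyspaces endpoint into the matching CDC
    streams endpoint by swapping the 'cassandra' prefix for
    'cassandra-streams'."""
    prefix = "cassandra."
    if not keyspaces_endpoint.startswith(prefix):
        raise ValueError(f"not a Keyspaces endpoint: {keyspaces_endpoint!r}")
    return "cassandra-streams." + keyspaces_endpoint[len(prefix):]

print(to_streams_endpoint("cassandra.us-east-1.amazonaws.com"))
# cassandra-streams.us-east-1.amazonaws.com
```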

# Enable a CDC stream when creating a new table in Amazon Keyspaces
<a name="keyspaces-enable-cdc-new-table"></a>

To enable a CDC stream when you create a table, you can use the `CREATE TABLE` statement in CQL or the `create-table` command with the AWS CLI. 

For each changed row in the table, Amazon Keyspaces can capture the following changes based on the `view_type` of the `cdc_specification` you select:
+ `NEW_AND_OLD_IMAGES` – both versions of the row, before and after the change. This is the default.
+ `NEW_IMAGE` – the version of the row after the change.
+ `OLD_IMAGE` – the version of the row before the change.
+ `KEYS_ONLY` – the partition and clustering keys of the row that was changed.
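To make the options concrete, the following Python sketch shows which data each view type would capture for an update that changes column `b` of the row with partition key `a = 'pk1'`. This is a conceptual illustration, not actual stream record output.

```python
# Row state before and after an UPDATE that changes column b.
old_row = {"a": "pk1", "b": "old_value"}
new_row = {"a": "pk1", "b": "new_value"}
keys_only = {"a": "pk1"}  # partition and clustering key columns only

# What each view_type captures for this change (conceptually).
captured = {
    "NEW_AND_OLD_IMAGES": {"old_image": old_row, "new_image": new_row},
    "NEW_IMAGE": {"new_image": new_row},
    "OLD_IMAGE": {"old_image": old_row},
    "KEYS_ONLY": {"keys": keys_only},
}
print(captured["NEW_IMAGE"])  # {'new_image': {'a': 'pk1', 'b': 'new_value'}}
```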

For information about how to tag a stream, see [Add tags to a new stream when creating a table](Tagging.Operations.new.table.stream.md).

**Note**  
Amazon Keyspaces CDC requires the presence of a service-linked role (`AWSServiceRoleForAmazonKeyspacesCDC`) that publishes metric data from Amazon Keyspaces CDC streams into the `AWS/Cassandra` namespace in CloudWatch in your account on your behalf. This role is created automatically for you. For more information, see [Using roles for Amazon Keyspaces CDC streams](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Enable a CDC stream when you create a table with CQL**

1. 

   ```
   CREATE TABLE mykeyspace.mytable (a text, b text, PRIMARY KEY(a)) 
   WITH CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'NEW_IMAGE'}} AND CDC = TRUE;
   ```

1. To confirm the stream settings, you can use the following statement.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   The output of that statement should look similar to this.

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741383893782', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T21:44:53.783', 'status': 'ENABLED', 'view_type': 'NEW_IMAGE'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Enable a CDC stream when you create a table with the AWS CLI**

1. To create a stream you can use the following syntax. 

   ```
   aws keyspaces create-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --schema-definition 'allColumns=[{name=a,type=text},{name=b,type=text}],partitionKeys=[{name=a}]' \
   --cdc-specification status=ENABLED,viewType=NEW_IMAGE
   ```

1. The output of that command shows the standard `create-table` response and looks similar to this example. 

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------

# Enable a CDC stream for an existing table in Amazon Keyspaces
<a name="keyspaces-enable-cdc-alter-table"></a>

To enable a CDC stream for an existing table, you can use the `ALTER TABLE` statement in CQL, the `update-table` command with the AWS CLI, or you can use the console.

For each changed row in the table, Amazon Keyspaces can capture the following changes based on the `view_type` of the `cdc_specification` you select:
+ `NEW_AND_OLD_IMAGES` – both versions of the row, before and after the change. This is the default.
+ `NEW_IMAGE` – the version of the row after the change.
+ `OLD_IMAGE` – the version of the row before the change.
+ `KEYS_ONLY` – the partition and clustering keys of the row that was changed.

For information about how to tag a stream, see [Add new tags to a stream](Tagging.Operations.existing.stream.md).

**Note**  
Amazon Keyspaces CDC requires the presence of a service-linked role (`AWSServiceRoleForAmazonKeyspacesCDC`) that publishes metric data from Amazon Keyspaces CDC streams into the `AWS/Cassandra` namespace in CloudWatch in your account on your behalf. This role is created automatically for you. For more information, see [Using roles for Amazon Keyspaces CDC streams](using-service-linked-roles-CDC-streams.md).

------
#### [ Cassandra Query Language (CQL) ]

**Enable a stream (CDC stream) with CQL**

You can use `ALTER TABLE` to enable a stream for an existing table.

1. The following example creates a stream that only captures changes to partition and clustering keys of a changed row.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = TRUE
   AND CUSTOM_PROPERTIES={'cdc_specification': {'view_type': 'KEYS_ONLY'}};
   ```

1. To verify the stream settings, you can use the following statement.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   The output of the statement looks similar to this.

   ```
    keyspace_name | table_name | cdc  | custom_properties
   ---------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
       mykeyspace |    mytable | True | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385897045', 'throughput_mode': 'PAY_PER_REQUEST'}, 'cdc_specification': {'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2025-03-07T22:20:10.454', 'status': 'ENABLED', 'view_type': 'KEYS_ONLY'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Create a CDC stream with the AWS CLI**

1. To create a stream for an existing table you can use the following syntax.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=ENABLED,viewType=NEW_AND_OLD_IMAGES
   ```

1. The output of that command shows the standard `update-table` response and looks similar to this example.

   ```
   { "resourceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable" }
   ```

------
#### [ Console ]

**Enable a CDC stream with the Amazon Keyspaces console**

1. Sign in to the AWS Management Console, and open the Amazon Keyspaces console at [https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home).

1. In the navigation pane, choose **Tables**, and then choose a table from the list.

1. Choose the **Streams** tab.

1. Choose **Edit** to enable a stream.

1. Select **Turn on streams**.

1. Choose the **View type** of the stream. The following options are available. Note that you can't change the view type of a stream after it's been created.
   + **New and old images** – Amazon Keyspaces captures both versions of the row, before and after the change. This is the default.
   + **New image** – Amazon Keyspaces captures only the version of the row after the change.
   + **Old image** – Amazon Keyspaces captures only the version of the row before the change.
   + **Primary key only** – Amazon Keyspaces captures only the partition and clustering key columns of the row that was changed.

1. To finish, choose **Save changes**.

------

# Disable a CDC stream in Amazon Keyspaces
<a name="keyspaces-delete-cdc"></a>

To disable a CDC stream in a keyspace, you can use the `ALTER TABLE` statement in CQL, the `update-table` command with the AWS CLI, or the console.

------
#### [ Cassandra Query Language (CQL) ]

**Disable a stream (CDC stream) with CQL**

1. To disable a stream, you can use the following statement.

   ```
   ALTER TABLE mykeyspace.mytable
   WITH cdc = FALSE;
   ```

1. To confirm that the stream is disabled, you can use the following statement.

   ```
   SELECT keyspace_name, table_name, cdc, custom_properties FROM system_schema_mcs.tables WHERE keyspace_name = 'mykeyspace' AND table_name = 'mytable';
   ```

   The output of that statement looks similar to this.

   ```
    keyspace_name | table_name | cdc   | custom_properties
   ---------------+------------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      mykeyspace  |   mytable  | False | {'capacity_mode': {'last_update_to_pay_per_request_timestamp': '1741385668642', 'throughput_mode': 'PAY_PER_REQUEST'}, 'encryption_specification': {'encryption_type': 'AWS_OWNED_KMS_KEY'}, 'point_in_time_recovery': {'status': 'disabled'}}
   ```

------
#### [ CLI ]

**Disable a stream (CDC stream) with the AWS CLI**

1. To disable a stream, you can use the following command.

   ```
   aws keyspaces update-table \
   --keyspace-name 'mykeyspace' \
   --table-name 'mytable' \
   --cdc-specification status=DISABLED
   ```

1. The output of the command looks similar to this example.

   ```
   {
       "keyspaceArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/",
       "streamName": "my_stream"
   }
   ```

------
#### [ Console ]

**Disable a stream (CDC stream) with the Amazon Keyspaces console**

1. Sign in to the AWS Management Console, and open the Amazon Keyspaces console at [https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home).

1. In the navigation pane, choose **Tables**, and then choose a table from the list.

1. Choose the **Streams** tab.

1. Choose **Edit**.

1. Unselect **Turn on streams**. 

1. Choose **Save changes** to disable the stream.

------

# View CDC streams in Amazon Keyspaces
<a name="keyspaces-view-cdc"></a>

To view or list all streams in a keyspace, you can query the `system_schema_mcs.tables` table in the system keyspace with a CQL statement, use the `get-stream` and `list-streams` commands with the AWS CLI, or use the console.

For the required permissions, see [Configure permissions to work with CDC streams in Amazon Keyspaces](configure-cdc-permissions.md).

------
#### [ Cassandra Query Language (CQL) ]

**View CDC streams with CQL**
+ To monitor the CDC status of your table, you can use the following statement.

  ```
  SELECT custom_properties
  FROM system_schema_mcs.tables 
  WHERE keyspace_name='my_keyspace' and table_name='my_table';
  ```

  The output of the command looks similar to this.

  ```
  ...
  custom_properties
  ----------------------------------------------------------------------------------
  {'cdc_specification':{'status': 'Enabled', 'view_type': 'NEW_IMAGE', 'latest_stream_arn': 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label'}}
  ...
  ```

------
#### [ CLI ]

**View CDC streams with the AWS CLI**

1. This example shows how to see the stream information for a table.

   ```
   aws keyspaces get-table \
   --keyspace-name 'my_keyspace' \
   --table-name 'my_table'
   ```

   The output of the command looks like this.

   ```
   {
       "keyspaceName": "my_keyspace",
       "tableName": "my_table",
       ... Other fields ...,
       "latestStreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/my_keyspace/table/my_table/stream/stream_label",
       "cdcSpecification": {
           "status": "ENABLED",
           "viewType": "NEW_AND_OLD_IMAGES"    
       }
   }
   ```

1. You can list all streams in your account in a specified AWS Region. The following command is an example of this.

   ```
   aws keyspacesstreams list-streams --region us-east-1
   ```

   The output of the command could look similar to this.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1",
               "TableName": "t1"
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1",
               "TableName": "t2"
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_2/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_2",
               "TableName": "t1"
           }
       ]
   }
   ```

1. You can also list the CDC streams for a given keyspace using the following parameters. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --region us-east-1
   ```

   The output of the command looks similar to this.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t1/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1",
               "TableName": "t1"
           },
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1",
               "TableName": "t2"
           }
       ]
   }
   ```

1. You can also list the CDC streams for a given table using the following parameters. 

   ```
   aws keyspacesstreams list-streams --keyspace-name ks_1 --table-name t2 --region us-east-1
   ```

   The output of the command looks similar to this.

   ```
   {
       "Streams": [
           {
               "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/ks_1/table/t2/stream/2023-05-11T21:21:33.291",
               "StreamLabel": "2023-05-11T21:21:33.291",
               "KeyspaceName": "ks_1",
               "TableName": "t2"
           }
       ]
   }
   ```

------
#### [ Console ]

**View CDC streams in the Amazon Keyspaces console**

1. Sign in to the AWS Management Console, and open the Amazon Keyspaces console at [https://console.aws.amazon.com/keyspaces/home](https://console.aws.amazon.com/keyspaces/home).

1. In the navigation pane, choose **Tables**, and then choose a table from the list.

1. Choose the **Streams** tab to review the stream details.

------

# Access records in CDC streams in Amazon Keyspaces
<a name="keyspaces-records-cdc"></a>

To access the records in a stream, you use the [Amazon Keyspaces Streams API](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html). The following section contains examples of how to access records using the AWS CLI.

For the required permissions, see [Configure permissions to work with CDC streams in Amazon Keyspaces](configure-cdc-permissions.md).

**Access records in a stream using the AWS CLI**

1. You can use the Amazon Keyspaces Streams API to access the change records of the stream. For more information, see the [Amazon Keyspaces Streams API Reference](https://docs.aws.amazon.com/keyspaces/latest/StreamsAPIReference/Welcome.html). To retrieve the shards within the stream, you can use the `get-stream` API as shown in the following example.

   ```
   aws keyspacesstreams get-stream \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL'
   ```

   The following is an example of the output.

   ```
   {
      "StreamArn": "arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/2023-05-11T21:21:33.291",
      "StreamStatus": "ENABLED",
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "CreationRequestDateTime": "<CREATION_TIME>",
      "KeyspaceName": "mykeyspace",
      "TableName": "mytable",
      "StreamLabel": "2023-05-11T21:21:33.291",
       "Shards": [
           {
               "SequenceNumberRange": {
                   "EndingSequenceNumber": "<END_SEQUENCE_NUMBER>",
                   "StartingSequenceNumber": "<START_SEQUENCE_NUMBER>"
               },
               "ShardId": "<SHARD_ID>"
           },
       ]
   }
   ```

1. To retrieve records from the stream, you start by getting an iterator that provides the starting point for accessing records. To do this, you can use the shards within the CDC stream returned by the API in the previous step. To obtain the iterator, you can use the `get-shard-iterator` API. This example uses an iterator of type `TRIM_HORIZON`, which starts reading at the last trimmed point (or the beginning) of the shard.

   ```
   aws keyspacesstreams get-shard-iterator \
   --stream-arn 'arn:aws:cassandra:us-east-1:111122223333:/keyspace/mykeyspace/table/mytable/stream/STREAM_LABEL' \
   --shard-id 'SHARD_ID' \
   --shard-iterator-type 'TRIM_HORIZON'
   ```

   The output of the command looks like the following example.

   ```
   {
       "ShardIterator": "<SHARD_ITERATOR>" 
   }
   ```

1. To retrieve the CDC records using the `get-records` API, you can use the iterator returned in the previous step. The following command is an example of this.

   ```
   aws keyspacesstreams get-records \
   --shard-iterator 'SHARD_ITERATOR' \
   --limit 100
   ```
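The three calls above form a simple read loop: list the shards, get an iterator per shard, then page through records until the iterator is exhausted. The following Python sketch shows that control flow against a stubbed client. `FakeStreamsClient`, its method names, and the response field names are assumptions for illustration; a real application would call the `keyspacesstreams` service instead.

```python
class FakeStreamsClient:
    """Stand-in for the Amazon Keyspaces Streams API, used only to
    demonstrate the read loop."""
    def get_stream(self, stream_arn):
        # Returns the shards in the stream, as in step 1.
        return {"Shards": [{"ShardId": "shard-0001"}]}

    def get_shard_iterator(self, stream_arn, shard_id, shard_iterator_type):
        # Returns a starting position in the shard, as in step 2.
        return {"ShardIterator": shard_id + ":it-0"}

    def get_records(self, shard_iterator, limit):
        # Returns one page of change records; a missing next iterator
        # signals the end of the available records, as in step 3.
        return {"ChangeRecords": [{"SequenceNumber": "100"}],
                "NextShardIterator": None}

def read_all_records(client, stream_arn):
    """Collect every available record from every shard of the stream."""
    records = []
    for shard in client.get_stream(stream_arn)["Shards"]:
        iterator = client.get_shard_iterator(
            stream_arn, shard["ShardId"], "TRIM_HORIZON")["ShardIterator"]
        while iterator is not None:
            page = client.get_records(shard_iterator=iterator, limit=100)
            records.extend(page["ChangeRecords"])
            iterator = page["NextShardIterator"]
    return records

print(read_all_records(FakeStreamsClient(), "arn:aws:cassandra:..."))
# [{'SequenceNumber': '100'}]
```

For production workloads, the KCL described in the next section handles this loop, plus load balancing and checkpointing, for you.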

# Use the Kinesis Client Library (KCL) to process Amazon Keyspaces streams
<a name="cdc_how-to-use-kcl"></a>

This topic describes how to use the Kinesis Client Library (KCL) to consume and process data from Amazon Keyspaces change data capture (CDC) streams.

Working with the Kinesis Client Library (KCL) instead of working directly with the Amazon Keyspaces Streams API provides many benefits, for example:
+ Built-in shard lineage tracking and iterator handling.
+ Automatic load balancing across workers.
+ Fault tolerance and recovery from worker failures.
+ Checkpointing to track processing progress.
+ Adaptation to changes in stream capacity.
+ Simplified distributed computing for processing CDC records.

The following section outlines why and how to use the Kinesis Client Library (KCL) to process streams and provides an example for processing an Amazon Keyspaces CDC stream with the KCL.

For information about pricing, see [Amazon Keyspaces (for Apache Cassandra) pricing](https://aws.amazon.com/keyspaces/pricing).

## What is the Kinesis Client Library?
<a name="cdc-kcl-what-is"></a>

The Kinesis Client Library (KCL) is a standalone Java software library designed to simplify the process of consuming and processing data from streams. KCL handles many of the complex tasks associated with distributed computing, letting you focus on implementing your business logic when processing stream data. KCL manages activities such as load balancing across multiple workers, responding to worker failures, checkpointing processed records, and responding to changes in the number of shards in the stream.

To process Amazon Keyspaces CDC streams, you can use the design patterns found in the KCL for working with stream shards and stream records. The KCL simplifies coding by providing useful abstractions above the low-level Kinesis Data Streams API. For more information about the KCL, see [Develop consumers with KCL](https://docs.aws.amazon.com/kinesis/latest/dev/develop-kcl-consumers.html) in the *Amazon Kinesis Data Streams Developer Guide*.

To write applications using the KCL, you use the Amazon Keyspaces Streams Kinesis Adapter. The Kinesis Adapter implements the Kinesis Data Streams interface so that you can use the KCL to consume and process records from Amazon Keyspaces streams. For instructions on how to set up and install the Amazon Keyspaces Streams Kinesis Adapter, see the [GitHub](https://github.com/aws/keyspaces-streams-kinesis-adapter) repository.

The following diagram shows how these libraries interact with each other.

![\[Interaction between a client applications and Kinesis Data Streams, KCL, the Amazon Keyspaces Streams Kinesis Adapter, and Amazon Keyspaces APIs when processing Amazon Keyspaces CDC stream records.\]](http://docs.aws.amazon.com/keyspaces/latest/devguide/images/keyspaces-streams-kinesis-adapter.png)


KCL is frequently updated to incorporate newer versions of underlying libraries, security improvements, and bug fixes. We recommend that you use the latest version of KCL to avoid known issues and benefit from the latest improvements. To find the latest KCL version, see the [KCL GitHub repository](https://github.com/awslabs/amazon-kinesis-client).

## KCL concepts
<a name="cdc-kcl-concepts"></a>

Before you implement a consumer application using KCL, you should understand the following concepts:

**KCL consumer application**  
A KCL consumer application is a program that processes data from an Amazon Keyspaces CDC stream. The KCL acts as an intermediary between your consumer application code and the Amazon Keyspaces CDC stream.

**Worker**  
A worker is an execution unit of your KCL consumer application that processes data from the Amazon Keyspaces CDC stream. Your application can run multiple workers distributed across multiple instances.

**Record processor**  
A record processor is the logic in your application that processes data from a shard in the Amazon Keyspaces CDC stream. A record processor is instantiated by a worker for each shard it manages.

**Lease**  
A lease represents the processing responsibility for a shard. Workers use leases to coordinate which worker is processing which shard. KCL stores lease data in a table in Amazon DynamoDB.

**Checkpoint**  
A checkpoint is a record of the position in the shard up to which the record processor has successfully processed records. Checkpointing enables your application to resume processing from where it left off if a worker fails.
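The checkpoint concept can be sketched in a few lines of Python. This is a conceptual illustration, not KCL code; in a real application, KCL persists this state in a DynamoDB lease table rather than in memory.

```python
# Conceptual sketch of checkpointing: after processing a batch, the worker
# records the last sequence number per shard so that a restarted worker can
# resume from that point instead of reprocessing the whole shard.
checkpoints = {}  # shard_id -> last successfully processed sequence number

def process_batch(shard_id, sequence_numbers):
    for seq in sequence_numbers:
        pass  # the application's record-processing logic would run here
    if sequence_numbers:
        checkpoints[shard_id] = sequence_numbers[-1]

process_batch("shard-0001", ["100", "101", "102"])
print(checkpoints["shard-0001"])  # 102
```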

With the Amazon Keyspaces Kinesis adapter in place, you can begin developing against the KCL interface, with the API calls seamlessly directed at the Amazon Keyspaces stream endpoint. For a list of available endpoints, see [How to access CDC stream endpoints in Amazon Keyspaces](CDC_access-endpoints.md).

When your application starts, it calls the KCL to instantiate a worker. You must provide the worker with configuration information for the application, such as the stream descriptor and AWS credentials, and the name of a record processor class that you provide. As it runs the code in the record processor, the worker performs the following tasks:
+ Connects to the stream
+ Enumerates the shards within the stream
+ Coordinates shard associations with other workers (if any)
+ Instantiates a record processor for every shard it manages
+ Pulls records from the stream
+ Pushes the records to the corresponding record processor
+ Checkpoints processed records
+ Balances shard-worker associations when the worker instance count changes
+ Balances shard-worker associations when shards are split

# Implement a KCL consumer application for Amazon Keyspaces CDC streams
<a name="cdc-kcl-implementation"></a>

This topic provides a step-by-step guide to implementing a KCL consumer application to process Amazon Keyspaces CDC streams.

1. Prerequisites: Before you begin, ensure that you have the following:
   + An Amazon Keyspaces table with a CDC stream enabled
   + The required IAM permissions for the IAM principal to access the Amazon Keyspaces CDC stream, to create and access the DynamoDB tables that KCL uses for stream processing, and to publish metrics to CloudWatch. For more information and a policy example, see [Permissions to process Amazon Keyspaces CDC streams with the Kinesis Client Library (KCL)](configure-cdc-permissions.md#cdc-permissions-kcl).
   + Valid AWS credentials set up in your local configuration. For more information, see [Store access keys for programmatic access](aws.credentials.manage.md).
   + Java Development Kit (JDK) 8 or later
   + The requirements listed in the [Readme](https://github.com/aws/keyspaces-streams-kinesis-adapter) on GitHub.

1. <a name="cdc-kcl-add-dependencies"></a>In this step, you add the KCL and the Amazon Keyspaces streams Kinesis adapter dependencies to your project. For Maven, add the following to your `pom.xml`:

   ```
   <dependencies>
       <dependency>
           <groupId>software.amazon.kinesis</groupId>
           <artifactId>amazon-kinesis-client</artifactId>
           <version>3.1.0</version>
       </dependency>
       <dependency>
           <groupId>software.amazon.keyspaces</groupId>
           <artifactId>keyspaces-streams-kinesis-adapter</artifactId>
           <version>1.0.0</version>
       </dependency>
   </dependencies>
   ```
**Note**  
Always check for the latest version of KCL at the [KCL GitHub repository](https://github.com/awslabs/amazon-kinesis-client).

1. <a name="cdc-kcl-factory"></a>Create a record processor class that implements the `KeyspacesStreamsShardRecordProcessor` interface:

   ```
   import software.amazon.awssdk.services.keyspacesstreams.model.Record;
   import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord;
   import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput;
   import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
   import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
   import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
   import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
   
   public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor {
       private String shardId;
   
       @Override
       public void initialize(InitializationInput initializationInput) {
           this.shardId = initializationInput.shardId();
           System.out.println("Initializing record processor for shard: " + shardId);
       }
   
       @Override
       public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) {
           try {
               for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) {
                   Record keyspacesRecord = record.getRecord();
                   System.out.println("Received record: " + keyspacesRecord);
               }
   
               if (!processRecordsInput.records().isEmpty()) {
                   RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer();
                   try {
                       checkpointer.checkpoint();
                       System.out.println("Checkpoint successful for shard: " + shardId);
                   } catch (Exception e) {
                       System.out.println("Error while checkpointing for shard: " + shardId + " " + e);
                   }
               }
           } catch (Exception e) {
               System.out.println("Error processing records for shard: " + shardId + " " + e);
           }
       }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
           System.out.println("Lease lost for shard: " + shardId);
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           System.out.println("Shard ended: " + shardId);
           try {
               // This is required. Checkpoint at the end of the shard
               shardEndedInput.checkpointer().checkpoint();
               System.out.println("Final checkpoint successful for shard: " + shardId);
           } catch (Exception e) {
               System.out.println("Error while final checkpointing for shard: " + shardId + " " + e);
               throw new RuntimeException("Error while final checkpointing", e);
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           System.out.println("Shutdown requested for shard " + shardId);
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (Exception e) {
               System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-factory"></a>Create a record processor factory that produces record processor instances, as shown in the following example.

   ```
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   public class RecordProcessorFactory implements ShardRecordProcessorFactory {
       private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>();
   
       @Override
       public ShardRecordProcessor shardRecordProcessor() {
           System.out.println("Creating new RecordProcessor");
           RecordProcessor processor = new RecordProcessor();
           processors.add(processor);
           return processor;
       }
   }
   ```

1. <a name="cdc-kcl-consumer"></a>In this step, you create a base class that configures KCL 3.x and the Amazon Keyspaces streams Kinesis adapter.

   ```
   import com.example.KCLExample.utils.RecordProcessorFactory;
   import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient;
   import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
   import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse;
   import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   import software.amazon.kinesis.coordinator.CoordinatorConfig;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.leases.LeaseManagementConfig;
   import software.amazon.kinesis.processor.ProcessorConfig;
   import software.amazon.kinesis.processor.StreamTracker;
   import software.amazon.kinesis.retrieval.polling.PollingConfig;
   
   public class KCLTestBase {
   
       protected KeyspacesStreamsClient streamsClient;
       protected KinesisAsyncClient adapterClient;
       protected DynamoDbAsyncClient dynamoDbAsyncClient;
       protected CloudWatchAsyncClient cloudWatchClient;
       protected Region region;
       protected RecordProcessorFactory recordProcessorFactory;
       protected Scheduler scheduler;
       protected Thread schedulerThread;
   
       public void baseSetUp() {
           recordProcessorFactory = new RecordProcessorFactory();
           setupKCLBase();
       }
   
       protected void setupKCLBase() {
           region = Region.US_EAST_1;
   
           streamsClient = KeyspacesStreamsClient.builder()
                   .region(region)
                   .build();
           adapterClient = new AmazonKeyspacesStreamsAdapterClient(
                   streamsClient,
                   region);
           dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                   .region(region)
                   .build();
           cloudWatchClient = CloudWatchAsyncClient.builder()
                   .region(region)
                   .build();
       }
   
       protected void startScheduler(Scheduler scheduler) {
           this.scheduler = scheduler;
           schedulerThread = new Thread(() -> scheduler.run());
           schedulerThread.start();
       }
   
       protected void shutdownScheduler() {
           if (scheduler != null) {
               scheduler.shutdown();
               try {
                   schedulerThread.join(30000);
               } catch (InterruptedException e) {
                   System.out.println("Error while shutting down scheduler " + e);
               }
           }
       }
   
       protected Scheduler createScheduler(String streamArn, String leaseTableName) {
           String workerId = "worker-" + System.currentTimeMillis();
   
           // Create ConfigsBuilder
           ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName);
   
           // Configure retrieval config for polling
           PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient);
   
           // Create the Scheduler
           return StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   configsBuilder.coordinatorConfig(),
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig),
                   streamsClient,
                   region
           );
       }
   
       private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) {
           ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamArn,
                   leaseTableName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchClient,
                   workerId,
                   recordProcessorFactory);
   
           configureCoordinator(configsBuilder.coordinatorConfig());
           configureLeaseManagement(configsBuilder.leaseManagementConfig());
           configureProcessor(configsBuilder.processorConfig());
           configureStreamTracker(configsBuilder, streamArn);
   
           return configsBuilder;
       }
   
       private void configureCoordinator(CoordinatorConfig config) {
           config.skipShardSyncAtWorkerInitializationIfLeasesExist(true)
                   .parentShardPollIntervalMillis(1000)
                   .shardConsumerDispatchPollIntervalMillis(500);
       }
   
       private void configureLeaseManagement(LeaseManagementConfig config) {
           config.shardSyncIntervalMillis(0)
                   .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0)
                   .leasesRecoveryAuditorExecutionFrequencyMillis(5000)
                   .leaseAssignmentIntervalMillis(1000L);
       }
   
       private void configureProcessor(ProcessorConfig config) {
           config.callProcessRecordsEvenForEmptyRecordList(true);
       }
   
       private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) {
           StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
                   streamArn,
                   InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
           );
           configsBuilder.streamTracker(streamTracker);
       }
   
       public void deleteAllDdbTables(String baseTableName) {
           List<String> tablesToDelete = Arrays.asList(
                   baseTableName,
                   baseTableName + "-CoordinatorState",
                   baseTableName + "-WorkerMetricStats"
           );
   
           for (String tableName : tablesToDelete) {
               deleteTable(tableName);
           }
       }
   
       private void deleteTable(String tableName) {
           DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                   .tableName(tableName)
                   .build();
   
           try {
               DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get();
               System.out.println("Table deletion response " + response);
           } catch (InterruptedException | ExecutionException e) {
               System.out.println("Error deleting table: " + tableName + " " + e);
           }
       }
   }
   ```

1. <a name="cdc-kcl-record-processor"></a>In this step, you create the main application class that builds and starts the scheduler so that your application begins processing change events.

   ```
    import software.amazon.kinesis.coordinator.Scheduler;

    public class KCLTest {

        private static final int APP_RUNTIME_SECONDS = 1800;
        private static final int SLEEP_INTERVAL_MS = 60 * 1000;

        public static void main(String[] args) {
            KCLTestBase kclTestBase;

            kclTestBase = new KCLTestBase();
            kclTestBase.baseSetUp();

            // Create and start the scheduler
            String leaseTableName = generateUniqueApplicationName();

            // Replace the following with your stream ARN
            String streamArn = "arn:aws:cassandra:us-east-1:111122223333:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529";
            Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName);
            kclTestBase.startScheduler(scheduler);

            // KCL applications are designed to run continuously. In this example,
            // the application shuts down after APP_RUNTIME_SECONDS.
            long startTime = System.currentTimeMillis();
            long endTime = startTime + (APP_RUNTIME_SECONDS * 1000);
            while (System.currentTimeMillis() < endTime) {
                try {
                    // Print a status message and sleep every minute
                    Thread.sleep(SLEEP_INTERVAL_MS);
                   System.out.println("Application is running");
               } catch (InterruptedException e) {
                   System.out.println("Interrupted while waiting for records");
                   Thread.currentThread().interrupt();
                   break;
               }
           }
   
           // Stop the scheduler
           kclTestBase.shutdownScheduler();
           kclTestBase.deleteAllDdbTables(leaseTableName);
       }
   
       public static String generateUniqueApplicationName() {
           String timestamp = String.valueOf(System.currentTimeMillis());
           String randomString = java.util.UUID.randomUUID().toString().substring(0, 8);
           return String.format("KCL-App-%s-%s", timestamp, randomString);
       }
   }
   ```

## Best practices
<a name="cdc-kcl-best-practices"></a>

Follow these best practices when using KCL with Amazon Keyspaces CDC streams:

**Error handling**  
Implement robust error handling in your record processor to handle exceptions gracefully. Consider implementing retry logic for transient failures.
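
For transient failures, such as throttling or brief network errors, a small retry helper with exponential backoff is often sufficient. The following is a dependency-free sketch; `RetryUtil` and its parameters are illustrative names, not part of the KCL:

```java
import java.util.concurrent.Callable;

/**
 * Minimal retry helper with exponential backoff for transient failures.
 * A sketch only; not part of the KCL or the Amazon Keyspaces adapter.
 */
public class RetryUtil {

    public static <T> T withRetries(Callable<T> task, int maxAttempts, long baseDelayMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;                                        // assume transient; retry
                if (attempt < maxAttempts) {
                    Thread.sleep(baseDelayMs << (attempt - 1));  // e.g., 100, 200, 400 ms
                }
            }
        }
        throw last; // retries exhausted; surface the failure
    }
}
```

You might wrap calls to downstream systems inside `processRecords` with a helper like this, and let non-transient errors surface to your own error handling instead of retrying them indefinitely.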

**Checkpointing frequency**  
Balance checkpointing frequency to minimize duplicate processing while ensuring reasonable progress tracking. Too frequent checkpointing can impact performance, while too infrequent checkpointing can lead to more reprocessing if a worker fails.
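
One way to strike this balance is to checkpoint after a fixed number of records or after a maximum elapsed time, whichever comes first. The following sketch shows the idea; `CheckpointPolicy` is an illustrative helper, not a KCL class:

```java
/**
 * Decides when to checkpoint: after every maxRecords records or after
 * maxAgeMs milliseconds, whichever comes first. Illustrative sketch only.
 */
public class CheckpointPolicy {
    private final int maxRecords;
    private final long maxAgeMs;
    private int recordsSinceCheckpoint = 0;
    private long lastCheckpointAt;

    public CheckpointPolicy(int maxRecords, long maxAgeMs, long nowMs) {
        this.maxRecords = maxRecords;
        this.maxAgeMs = maxAgeMs;
        this.lastCheckpointAt = nowMs;
    }

    /** Call once per processed record; returns true when a checkpoint is due. */
    public boolean shouldCheckpoint(long nowMs) {
        recordsSinceCheckpoint++;
        if (recordsSinceCheckpoint >= maxRecords || nowMs - lastCheckpointAt >= maxAgeMs) {
            recordsSinceCheckpoint = 0;   // reset both thresholds after a checkpoint
            lastCheckpointAt = nowMs;
            return true;
        }
        return false;
    }
}
```

Inside `processRecords`, you would call `shouldCheckpoint` once per record and invoke `checkpointer.checkpoint()` only when it returns true.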

**Worker scaling**  
Scale the number of workers based on the number of shards in your CDC stream. A good starting point is to have one worker per shard, but you may need to adjust based on your processing requirements.

**Monitoring**  
Use CloudWatch metrics provided by KCL to monitor the health and performance of your consumer application. Key metrics include processing latency, checkpoint age, and lease counts.

**Testing**  
Test your consumer application thoroughly, including scenarios like worker failures, stream resharding, and varying load conditions.

## Using KCL with non-Java languages
<a name="cdc-kcl-non-java"></a>

While KCL is primarily a Java library, you can use it with other programming languages through the MultiLangDaemon. The MultiLangDaemon is a Java-based daemon that manages the interaction between your non-Java record processor and the KCL.

KCL provides support for the following languages:
+ Python
+ Ruby
+ Node.js
+ .NET

For more information about using KCL with non-Java languages, see the [KCL MultiLangDaemon documentation](https://github.com/awslabs/amazon-kinesis-client/tree/master/amazon-kinesis-client-multilang).

## Troubleshooting
<a name="cdc-kcl-troubleshooting"></a>

This section provides solutions to common issues you might encounter when using KCL with Amazon Keyspaces CDC streams.

**Slow processing**  
If your consumer application is processing records slowly, consider:  
+ Increasing the number of worker instances
+ Optimizing your record processing logic
+ Checking for bottlenecks in downstream systems

**Duplicate processing**  
If you're seeing duplicate processing of records, check your checkpointing logic. Ensure you're checkpointing after successfully processing records.

**Worker failures**  
If workers are failing frequently, check:  
+ Resource constraints (CPU, memory)
+ Network connectivity issues
+ Permissions issues

**Lease table issues**  
If you're experiencing issues with the KCL lease table:  
+ Check that your application has appropriate permissions to access the DynamoDB lease table
+ Verify that the table has sufficient provisioned throughput