

# Use Kinesis Client Library
<a name="kcl"></a>

## What is Kinesis Client Library?
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) is a standalone Java software library designed to simplify the process of consuming and processing data from Amazon Kinesis Data Streams. KCL handles many of the complex tasks associated with distributed computing, letting developers focus on implementing their business logic for processing data. It manages activities such as load balancing across multiple workers, responding to worker failures, checkpointing processed records, and responding to changes in the number of shards in the stream.

KCL is frequently updated to incorporate newer versions of underlying libraries, security improvements, and bug fixes. We recommend that you use the latest version of KCL to avoid known issues and benefit from the latest improvements. To find the latest KCL version, see [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client).

**Important**  
+ We recommend that you use the latest KCL version to avoid known bugs and issues. If you are using KCL 2.6.0 or earlier, upgrade to KCL 2.6.1 or later to avoid a rare condition that can block shard processing when stream capacity changes.
+ KCL is a Java library. Support for languages other than Java is provided by a Java-based daemon called MultiLangDaemon, which interacts with the KCL application over STDIN and STDOUT. For more information, see [Develop consumers with KCL in non-Java languages](develop-kcl-consumers-non-java.md).
+ Do not use AWS SDK for Java versions 2.27.19 through 2.27.23 with KCL 3.x. These versions include an issue that causes an exception related to KCL's DynamoDB usage. We recommend that you use AWS SDK for Java version 2.28.0 or later to avoid this issue.

## KCL key features and benefits
<a name="kcl-benefits"></a>

The following are the key features and related benefits of KCL:
+ **Scalability**: KCL enables applications to scale dynamically by distributing the processing load across multiple workers. You can scale your application in or out, manually or with auto-scaling, without worrying about load redistribution.
+ **Load balancing**: KCL automatically balances the processing load across available workers, resulting in an even distribution of work across workers.
+ **Checkpointing**: KCL manages checkpointing of processed records, enabling applications to resume processing from their last successfully processed position.
+ **Fault tolerance**: KCL provides built-in fault tolerance mechanisms, making sure that data processing continues even if individual workers fail. KCL also provides at-least-once delivery.
+ **Handling stream-level changes**: KCL adapts to shard splits and merges that might occur due to changes in data volume. It maintains ordering by making sure that child shards are processed only after their parent shard is completed and checkpointed.
+ **Monitoring**: KCL integrates with Amazon CloudWatch for consumer-level monitoring.
+ **Multi-language support**: KCL natively supports Java and enables multiple non-Java programming languages through MultiLangDaemon.
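
The ordering guarantee for shard splits and merges can be pictured with a small, self-contained sketch. It does not use the KCL API: the class `ShardOrderingSketch`, the method `canStart`, and the shard names are hypothetical and exist only to illustrate the rule that a child shard waits for its parents. KCL performs this check for you.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; KCL enforces this ordering internally.
final class ShardOrderingSketch {
    /**
     * Returns true when every parent of the given shard has been fully
     * processed, which means the shard is safe to start. A shard without
     * parents (not created by a split or merge) can always start.
     */
    static boolean canStart(String shardId,
                            Map<String, List<String>> parentsByShard,
                            Set<String> completedShards) {
        List<String> parents =
                parentsByShard.getOrDefault(shardId, Collections.<String>emptyList());
        return completedShards.containsAll(parents);
    }
}
```

For a merge of two shards, the merged child has two parents, so `canStart` returns `false` until both parents appear in `completedShards`.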

# KCL concepts
<a name="kcl-concepts"></a>

This section explains the core concepts and interactions of Kinesis Client Library (KCL). These concepts are fundamental to developing and managing KCL consumer applications.
+ **KCL consumer application** – a custom-built application designed to read and process records from Kinesis data streams using the Kinesis Client Library.
+ **Worker** – KCL consumer applications are typically distributed, with one or more workers running simultaneously. KCL coordinates workers to consume data from the stream in a distributed manner and balances the load evenly across multiple workers.
+ **Scheduler** – a high-level class that a KCL worker uses to start processing data. Each KCL worker has one scheduler. The scheduler initializes and oversees various tasks, including syncing shard information from Kinesis data streams, tracking shard assignments among workers, and processing data from the stream based on the shards assigned to the worker. The scheduler accepts various configurations that affect its behavior, such as the name of the stream to process and AWS credentials. The scheduler also initiates the delivery of data records from the stream to the record processors.
+ **Record processor** – defines the logic for how your KCL consumer application processes the data it receives from the data streams. You must implement your own custom data processing logic in the record processor. A KCL worker instantiates a scheduler. The scheduler then instantiates one record processor for every shard to which it holds a lease. A worker can run multiple record processors.
+ **Lease** – defines the assignment between a worker and a shard. KCL consumer applications use leases to distribute data record processing across multiple workers. Each shard is bound to only one worker by a lease at any given time and each worker can hold one or more leases simultaneously. When a worker stops holding a lease due to stopping or failing, KCL assigns another worker to take the lease. To learn more about leases, see [GitHub documentation: Lease Lifecycle](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle).
+ **Lease table** – is a unique Amazon DynamoDB table used to track all leases for the KCL consumer application. Each KCL consumer application creates its own lease table. The lease table is used to maintain state across all workers to coordinate data processing. For more information, see [DynamoDB metadata tables and load balancing in KCL](kcl-dynamoDB.md).
+ **Checkpointing** – is the process of persistently storing the position of the last successfully processed record in a shard. KCL manages checkpointing to make sure that processing can be resumed from the last checkpointed position if a worker fails or the application restarts. Checkpoints are stored in the DynamoDB lease table as part of the metadata of the lease. This allows workers to continue processing from where the previous worker stopped.
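
To make the lease and checkpoint concepts more concrete, the following is a minimal sketch that models a lease table in memory. It is not the KCL implementation and does not call the KCL API. The class `LeaseTableSketch`, its methods, and the use of consecutive integers as record positions are all assumptions made for illustration (real Kinesis sequence numbers are opaque strings).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a lease table; not part of the KCL API.
final class LeaseTableSketch {
    // One lease per shard: the owning worker and the last checkpointed position.
    private static final class Lease {
        String owner;
        long checkpoint = -1; // -1 means nothing has been checkpointed yet

        Lease(String owner) {
            this.owner = owner;
        }
    }

    private final Map<String, Lease> leases = new HashMap<>();

    // Assigns the shard to a worker and keeps any existing checkpoint.
    void assign(String shardId, String workerId) {
        Lease lease = leases.get(shardId);
        if (lease == null) {
            leases.put(shardId, new Lease(workerId));
        } else {
            lease.owner = workerId;
        }
    }

    // Only the worker that currently holds the lease may checkpoint.
    void checkpoint(String shardId, String workerId, long position) {
        Lease lease = leases.get(shardId);
        if (lease == null || !workerId.equals(lease.owner)) {
            throw new IllegalStateException(workerId + " does not hold the lease for " + shardId);
        }
        lease.checkpoint = position;
    }

    // The position to resume from: the record after the last checkpoint.
    long resumePosition(String shardId) {
        return leases.get(shardId).checkpoint + 1;
    }
}
```

In this model, if `worker-1` checkpoints position 41 and then loses the lease to `worker-2`, `resumePosition` returns 42 for the new owner, and a late checkpoint attempt by `worker-1` is rejected.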

# DynamoDB metadata tables and load balancing in KCL
<a name="kcl-dynamoDB"></a>

KCL manages metadata such as leases and CPU utilization metrics from workers. KCL tracks this metadata using DynamoDB tables. For each Amazon Kinesis Data Streams application, KCL creates three DynamoDB tables to manage the metadata: lease table, worker metrics table, and coordinator state table.

**Note**  
KCL 3.x introduced two new metadata tables: *worker metrics* and *coordinator state* tables.

**Important**  
 You must add proper permissions for KCL applications to create and manage metadata tables in DynamoDB. For details, see [IAM permissions required for KCL consumer applications](kcl-iam-permissions.md).  
A KCL consumer application does not automatically remove these three DynamoDB metadata tables. To prevent unnecessary cost, make sure that you remove the DynamoDB metadata tables that your KCL consumer application created when you decommission the application.

## Lease table
<a name="kcl-leasetable"></a>

A lease table is a unique Amazon DynamoDB table used to track the shards being leased and processed by the schedulers of the KCL consumer application. Each KCL consumer application creates its own lease table. By default, KCL uses the name of the consumer application as the name of the lease table. You can set a custom table name using configuration. KCL also creates a [global secondary index](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html) on the lease table with leaseOwner as the partition key for efficient lease discovery. The global secondary index mirrors the leaseKey attribute from the base lease table. If the lease table for your KCL consumer application does not exist when the application starts up, one of the workers creates the lease table for your application.

You can view the lease table using the [Amazon DynamoDB console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) while the consumer application is running.

**Important**  
Each KCL consumer application name must be unique to prevent a duplicated lease table name. 
Your account is charged for the costs associated with the DynamoDB table, in addition to the costs associated with Kinesis Data Streams itself. 

Each row in the lease table represents a shard that is being processed by the schedulers of your consumer application. Key fields include the following:
+ **leaseKey:** For single-stream processing, this is the shard ID. For multi-stream processing with KCL, it's structured as `account-id:StreamName:streamCreationTimestamp:ShardId`. leaseKey is the partition key of the lease table. For more information about multi-stream processing, see [Multi-stream processing with KCL](kcl-multi-stream.md).
+ **checkpoint:** The most recent checkpoint sequence number for the shard. 
+ **checkpointSubSequenceNumber:** When using the Kinesis Producer Library's aggregation feature, this is an extension to **checkpoint** that tracks individual user records within the Kinesis record.
+ **leaseCounter:** Used for checking if a worker is currently processing the lease actively. leaseCounter increases if the lease ownership is transferred to another worker.
+ **leaseOwner:** The current worker that is holding this lease.
+ **ownerSwitchesSinceCheckpoint:** How many times this lease has changed workers since the last checkpoint.
+ **parentShardId:** ID of this shard's parent. Makes sure that the parent shard is fully processed before processing starts on the child shards, maintaining the correct record processing order.
+ **childShardId:** List of child shard IDs resulting from this shard's split or merge. Used to track shard lineage and manage processing order during resharding operations.
+ **startingHashKey:** The lower bound of the hash key range for this shard.
+ **endingHashKey:** The upper bound of the hash key range for this shard.

If you use multi-stream processing with KCL, you see the following two additional fields in the lease table. For more information, see [Multi-stream processing with KCL](kcl-multi-stream.md).
+ **shardID:** The ID of the shard.
+ **streamName:** The identifier of the data stream in the following format: `account-id:StreamName:streamCreationTimestamp`.
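
If you inspect a multi-stream lease table yourself, it can help to split a `leaseKey` into its parts. The following sketch parses the `account-id:StreamName:streamCreationTimestamp:ShardId` format described above. The class `MultiStreamLeaseKey` and the sample values are hypothetical and not part of the KCL API, and the sketch assumes that none of the four parts contains a colon.

```java
// Hypothetical helper for illustration; not part of the KCL API.
final class MultiStreamLeaseKey {
    final String accountId;
    final String streamName;
    final String streamCreationTimestamp; // kept as text; no format is assumed
    final String shardId;

    private MultiStreamLeaseKey(String accountId, String streamName,
                                String streamCreationTimestamp, String shardId) {
        this.accountId = accountId;
        this.streamName = streamName;
        this.streamCreationTimestamp = streamCreationTimestamp;
        this.shardId = shardId;
    }

    // Splits account-id:StreamName:streamCreationTimestamp:ShardId into its four parts.
    static MultiStreamLeaseKey parse(String leaseKey) {
        String[] parts = leaseKey.split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException(
                    "Expected account-id:StreamName:streamCreationTimestamp:ShardId but got: " + leaseKey);
        }
        return new MultiStreamLeaseKey(parts[0], parts[1], parts[2], parts[3]);
    }

    // Rebuilds the identifier in the format used by the streamName field.
    String streamIdentifier() {
        return accountId + ":" + streamName + ":" + streamCreationTimestamp;
    }
}
```

For example, parsing `123456789012:my-stream:1700000000:shardId-000000000000` yields the shard ID `shardId-000000000000` and the stream identifier `123456789012:my-stream:1700000000`.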

## Worker metrics table
<a name="kcl-worker-metrics-table"></a>

The worker metrics table is a unique Amazon DynamoDB table for each KCL application and is used to record CPU utilization metrics from each worker. KCL uses these metrics to perform efficient lease assignments that result in balanced resource utilization across workers. By default, KCL uses `KCLApplicationName-WorkerMetricStats` as the name of the worker metrics table.

## Coordinator state table
<a name="kcl-coordinator-state-table"></a>

A coordinator state table is a unique Amazon DynamoDB table for each KCL application and is used to store internal state information for workers. For example, the coordinator state table stores data regarding the leader election or metadata associated with the in-place migration from KCL 2.x to KCL 3.x. KCL uses `KCLApplicationName-CoordinatorState` for the name of the coordinator state table by default.
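
When you decommission an application, you need the names of all three metadata tables. As a small sketch, the following helper derives the default names described in this topic from the application name. The class `KclMetadataTables` is hypothetical, and the result is only valid if you did not configure custom table names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; assumes the default table names were not overridden.
final class KclMetadataTables {
    // Returns the default lease, worker metrics, and coordinator state table names.
    static List<String> defaultTableNames(String applicationName) {
        return Arrays.asList(
                applicationName,                          // lease table
                applicationName + "-WorkerMetricStats",   // worker metrics table
                applicationName + "-CoordinatorState");   // coordinator state table
    }
}
```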

## DynamoDB capacity mode for metadata tables created by KCL
<a name="kcl-capacity-mode"></a>

By default, the Kinesis Client Library (KCL) creates DynamoDB metadata tables such as lease table, worker metrics table, and coordinator state table using the [on-demand capacity mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html). This mode automatically scales read and write capacity to accommodate traffic without requiring capacity planning. We strongly recommend that you keep these metadata tables in on-demand capacity mode for more efficient operation.

If you decide to switch the lease table to [provisioned capacity mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html), follow these best practices:
+ Analyze usage patterns:
  + Monitor your application's read and write patterns and usages (RCU, WCU) using Amazon CloudWatch metrics.
  + Understand peak and average throughput requirements.
+ Calculate the required capacity:
  + Estimate read capacity units (RCUs) and write capacity units (WCUs) based on your analysis.
  + Consider factors like the number of shards, checkpoint frequency, and worker count.
+ Implement auto scaling:
  + Use [DynamoDB auto scaling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) to automatically adjust provisioned capacity and set appropriate minimum and maximum capacity limits. 
  + DynamoDB auto scaling helps prevent your KCL metadata tables from reaching their capacity limit and being throttled.
+ Regular monitoring and optimization:
  + Continuously monitor CloudWatch metrics for `ThrottledRequests`.
  + Adjust capacity as your workload changes over time.

If you experience a `ProvisionedThroughputExceededException` in metadata DynamoDB tables for your KCL consumer application, you must increase the provisioned throughput capacity of the DynamoDB table. If you set a certain level of read capacity units (RCU) and write capacity units (WCU) when you first create your consumer application, it might not be sufficient as your usage grows. For example, if your KCL consumer application does frequent checkpointing or operates on a stream with many shards, you might need more capacity units. For information about provisioned throughput in DynamoDB, see [DynamoDB throughput capacity](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html) and [updating a table](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable) in the Amazon DynamoDB Developer Guide.
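
As a rough starting point for the capacity calculation, the following back-of-the-envelope sketch estimates steady-state write capacity for a lease table. It rests on several assumptions that you should validate against your own CloudWatch metrics: each checkpoint and each lease renewal is a single write, each lease item is 1 KB or smaller (so one write consumes one WCU), and the interval parameters are values you measure or configure yourself. The class and parameter names are hypothetical, and the estimate ignores bursts such as worker restarts or resharding.

```java
// Hypothetical estimator; a rough sketch under the stated assumptions, not a sizing tool.
final class LeaseTableCapacityEstimate {
    /**
     * Estimates writes per second as one write per shard per checkpoint interval
     * plus one write per shard per lease renewal interval, rounded up.
     */
    static long estimateWcu(int shardCount,
                            double checkpointIntervalSeconds,
                            double leaseRenewalIntervalSeconds) {
        double checkpointWritesPerSecond = shardCount / checkpointIntervalSeconds;
        double renewalWritesPerSecond = shardCount / leaseRenewalIntervalSeconds;
        return (long) Math.ceil(checkpointWritesPerSecond + renewalWritesPerSecond);
    }
}
```

For example, with 100 shards, a checkpoint every 10 seconds, and a renewal every 5 seconds, the estimate is 100/10 + 100/5 = 30 WCUs. Leave headroom above this figure and let DynamoDB auto scaling absorb peaks.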

## How KCL assigns leases to workers and balances the load
<a name="kcl-assign-leases"></a>

KCL continuously gathers and monitors CPU utilization metrics from compute hosts running the workers to ensure even workload distribution. These CPU utilization metrics are stored in the worker metrics table in DynamoDB. If KCL detects that some workers are showing higher CPU utilization rates compared to others, it will reassign leases among workers to lower the load on highly used workers. The goal is to balance the workload more evenly across the consumer application fleet, preventing any single worker from becoming overloaded. As KCL distributes CPU utilization across the consumer application fleet, you can right-size your consumer application fleet capacity by choosing the right number of workers or use auto scaling to efficiently manage the computing capacity to achieve lower cost.

**Important**  
KCL can collect CPU utilization metrics from workers only if certain prerequisites are met. For details, see [Prerequisites](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites). If KCL cannot collect CPU utilization metrics from workers, KCL will fall back to using throughput per worker to assign leases and balance the load across workers in the fleet. KCL will monitor the throughput that each worker receives at a given time and reassign leases to make sure that each worker gets a similar total throughput level from its assigned leases.
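
The idea of moving load away from busy workers can be illustrated with a deliberately simplified sketch. This is not KCL's actual assignment algorithm, which is internal to the library. The class `CpuBalancerSketch`, the method `suggestMove`, and the `maxGapPercent` threshold are hypothetical names chosen for this example.

```java
import java.util.Map;

// Simplified illustration of CPU-based rebalancing; not KCL's actual algorithm.
final class CpuBalancerSketch {
    /**
     * Returns {fromWorker, toWorker} when the gap between the busiest and the
     * least busy worker exceeds maxGapPercent, or null when no move is needed.
     */
    static String[] suggestMove(Map<String, Double> cpuPercentByWorker, double maxGapPercent) {
        String busiest = null;
        String idlest = null;
        for (Map.Entry<String, Double> entry : cpuPercentByWorker.entrySet()) {
            if (busiest == null || entry.getValue() > cpuPercentByWorker.get(busiest)) {
                busiest = entry.getKey();
            }
            if (idlest == null || entry.getValue() < cpuPercentByWorker.get(idlest)) {
                idlest = entry.getKey();
            }
        }
        if (busiest == null) {
            return null; // no workers reported metrics
        }
        double gap = cpuPercentByWorker.get(busiest) - cpuPercentByWorker.get(idlest);
        return gap > maxGapPercent ? new String[] {busiest, idlest} : null;
    }
}
```

With workers at 90, 30, and 50 percent CPU and a threshold of 10, the sketch suggests moving a lease from the worker at 90 percent to the worker at 30 percent.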

# Develop consumers with KCL
<a name="develop-kcl-consumers"></a>

You can use the Kinesis Client Library (KCL) to build consumer applications that process data from your Kinesis data streams.

KCL is available in multiple languages. This topic covers how to develop KCL consumers in Java and non-Java languages.
+ To view the Kinesis Client Library Javadoc reference, see the [Amazon Kinesis Client Library Javadoc](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html).
+ To download KCL for Java from GitHub, see [Amazon Kinesis Client Library for Java](https://github.com/awslabs/amazon-kinesis-client).
+ To locate the KCL for Java on Apache Maven, see the [KCL Maven Central Repository](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client).

**Topics**
+ [Develop consumers with KCL in Java](develop-kcl-consumers-java.md)
+ [Develop consumers with KCL in non-Java languages](develop-kcl-consumers-non-java.md)

# Develop consumers with KCL in Java
<a name="develop-kcl-consumers-java"></a>

## Prerequisites
<a name="develop-kcl-consumers-java-prerequisites"></a>

Before you start using KCL 3.x, make sure that you have the following:
+ Java Development Kit (JDK) 8 or later
+ AWS SDK for Java 2.x
+ Maven or Gradle for dependency management

KCL collects CPU utilization metrics from the compute host that workers run on and uses them to balance the load and achieve an even resource utilization level across workers. To enable KCL to collect CPU utilization metrics from workers, you must meet the following prerequisites:

 **Amazon Elastic Compute Cloud (Amazon EC2)**
+ Your operating system must be Linux OS.
+ You must enable [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html) in your EC2 instance.

 **Amazon Elastic Container Service (Amazon ECS) on Amazon EC2**
+ Your operating system must be Linux OS.
+ You must enable [ECS task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ Your Amazon ECS container agent version must be 1.39.0 or later.

 **Amazon ECS on AWS Fargate**
+ You must enable [Fargate task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). If you use Fargate platform version 1.4.0 or later, this is enabled by default. 
+ Fargate platform version 1.4.0 or later.

 **Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2** 
+ Your operating system must be Linux OS.

 **Amazon EKS on AWS Fargate**
+ Fargate platform 1.3.0 or later.

**Important**  
If KCL cannot collect CPU utilization metrics from workers, KCL falls back to using throughput per worker to assign leases and balance the load across workers in the fleet. For more information, see [How KCL assigns leases to workers and balances the load](kcl-dynamoDB.md#kcl-assign-leases).

## Install and add dependencies
<a name="develop-kcl-consumers-java-installation"></a>

If you're using Maven, add the following dependency to your `pom.xml` file. Make sure that you replace 3.x.x with the latest KCL version.

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

If you're using Gradle, add the following to your `build.gradle` file. Make sure that you replace 3.x.x with the latest KCL version.

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

You can check for the latest version of the KCL on the [Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Implement the consumer
<a name="develop-kcl-consumers-java-implemetation"></a>

A KCL consumer application consists of the following key components:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Scheduler](#implementation-scheduler)
+ [Main Consumer Application](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor is the core component where your business logic for processing Kinesis data stream records resides. It defines how your application processes the data it receives from the Kinesis stream.

Key responsibilities:
+ Initialize processing for a shard
+ Process batches of records from the Kinesis stream
+ Shut down processing for a shard (for example, when the shard splits or merges, or the lease is handed over to another host)
+ Handle checkpointing to track progress

The following shows an implementation example:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
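            // Checkpoint after each batch. To reduce DynamoDB writes, consider
            // checkpointing less often (for example, on a time interval).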
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end, checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

The following is a detailed explanation of each method used in the example:

**initialize(InitializationInput initializationInput)**
+ Purpose: Set up any necessary resources or state for processing records.
+ When it's called: Once, when KCL assigns a shard to this record processor.
+ Key points:
  + `initializationInput.shardId()`: The ID of the shard this processor will handle.
  + `initializationInput.extendedSequenceNumber()`: The sequence number to start processing from.

**processRecords(ProcessRecordsInput processRecordsInput)**
+ Purpose: Process the incoming records and optionally checkpoint progress.
+ When it's called: Repeatedly, as long as the record processor holds the lease for the shard.
+ Key points:
  + `processRecordsInput.records()`: List of records to process.
  + `processRecordsInput.checkpointer()`: Used to checkpoint the progress.
  + Make sure that you handle any exceptions during processing to prevent KCL from failing.
  + This method should be idempotent, as the same record may be processed more than once in some scenarios, such as data that has not been checkpointed before unexpected worker crashes or restarts.
  + Always flush any buffered data before checkpointing to ensure data consistency.
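
Because records can be delivered more than once, one common way to keep processing idempotent is to remember which records have already produced a side effect. The following is a minimal sketch of that idea, not part of the KCL API. The class `IdempotentHandler` and the `recordKey` parameter are hypothetical. The sketch assumes that you can build a key that identifies a record, for example from the shard ID and the record's sequence number, and it keeps the keys in memory only. A real application would need durable deduplication state, typically in the downstream data store.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of idempotent handling; state is in memory for illustration only.
final class IdempotentHandler {
    private final Set<String> processedKeys = new HashSet<>();
    private int sideEffects = 0;

    /**
     * Applies the side effect only the first time a record key is seen.
     * Returns true if the record was processed, false if it was a duplicate.
     */
    boolean handle(String recordKey) {
        if (!processedKeys.add(recordKey)) {
            return false; // already processed; skip the side effect
        }
        sideEffects++; // stands in for the real work, such as writing to a database
        return true;
    }

    int sideEffects() {
        return sideEffects;
    }
}
```

If the same record is delivered again after a worker restart, `handle` returns `false` and the side effect is not repeated.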

**leaseLost(LeaseLostInput leaseLostInput)**
+ Purpose: Clean up any resources specific to processing this shard.
+ When it's called: When another Scheduler takes over the lease for this shard.
+ Key points:
  + Checkpointing is not allowed in this method.

**shardEnded(ShardEndedInput shardEndedInput)**
+ Purpose: Finish processing for this shard and checkpoint.
+ When it's called: When the shard splits or merges, indicating all data for this shard has been processed.
+ Key points:
  + `shardEndedInput.checkpointer()`: Used to perform the final checkpointing.
  + Checkpointing in this method is mandatory to complete processing.
  + Failing to flush data and checkpoint here may result in data loss or duplicate processing when the shard is reopened.

**shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)**
+ Purpose: Checkpoint and clean up resources when KCL is shutting down.
+ When it's called: When KCL is shutting down (for example, when the application is terminating).
+ Key points:
  + `shutdownRequestedInput.checkpointer()`: Used to perform checkpointing before shutdown.
  + Make sure that you implement checkpointing in this method so that progress is saved before the application stops.
  + Failure to flush data and checkpoint here may result in data loss or reprocessing of records when the application restarts.

**Important**  
KCL 3.x reduces data reprocessing when a lease is handed over from one worker to another by checkpointing before the previous worker shuts down. If you don’t implement checkpointing logic in the `shutdownRequested()` method, you won’t see this benefit. To get this benefit, make sure that you implement checkpointing logic in the `shutdownRequested()` method.

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory is responsible for creating new RecordProcessor instances. KCL uses this factory to create a new RecordProcessor for each shard that the application needs to process.

Key responsibilities:
+ Create new RecordProcessor instances on demand
+ Make sure that each RecordProcessor is properly initialized

The following is an implementation example:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

In this example, the factory creates a new SampleRecordProcessor each time shardRecordProcessor() is called. You can extend this to include any necessary initialization logic.

### Scheduler
<a name="implementation-scheduler"></a>

Scheduler is a high-level component that coordinates all the activities of the KCL application. It's responsible for the overall orchestration of data processing.

Key responsibilities:
+ Manage the lifecycle of RecordProcessors
+ Handle lease management for shards
+ Coordinate checkpointing
+ Balance shard processing load across multiple workers of your application
+ Handle graceful shutdown and application termination signals

Scheduler is typically created and started in the Main Application. You can check the implementation example of Scheduler in the following section, Main Consumer Application. 

### Main Consumer Application
<a name="implementation-main"></a>

Main Consumer Application ties all the components together. It's responsible for setting up the KCL consumer, creating necessary clients, configuring the Scheduler, and managing the application's lifecycle.

Key responsibilities:
+ Set up AWS service clients (Kinesis, DynamoDB, CloudWatch)
+ Configure the KCL application
+ Create and start the Scheduler
+ Handle application shutdown

The following is an implementation example:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

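        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        // On JVM shutdown (for example, SIGTERM), ask the scheduler to shut down gracefully
        // so that record processors can checkpoint in shutdownRequested().
        // The 20-second timeout is an arbitrary example value.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                scheduler.startGracefulShutdown().get(20, java.util.concurrent.TimeUnit.SECONDS);
            } catch (Exception e) {
                System.err.println("Graceful shutdown did not complete: " + e);
            }
        }));

        // The scheduler runs on a daemon thread, so block here to keep the JVM alive.
        try {
            schedulerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }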
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

KCL creates an Enhanced Fan-out (EFO) consumer with dedicated throughput by default. For more information about Enhanced Fan-out, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md). If you have fewer than two consumers or don't need read propagation delays under 200 ms, you must set the following configuration in the scheduler object to use shared-throughput consumers:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

The following code is an example of creating a scheduler object that uses shared-throughput consumers:

**Imports**:

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Code**:

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig(),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    // Use polling (shared throughput) instead of the default enhanced fan-out retrieval.
    configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
);
```

# Develop consumers with KCL in non-Java languages
<a name="develop-kcl-consumers-non-java"></a>

This section covers the implementation of consumers using Kinesis Client Library (KCL) in Python, Node.js, .NET, and Ruby.

KCL is a Java library. Support for languages other than Java is provided using a multi-language interface called the `MultiLangDaemon`. This daemon is Java-based and runs in the background when you use KCL with a language other than Java. Therefore, even if you install KCL for a non-Java language and write your consumer app entirely in that language, you still need Java installed on your system because of the `MultiLangDaemon`. Further, `MultiLangDaemon` has some default settings that you might need to customize for your use case (for example, the AWS Region that it connects to). For more information about the `MultiLangDaemon` on GitHub, see [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

While the core concepts remain the same across languages, there are some language-specific considerations and implementations. For the core concepts of KCL consumer development, see [Develop consumers with KCL in Java](develop-kcl-consumers-java.md). For more detailed information about how to develop KCL consumers in Python, Node.js, .NET, and Ruby, and for the latest updates, see the following GitHub repositories:
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**Important**  
Don't use the following non-Java KCL library versions if you're using JDK 8. These versions contain a dependency (logback) that is incompatible with JDK 8.
+ KCL Python 3.0.2 and 2.2.0
+ KCL Node.js 2.3.0
+ KCL .NET 3.1.0
+ KCL Ruby 2.2.0

We recommend that you use versions released either before or after these affected versions when working with JDK 8.

# Multi-stream processing with KCL
<a name="kcl-multi-stream"></a>

This section describes the required changes in KCL that allow you to create KCL consumer applications that can process more than one data stream at the same time.

**Important**  
Multi-stream processing is only supported in KCL 2.3 or later.  
Multi-stream processing is *not* supported for KCL consumers written in non-Java languages that run with `MultiLangDaemon`.  
Multi-stream processing is *not* supported in any versions of KCL 1.x.

+ **MultiStreamTracker interface**
  + To build a consumer application that can process multiple streams at the same time, you must implement the [MultiStreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java) interface. This interface includes the `streamConfigList` method, which returns the list of data streams and their configurations to be processed by the KCL consumer application. Note that the data streams being processed can change while the consumer application is running: KCL calls `streamConfigList` periodically to learn about changes in the data streams to process.
  + The `streamConfigList` method returns a list of [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) objects.

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + The `StreamIdentifier` and `InitialPositionInStreamExtended` are required fields, while `consumerArn` is optional. You must provide the `consumerArn` only if you are using KCL to implement an enhanced fan-out consumer application.
  + For more information about `StreamIdentifier`, see [StreamIdentifier.java](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). To create a `StreamIdentifier`, we recommend that you create a multistream instance from the `streamArn` and the `streamCreationEpoch`, an approach that is available in KCL 2.5.0 or later. In KCL v2.3 and v2.4, which don't support `streamArn`, create a multistream instance by using the format `account-id:StreamName:streamCreationTimestamp`. This format will be deprecated and no longer supported starting with the next major release.
  + `MultiStreamTracker` also includes a strategy for deleting leases of old streams in the lease table (`formerStreamsLeasesDeletionStrategy`). Note that the strategy cannot be changed while the consumer application is running. For more information, see [FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java).
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) is an application-wide class that you can use to specify all of the KCL configuration settings to be used when building your KCL consumer application for KCL version 2.x or later. The `ConfigsBuilder` class now supports the `MultiStreamTracker` interface. You can initialize `ConfigsBuilder` with the name of a single data stream to consume records from:

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

Or you can initialize ConfigsBuilder with `MultiStreamTracker` if you want to implement a KCL consumer application that processes multiple streams at the same time.

```
    /**
     * Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ With multi-stream support implemented for your KCL consumer application, each row of the application's lease table now contains the shard ID and the stream name of the multiple data streams that this application processes.
+ When multi-stream support for your KCL consumer application is implemented, the leaseKey takes the following structure: `account-id:StreamName:streamCreationTimestamp:ShardId`. For example, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

**Important**  
When your existing KCL consumer application is configured to process only one data stream, the `leaseKey` (which is the partition key for the lease table) is the shard ID. If you reconfigure an existing KCL consumer application to process multiple data streams, it breaks your lease table, because the `leaseKey` structure must be as follows: `account-id:StreamName:StreamCreationTimestamp:ShardId` to support multi-stream.
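
To make the difference between the two `leaseKey` layouts concrete, the following is a minimal sketch of building and parsing the multi-stream format described above. The `MultiStreamLeaseKey` class and its methods are hypothetical helpers written for illustration only; they are not part of KCL, and KCL's own parsing may differ.

```java
/** Hypothetical helper (not part of KCL) that models the multi-stream leaseKey layout. */
class MultiStreamLeaseKey {
    final String accountId;
    final String streamName;
    final long streamCreationTimestamp;
    final String shardId;

    MultiStreamLeaseKey(String accountId, String streamName,
                        long streamCreationTimestamp, String shardId) {
        this.accountId = accountId;
        this.streamName = streamName;
        this.streamCreationTimestamp = streamCreationTimestamp;
        this.shardId = shardId;
    }

    /** Joins the parts as account-id:StreamName:streamCreationTimestamp:ShardId. */
    String format() {
        return accountId + ":" + streamName + ":" + streamCreationTimestamp + ":" + shardId;
    }

    /** Returns true only for keys with four colon-separated parts. */
    static boolean isMultiStreamKey(String leaseKey) {
        return leaseKey.split(":", -1).length == 4;
    }

    /** Parses a multi-stream key; a single-stream key (just the shard ID) is rejected. */
    static MultiStreamLeaseKey parse(String leaseKey) {
        if (!isMultiStreamKey(leaseKey)) {
            throw new IllegalArgumentException("Not a multi-stream leaseKey: " + leaseKey);
        }
        String[] parts = leaseKey.split(":", -1);
        return new MultiStreamLeaseKey(parts[0], parts[1], Long.parseLong(parts[2]), parts[3]);
    }

    public static void main(String[] args) {
        MultiStreamLeaseKey key = parse("111111111:multiStreamTest-1:12345:shardId-000000000336");
        System.out.println(key.streamName);                             // multiStreamTest-1
        System.out.println(key.shardId);                                // shardId-000000000336
        System.out.println(isMultiStreamKey("shardId-000000000336"));   // false (single-stream key)
    }
}
```

A single-stream key such as `shardId-000000000336` has no stream prefix at all, which is why an existing single-stream lease table can't be reused as-is for multi-stream processing.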

# Use the AWS Glue Schema registry with KCL
<a name="kcl-glue-schema"></a>

You can integrate Kinesis Data Streams with the AWS Glue Schema registry. The AWS Glue Schema registry lets you centrally discover, control, and evolve schemas, while ensuring data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema registry lets you improve end-to-end data quality and data governance within your streaming applications. For more information, see [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). One of the ways to set up this integration is through KCL for Java.

**Important**  
AWS Glue Schema registry integration for Kinesis Data Streams is only supported in KCL 2.3 or later.  
AWS Glue Schema registry integration for Kinesis Data Streams is *not* supported for KCL consumers written in non-Java languages that run with `MultiLangDaemon`.  
AWS Glue Schema registry integration for Kinesis Data Streams is *not* supported in any versions of KCL 1.x.

For detailed instructions on how to set up integration of Kinesis Data Streams with AWS Glue Schema registry using KCL, see the "Interacting with Data Using the KPL/KCL Libraries" section in [Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds).

# IAM permissions required for KCL consumer applications
<a name="kcl-iam-permissions"></a>

 You must add the following permissions to the IAM role or user associated with your KCL consumer application. 

 Security best practices for AWS dictate the use of fine-grained permissions to control access to different resources. AWS Identity and Access Management (IAM) lets you manage users and user permissions in AWS. An IAM policy explicitly lists actions that are allowed and the resources on which the actions are applicable.

The following table shows the minimum IAM permissions generally required for KCL consumer applications:


**Minimum IAM permissions for KCL consumer applications**  

| Service | Actions | Resources (ARNs) | Purpose | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  Kinesis data stream from which your KCL application will process the data. `arn:aws:kinesis:region:account:stream/StreamName`  |  Before attempting to read records, the consumer checks whether the data stream exists, whether it's active, and whether the shards are contained in the data stream. Registers a consumer with the data stream.  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  |  Kinesis data stream from which your KCL application will process the data. `arn:aws:kinesis:region:account:stream/StreamName`  |  Reads records from a shard.  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer`  |  Kinesis data stream from which your KCL application will process the data. Add these actions only if you use enhanced fan-out (EFO) consumers. `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  Subscribes to a shard for enhanced fan-out (EFO) consumers.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Lease table (metadata table in DynamoDB) created by KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  These actions are required for KCL to manage the lease table created in DynamoDB.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Worker metrics and coordinator state tables (metadata tables in DynamoDB) created by KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  These actions are required for KCL to manage the worker metrics and coordinator state metadata tables in DynamoDB.  | 
| Amazon DynamoDB | `Query` |  Global secondary index on the lease table. `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  This action is required for KCL to read the global secondary index of the lease table created in DynamoDB.  | 
| Amazon CloudWatch | `PutMetricData` |  \*  |  Uploads metrics to CloudWatch that are useful for monitoring the application. The asterisk (\*) is used because there is no specific resource in CloudWatch on which the `PutMetricData` action is invoked.  | 

**Note**  
Replace "region," "account," "StreamName," and "KCLApplicationName" in the ARNs with your own AWS Region, AWS account number, Kinesis data stream name, and KCL application name respectively. KCL 3.x creates two more metadata tables in DynamoDB. For details about DynamoDB metadata tables created by KCL, see [DynamoDB metadata tables and load balancing in KCL](kcl-dynamoDB.md). If you use configurations to customize names of the metadata tables created by KCL, use those specified table names instead of KCL application name. 

The following is an example policy document for a KCL consumer application. 

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

Before you use this example policy, check the following items:
+ Replace `us-east-1` with your AWS Region.
+ Replace `123456789012` with your AWS account ID.
+ Replace `STREAM_NAME` with the name of your Kinesis data stream.
+ Replace `KCL_APPLICATION_NAME` with the name of your KCL application.
+ The `consumer/*` wildcard applies the enhanced fan-out permissions to any consumer registered on the stream. When you use KCL, the consumer name is typically your application name.
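
As an illustration of how the resource ARNs in the policy are assembled, here is a small sketch that derives them from a Region, account ID, stream name, and KCL application name. The `KclPolicyArns` class is a hypothetical helper, not part of KCL or the AWS SDK, and it assumes the standard `aws` partition and the default metadata table names shown in the table above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of KCL or the AWS SDK) that builds the ARNs used in the example policy. */
class KclPolicyArns {
    /** ARN of the data stream. Assumes the standard "aws" partition. */
    static String streamArn(String region, String accountId, String streamName) {
        return "arn:aws:kinesis:" + region + ":" + accountId + ":stream/" + streamName;
    }

    /** Wildcard ARN that matches every consumer registered on the stream (EFO only). */
    static String consumerArnPattern(String region, String accountId, String streamName) {
        return streamArn(region, accountId, streamName) + "/consumer/*";
    }

    /**
     * ARNs of the lease table, the two additional KCL 3.x metadata tables, and the
     * lease table indexes. Assumes the default table names derived from the application name.
     */
    static List<String> dynamoDbArns(String region, String accountId, String applicationName) {
        String leaseTable = "arn:aws:dynamodb:" + region + ":" + accountId + ":table/" + applicationName;
        return Arrays.asList(
            leaseTable,
            leaseTable + "-WorkerMetricStats",
            leaseTable + "-CoordinatorState",
            leaseTable + "/index/*");
    }

    public static void main(String[] args) {
        System.out.println(streamArn("us-east-1", "123456789012", "my-stream"));
        System.out.println(consumerArnPattern("us-east-1", "123456789012", "my-stream"));
        dynamoDbArns("us-east-1", "123456789012", "my-kcl-app").forEach(System.out::println);
    }
}
```

If you customize the metadata table names through KCL configuration, use those names in place of the application-name-based defaults assumed here.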

# KCL configurations
<a name="kcl-configuration"></a>

You can set configuration properties to customize Kinesis Client Library's functionality to meet your specific requirements. The following table describes configuration properties and classes.

**Important**  
In KCL 3.x, the load balancing algorithm aims to achieve even CPU utilization across workers, not an equal number of leases per worker. If you set `maxLeasesForWorker` too low, you might limit KCL's ability to balance the workload effectively. If you use the `maxLeasesForWorker` configuration, consider increasing its value to allow for the best possible load distribution.


**This table shows the configuration properties for KCL**  

| Configuration property | Configuration class | Description | Default value | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | The name of this KCL application. Used as the default for the tableName and consumerName. | Not applicable | 
| tableName | ConfigsBuilder |  Allows overriding the table name used for the Amazon DynamoDB lease table.  | Not applicable | 
| streamName | ConfigsBuilder |  The name of the stream that this application processes records from.  | Not applicable | 
| workerIdentifier | ConfigsBuilder |  A unique identifier that represents this instantiation of the application processor. This must be unique.  | Not applicable | 
| failoverTimeMillis | LeaseManagementConfig |  The number of milliseconds that must pass before you can consider a lease owner to have failed. For applications that have a large number of shards, this may be set to a higher number to reduce the number of DynamoDB IOPS required for tracking leases.  | 10,000 (10 seconds) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  The time between shard sync calls.  | 60,000 (60 seconds) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  When set, leases are removed as soon as the child leases have started processing.  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  When set, child shards that have an open shard are ignored. This is primarily for DynamoDB Streams.  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  The maximum number of leases a single worker should accept. Setting it too low may cause data loss if workers can't process all shards, and lead to a suboptimal lease assignment among workers. Consider total shard count, number of workers, and worker processing capacity when configuring it.  | Unlimited | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  Controls the size of the lease renewer thread pool. The more leases that your application could take, the larger this pool should be.  | 20 | 
| billingMode | LeaseManagementConfig |  Determines the capacity mode of the lease table created in DynamoDB. There are two options: on-demand mode (PAY\_PER\_REQUEST) and provisioned mode. We recommend using the default setting of on-demand mode because it automatically scales to accommodate your workload without the need for capacity planning.  | PAY\_PER\_REQUEST (on-demand mode) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | The DynamoDB read capacity that is used if the Kinesis Client Library needs to create a new DynamoDB lease table with provisioned capacity mode. You can ignore this configuration if you are using the default on-demand capacity mode in billingMode configuration. | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | The DynamoDB write capacity that is used if the Kinesis Client Library needs to create a new DynamoDB lease table with provisioned capacity mode. You can ignore this configuration if you are using the default on-demand capacity mode in billingMode configuration. | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  The initial position in the stream that the application should start at. This is only used during initial lease creation.  |  InitialPositionInStream.TRIM\_HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  A percentage value that determines when the load balancing algorithm should consider reassigning shards among workers. This is a new configuration introduced in KCL 3.x.  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  A percentage value that is used to dampen the amount of load that will be moved from the overloaded worker in a single rebalance operation. This is a new configuration introduced in KCL 3.x.  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  Determines whether an additional lease should still be taken from the overloaded worker even if doing so causes the total lease throughput taken to exceed the desired throughput amount. This is a new configuration introduced in KCL 3.x.  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  Determines if KCL should ignore resource metrics from workers (such as CPU utilization) when reassigning leases and load balancing. Set this to TRUE if you want to prevent KCL from load balancing based on CPU utilization. This is a new configuration introduced in KCL 3.x.  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  Amount of the maximum throughput to assign to a worker during the lease assignment. This is a new configuration introduced in KCL 3.x.  | Unlimited | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  Controls the behavior of lease handoff between workers. When set to true, KCL attempts to gracefully transfer leases by allowing the shard's RecordProcessor sufficient time to complete processing before handing off the lease to another worker. This can help ensure data integrity and smooth transitions but may increase handoff time. When set to false, the lease is handed off immediately without waiting for the RecordProcessor to shut down gracefully. This can lead to faster handoffs but may risk incomplete processing. Note: To benefit from the graceful lease handoff feature, you must implement checkpointing inside the shutdownRequested() method of the RecordProcessor. This is a new configuration introduced in KCL 3.x.  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  Specifies the minimum time (in milliseconds) to wait for the current shard's RecordProcessor to gracefully shut down before forcefully transferring the lease to the next owner. If your processRecords method typically runs longer than the default value, consider increasing this setting. This ensures the RecordProcessor has sufficient time to complete its processing before the lease transfer occurs. This is a new configuration introduced in KCL 3.x.  | 30,000 (30 seconds) | 
| maxRecords | PollingConfig |  Allows setting the maximum number of records that Kinesis returns.  | 10,000 | 
| retryGetRecordsInSeconds | PollingConfig |  Configures the delay between GetRecords attempts for failures.  | None | 
| maxGetRecordsThreadPool | PollingConfig |  The thread pool size used for GetRecords.  | None | 
| idleTimeBetweenReadsInMillis | PollingConfig |  Determines how long KCL waits between GetRecords calls to poll the data from data streams. The unit is milliseconds.  | 1,500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  When set, the record processor is called even when no records were provided from Kinesis.  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  How often a record processor should poll to see if the parent shard has been completed. The unit is milliseconds.  | 10,000 (10 seconds) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  Disable synchronizing shard data if the lease table contains existing leases.  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  Which shard prioritization to use.  |  NoOpShardPrioritization  | 
| clientVersionConfig | CoordinatorConfig |  Determines which KCL version compatibility mode the application will run in. This configuration is only for the migration from previous KCL versions. When migrating to 3.x, you need to set this configuration to `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. You can remove this configuration when you complete the migration.  | CLIENT\_VERSION\_CONFIG\_3X | 
| taskBackoffTimeMillis | LifecycleConfig |  The time to wait to retry failed KCL tasks. The unit is milliseconds.  | 500 (0.5 seconds) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  How long to wait before a warning is logged if a task hasn't completed.  | None | 
| listShardsBackoffTimeInMillis | RetrievalConfig | The time to wait between calls to ListShards when failures occur. The unit is milliseconds. | 1,500 (1.5 seconds) | 
| maxListShardsRetryAttempts | RetrievalConfig | The maximum number of times that ListShards retries before giving up. | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  Specifies the maximum duration (in milliseconds) to buffer metrics before publishing them to CloudWatch.  | 10,000 (10 seconds) | 
| metricsMaxQueueSize | MetricsConfig |  Specifies the maximum number of metrics to buffer before publishing to CloudWatch.  | 10,000 | 
| metricsLevel | MetricsConfig |  Specifies the granularity level of CloudWatch metrics to be enabled and published.  Possible values: NONE, SUMMARY, DETAILED.  |  MetricsLevel.DETAILED  | 
| metricsEnabledDimensions | MetricsConfig |  Controls allowed dimensions for CloudWatch Metrics.  | All dimensions | 
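
The warning about setting `maxLeasesForWorker` too low comes down to simple arithmetic: if the cap multiplied by the number of workers is smaller than the number of shards, some shards can't be leased by any worker. The following sketch shows that check. The `LeaseCapacityCheck` class is a hypothetical helper, not part of KCL, and it assumes one lease per shard and ignores temporary extra leases during resharding.

```java
/** Hypothetical sanity check (not part of KCL) for a candidate maxLeasesForWorker value. */
class LeaseCapacityCheck {
    /** Total number of leases the fleet can hold when every worker is capped. */
    static long fleetLeaseCapacity(int workerCount, int maxLeasesForWorker) {
        return (long) workerCount * maxLeasesForWorker;
    }

    /** True if the capped fleet can hold a lease for every shard (assumes one lease per shard). */
    static boolean canCoverAllShards(int shardCount, int workerCount, int maxLeasesForWorker) {
        return fleetLeaseCapacity(workerCount, maxLeasesForWorker) >= shardCount;
    }

    /** Smallest cap that still covers every shard: ceil(shardCount / workerCount). */
    static int minimumMaxLeasesForWorker(int shardCount, int workerCount) {
        return (shardCount + workerCount - 1) / workerCount;
    }

    public static void main(String[] args) {
        // 100 shards, 4 workers, cap of 20 leases: only 80 leases fit, so 20 shards go unprocessed.
        System.out.println(canCoverAllShards(100, 4, 20));        // false
        System.out.println(minimumMaxLeasesForWorker(100, 4));    // 25
    }
}
```

The computed minimum is only a floor. It leaves no headroom for worker failures, and in KCL 3.x a tight cap can also prevent the CPU-based load balancer from moving leases, so a value well above the floor (or the default of unlimited) is usually the safer choice.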

**Discontinued configurations in KCL 3.x**

The following configuration properties are discontinued in KCL 3.x:


**The table shows discontinued configuration properties for KCL 3.x**  

| Configuration property | Configuration class | Description | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  The maximum number of leases an application should attempt to steal at one time. KCL 3.x will ignore this configuration and reassign leases based on the resource utilization of workers.  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  Controls whether workers should prioritize taking very expired leases (leases not renewed for 3x the failover time) and new shard leases, regardless of target lease counts but still respecting max lease limits. KCL 3.x will ignore this configuration and always spread expired leases across workers.  | 

**Important**  
You must still keep the discontinued configuration properties during the migration from previous KCL versions to KCL 3.x. During the migration, the KCL worker first starts in KCL 2.x compatible mode and switches to KCL 3.x functionality mode when it detects that all KCL workers of the application are ready to run KCL 3.x. These discontinued configurations are needed while KCL workers are running in KCL 2.x compatible mode.

# KCL version lifecycle policy
<a name="kcl-version-lifecycle-policy"></a>

This topic outlines the version lifecycle policy for Amazon Kinesis Client Library (KCL). AWS regularly provides new releases for KCL versions to support new features and enhancements, bug fixes, security patches, and dependency updates. We recommend that you stay up-to-date with KCL versions to keep up with the latest features, security updates, and underlying dependencies. We **don't** recommend continued use of an unsupported KCL version.

The lifecycle for major KCL versions consists of the following three phases:
+ **General availability (GA)** – During this phase, the major version is fully supported. AWS provides regular minor and patch version releases that include support for new features or API updates for Kinesis Data Streams, as well as bug and security fixes.
+ **Maintenance mode** – AWS limits patch version releases to address critical bug fixes and security issues only. The major version won't receive updates for new features or APIs of Kinesis Data Streams.
+ **End-of-support** – The major version will no longer receive updates or releases. Previously published releases will continue to be available through public package managers and the code will remain on GitHub. Use of a version which has reached end-of-support is done at the user’s discretion. We recommend that you upgrade to the latest major version.


| Major version | Current phase | Release date | Maintenance mode date | End-of-support date | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | Maintenance mode | 2013-12-19 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | General availability | 2018-08-02 | -- | -- | 
| KCL 3.x | General availability | 2024-11-06 | -- | -- | 

# Migrate from previous KCL versions
<a name="kcl-migration-previous-versions"></a>

This topic explains how to migrate from previous versions of the Kinesis Client Library (KCL). 

## What's new in KCL 3.0?
<a name="kcl-migration-new-3-0"></a>

Kinesis Client Library (KCL) 3.0 introduces several major enhancements compared to previous versions:
+ It lowers compute costs for consumer applications by automatically redistributing the work from over-utilized workers to under-utilized workers in the consumer application fleet. This new load balancing algorithm ensures evenly distributed CPU utilization across workers and removes the need to over-provision workers.
+  It reduces the DynamoDB cost associated with KCL by optimizing read operations on the lease table.
+ It minimizes reprocessing of data when leases are reassigned to another worker by allowing the current worker to complete checkpointing the records that it has processed.
+  It uses AWS SDK for Java 2.x for improved performance and security features, fully removing the dependency on AWS SDK for Java 1.x.

For more information, see [KCL 3.0 release note](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md).

**Topics**
+ [What's new in KCL 3.0?](#kcl-migration-new-3-0)
+ [Migrate from KCL 2.x to KCL 3.x](kcl-migration-from-2-3.md)
+ [Roll back to the previous KCL version](kcl-migration-rollback.md)
+ [Roll forward to KCL 3.x after a rollback](kcl-migration-rollforward.md)
+ [Best practices for the lease table with provisioned capacity mode](kcl-migration-lease-table.md)
+ [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md)

# Migrate from KCL 2.x to KCL 3.x
<a name="kcl-migration-from-2-3"></a>

This topic provides step-by-step instructions to migrate your consumer from KCL 2.x to KCL 3.x. KCL 3.x supports in-place migration of KCL 2.x consumers. You can continue consuming the data from your Kinesis data stream while migrating your workers in a rolling manner.

**Important**  
KCL 3.x maintains the same interfaces and methods as KCL 2.x, so you don't have to update your record processing code during the migration. However, you must set the proper configuration and complete the required migration steps. We highly recommend that you follow the steps below for a smooth migration experience.

## Step 1: Prerequisites
<a name="kcl-migration-from-2-3-prerequisites"></a>

Before you start using KCL 3.x, make sure that you have the following:
+ Java Development Kit (JDK) 8 or later
+ AWS SDK for Java 2.x
+ Maven or Gradle for dependency management

**Important**  
Do not use AWS SDK for Java version 2.27.19 to 2.27.23 with KCL 3.x. These versions include an issue that causes an exception error related to KCL's DynamoDB usage. We recommend that you use the AWS SDK for Java version 2.28.0 or later to avoid this issue. 
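
As a rough sketch of how a build or startup check might guard against the affected SDK range, the following helper parses a `major.minor.patch` version string and reports whether it falls between 2.27.19 and 2.27.23, inclusive. The class and method names are hypothetical and are not part of KCL or the AWS SDK. How you obtain the SDK version string in your own application is left open.

```java
/**
 * Illustrative helper (not part of KCL or the AWS SDK) that flags the
 * AWS SDK for Java versions 2.27.19 through 2.27.23.
 */
final class SdkVersionCheck {

    private SdkVersionCheck() {}

    /** Returns true if the version is in the range 2.27.19 to 2.27.23, inclusive. */
    static boolean isAffectedSdkVersion(String version) {
        // Assumes a "major.minor.patch" string; a trailing qualifier such as "-SNAPSHOT" is ignored.
        String[] parts = version.trim().split("[.-]");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected major.minor.patch but got: " + version);
        }
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        int patch = Integer.parseInt(parts[2]);
        return major == 2 && minor == 27 && patch >= 19 && patch <= 23;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.27.18", "2.27.19", "2.27.23", "2.28.0"}) {
            System.out.println(v + " affected: " + isAffectedSdkVersion(v));
        }
    }
}
```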

## Step 2: Add dependencies
<a name="kcl-migration-from-2-3-dependencies"></a>

If you're using Maven, add the following dependency to your `pom.xml` file. Make sure that you replace 3.x.x with the latest KCL version. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

If you're using Gradle, add the following to your `build.gradle` file. Make sure that you replace 3.x.x with the latest KCL version. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

You can check for the latest version of the KCL on the [Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Step 3: Set up the migration-related configuration
<a name="kcl-migration-from-2-3-configuration"></a>

To migrate from KCL 2.x to KCL 3.x, you must set the following configuration parameter:
+ CoordinatorConfig.clientVersionConfig: This configuration determines which KCL version compatibility mode the application will run in. When migrating from KCL 2.x to 3.x, you need to set this configuration to `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. To set this configuration, add the following line when creating your scheduler object:

```
configsBuilder.coordinatorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X)
```

The following is an example of how to set the `CoordinatorConfig.clientVersionConfig` for migrating from KCL 2.x to 3.x. You can adjust other configurations as needed based on your specific requirements:

```
// Build the scheduler with the KCL 2.x compatibility mode set on the coordinator config.
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

It's important that all workers in your consumer application use the same load balancing algorithm at a given time because KCL 2.x and 3.x use different load balancing algorithms. Running workers with different load balancing algorithms can cause suboptimal load distribution as the two algorithms operate independently.

This KCL 2.x compatibility setting allows your KCL 3.x application to run in a mode compatible with KCL 2.x and use the KCL 2.x load balancing algorithm until all workers in your consumer application have been upgraded to KCL 3.x. When the migration is complete, KCL automatically switches to full KCL 3.x functionality mode and starts using the new KCL 3.x load balancing algorithm for all running workers.

**Important**  
If you are not using `ConfigsBuilder` but creating a `LeaseManagementConfig` object to set configurations, you must add one more parameter called `applicationName` in KCL version 3.x or later. For details, see [Compilation error with the LeaseManagementConfig constructor](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig). We recommend using `ConfigsBuilder` to set KCL configurations. `ConfigsBuilder` provides a more flexible and maintainable way to configure your KCL application.

## Step 4: Follow best practices for the shutdownRequested() method implementation
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x introduces a feature called *graceful lease handoff* to minimize the reprocessing of data when a lease is handed over to another worker as part of the lease reassignment process. This is achieved by checkpointing the last processed sequence number in the lease table before the lease handoff. To ensure the graceful lease handoff works properly, you must make sure that you invoke the `checkpointer` object within the `shutdownRequested` method in your `RecordProcessor` class. If you're not invoking the `checkpointer` object within the `shutdownRequested` method, you can implement it as illustrated in the following example. 

**Important**  
The following implementation example is the minimum required for graceful lease handoff. You can extend it to include additional checkpointing logic if needed. If you perform any asynchronous processing, make sure that all records delivered downstream have been processed before you invoke checkpointing. 
While graceful lease handoff significantly reduces the likelihood of data reprocessing during lease transfers, it does not entirely eliminate this possibility. To preserve data integrity and consistency, design your downstream consumer applications to be idempotent. This means they should be able to handle potential duplicate record processing without adverse effects on the overall system.

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
        // Ensure that all delivered records have been processed and
        // successfully flushed downstream before calling checkpoint.
        // If you are performing any asynchronous processing or flushing to
        // downstream, you must wait for its completion before invoking
        // the checkpoint method below.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```
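
The "wait for asynchronous work, then checkpoint" ordering described in the code comments above can be sketched as follows. This is a minimal illustration under stated assumptions, not KCL code: the `Checkpointer` interface is a hypothetical stand-in for the checkpointer object that KCL passes to `shutdownRequested`, and the pending writes are modeled as `CompletableFuture` instances that your own processing code would track.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of waiting for in-flight downstream writes before checkpointing. */
final class GracefulHandoffSketch {

    /** Hypothetical stand-in for the checkpointer passed to shutdownRequested. */
    interface Checkpointer {
        void checkpoint() throws Exception;
    }

    private GracefulHandoffSketch() {}

    /**
     * Waits for all pending downstream writes, then checkpoints.
     * Returns true if the checkpoint was written, false if the wait or the checkpoint failed.
     */
    static boolean flushThenCheckpoint(List<CompletableFuture<Void>> pendingWrites,
                                       Checkpointer checkpointer,
                                       long timeoutMillis) {
        try {
            // Block until every asynchronous write has completed or the timeout expires.
            CompletableFuture
                    .allOf(pendingWrites.toArray(new CompletableFuture<?>[0]))
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
            // Checkpoint only after all delivered records are safely downstream.
            checkpointer.checkpoint();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // Timeout, failed write, or failed checkpoint: no new checkpoint is recorded,
            // so the next lease owner resumes from the last successful checkpoint.
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> write = CompletableFuture.runAsync(() -> { /* simulated downstream write */ });
        boolean written = flushThenCheckpoint(
                Arrays.asList(write), () -> System.out.println("checkpointed"), 1000);
        System.out.println("checkpoint written: " + written);
    }
}
```

Skipping the checkpoint when a write fails or times out trades some reprocessing for safety, which is why downstream processing should remain idempotent.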

## Step 5: Check the KCL 3.x prerequisites for collecting worker metrics
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x collects worker metrics such as CPU utilization to balance the load evenly across workers. Consumer application workers can run on Amazon EC2, Amazon ECS, Amazon EKS, or AWS Fargate. KCL 3.x can collect CPU utilization metrics from workers only when the following prerequisites are met:

 **Amazon Elastic Compute Cloud (Amazon EC2)**
+ Your operating system must be Linux OS.
+ You must enable [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html) in your EC2 instance.

 **Amazon Elastic Container Service (Amazon ECS) on Amazon EC2**
+ Your operating system must be Linux OS.
+ You must enable [ECS task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ Your Amazon ECS container agent version must be 1.39.0 or later.

 **Amazon ECS on AWS Fargate**
+ You must enable [Fargate task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). If you use Fargate platform version 1.4.0 or later, this is enabled by default. 
+ Fargate platform version 1.4.0 or later.

 **Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2** 
+ Your operating system must be Linux OS.

 **Amazon EKS on AWS Fargate**
+ Fargate platform 1.3.0 or later.

**Important**  
If KCL 3.x cannot collect CPU utilization metrics from workers because the prerequisites are not met, it rebalances the load based on the throughput level per lease. This fallback rebalancing mechanism makes sure that all workers get similar total throughput levels from the leases assigned to them. For more information, see [How KCL assigns leases to workers and balances the load](kcl-dynamoDB.md#kcl-assign-leases).

## Step 6: Update IAM permissions for KCL 3.x
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

You must add the following permissions to the IAM role or policy associated with your KCL 3.x consumer application. This involves updating the existing IAM policy used by the KCL application. For more information, see [IAM permissions required for KCL consumer applications](kcl-iam-permissions.md).

**Important**  
Your existing KCL applications might not have the following IAM actions and resources in their IAM policy because they were not needed in KCL 2.x. Make sure that you have added them before running your KCL 3.x application:
+ Actions: `UpdateTable`. Resources (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName`
+ Actions: `Query`. Resources (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
+ Actions: `CreateTable`, `DescribeTable`, `Scan`, `GetItem`, `PutItem`, `UpdateItem`, `DeleteItem`. Resources (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`

Replace "region," "account," and "KCLApplicationName" in the ARNs with your own AWS Region, AWS account number, and KCL application name, respectively. If you use configurations to customize the names of the metadata tables created by KCL, use those table names instead of the KCL application name.
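
To make the ARN substitution concrete, the following sketch expands the placeholders into the resource ARNs listed above and pairs each one with its actions. The class and method names are hypothetical. The sketch assumes the default metadata table names and the `aws` partition, and it writes the actions with the `dynamodb:` service prefix that IAM policies use.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helper that lists the additional DynamoDB permissions for KCL 3.x. */
final class Kcl3IamResources {

    private Kcl3IamResources() {}

    /** Maps each resource ARN to the IAM actions that KCL 3.x needs on it (default table names assumed). */
    static Map<String, List<String>> additionalPermissions(String region, String account, String appName) {
        String leaseTable = "arn:aws:dynamodb:" + region + ":" + account + ":table/" + appName;
        List<String> metadataTableActions = Arrays.asList(
                "dynamodb:CreateTable", "dynamodb:DescribeTable", "dynamodb:Scan",
                "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem");

        Map<String, List<String>> permissions = new LinkedHashMap<>();
        // Lease table: KCL 3.x updates the table to add a global secondary index.
        permissions.put(leaseTable, Arrays.asList("dynamodb:UpdateTable"));
        // Global secondary index on the lease table.
        permissions.put(leaseTable + "/index/*", Arrays.asList("dynamodb:Query"));
        // Metadata tables created by KCL 3.x.
        permissions.put(leaseTable + "-WorkerMetricStats", metadataTableActions);
        permissions.put(leaseTable + "-CoordinatorState", metadataTableActions);
        return permissions;
    }

    public static void main(String[] args) {
        // Example values only; replace with your own Region, account number, and application name.
        additionalPermissions("us-east-1", "123456789012", "MyKclApp")
                .forEach((arn, actions) -> System.out.println(arn + " -> " + actions));
    }
}
```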

## Step 7: Deploy KCL 3.x code to your workers
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

After you have set the configuration required for the migration and completed all of the previous migration steps, you can build and deploy your code to your workers.

**Note**  
If you see a compilation error with the `LeaseManagementConfig` constructor, see [Compilation error with the LeaseManagementConfig constructor](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig) for troubleshooting information.

## Step 8: Complete the migration
<a name="kcl-migration-from-2-3-finish"></a>

During the deployment of KCL 3.x code, KCL continues using the lease assignment algorithm from KCL 2.x. When you have successfully deployed KCL 3.x code to all of your workers, KCL automatically detects this and switches to the new lease assignment algorithm based on resource utilization of the workers. For more details about the new lease assignment algorithm, see [How KCL assigns leases to workers and balances the load](kcl-dynamoDB.md#kcl-assign-leases).

During the deployment, you can monitor the migration process with the following metrics emitted to CloudWatch. You can monitor metrics under the `Migration` operation. All metrics are per-KCL-application metrics and set to the `SUMMARY` metric level. If the `Sum` statistic of the `CurrentState:3xWorker` metric matches the total number of workers in your KCL application, it indicates that the migration to KCL 3.x has successfully completed.

**Important**  
It takes at least 10 minutes for KCL to switch to the new lease assignment algorithm after all workers are ready to run it.


**CloudWatch metrics for the KCL migration process**  

| Metrics | Description | 
| --- | --- | 
| CurrentState:3xWorker |  The number of KCL workers successfully migrated to KCL 3.x and running the new lease assignment algorithm. If the `Sum` count of this metric matches the total number of your workers, it indicates that the migration to KCL 3.x has successfully completed.  | 
| CurrentState:2xCompatibleWorker |  The number of KCL workers running in KCL 2.x compatible mode during the migration process. A non-zero value for this metric indicates that the migration is still in progress.  | 
| Fault |  The number of exceptions encountered during the migration process. Most of these exceptions are transient errors, and KCL 3.x automatically retries to complete the migration. If you observe a persistent `Fault` metric value, review your logs from the migration period for further troubleshooting. If the issue continues, contact Support.  | 
| GsiStatusReady |  The status of the global secondary index (GSI) creation on the lease table. This metric indicates whether the GSI on the lease table has been created, which is a prerequisite to run KCL 3.x. The value is 0 or 1, with 1 indicating successful creation. During a rollback state, this metric is not emitted. After you roll forward again, you can resume monitoring this metric.  | 
| workerMetricsReady |  The status of worker metrics emission from all workers. This metric indicates whether all workers are emitting metrics such as CPU utilization. The value is 0 or 1, with 1 indicating that all workers are successfully emitting metrics and are ready for the new lease assignment algorithm. During a rollback state, this metric is not emitted. After you roll forward again, you can resume monitoring this metric.  | 

KCL provides rollback capability to the 2.x compatible mode during migration. After the migration to KCL 3.x is successful, we recommend that you remove the `CoordinatorConfig.clientVersionConfig` setting of `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` if rollback is no longer needed. Removing this configuration stops the emission of migration-related metrics from the KCL application.
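
The completion check described for the `CurrentState:3xWorker` and `CurrentState:2xCompatibleWorker` metrics can be sketched as a small decision function. This is only an illustration: the class, enum, and method names are hypothetical, and the sketch assumes that you have already retrieved the `Sum` statistic of both metrics from CloudWatch for the same period (the retrieval itself is not shown).

```java
/** Illustrative evaluation of the migration metrics; not part of KCL. */
final class MigrationProgress {

    enum State { IN_PROGRESS, COMPLETE }

    private MigrationProgress() {}

    /**
     * Treats the migration as complete when the 3.x worker count matches the fleet size
     * and no worker still reports the KCL 2.x compatible mode.
     */
    static State evaluate(double sum3xWorkers, double sum2xCompatibleWorkers, int totalWorkers) {
        if (totalWorkers <= 0) {
            throw new IllegalArgumentException("totalWorkers must be positive");
        }
        boolean allWorkersOn3x = Math.round(sum3xWorkers) == totalWorkers;
        boolean noWorkerIn2xMode = Math.round(sum2xCompatibleWorkers) == 0;
        return (allWorkersOn3x && noWorkerIn2xMode) ? State.COMPLETE : State.IN_PROGRESS;
    }

    public static void main(String[] args) {
        System.out.println(evaluate(3, 2, 5)); // some workers are still in 2.x compatible mode
        System.out.println(evaluate(5, 0, 5)); // every worker runs the new algorithm
    }
}
```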

**Note**  
We recommend that you monitor your application's performance and stability for a period during the migration and after completing the migration. If you observe any issues, you can roll back workers to use KCL 2.x compatible functionality using the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

# Roll back to the previous KCL version
<a name="kcl-migration-rollback"></a>

This topic explains how to roll back your consumer to the previous version. Rolling back is a two-step process: 

1. Run the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Redeploy previous KCL version code (optional).

## Step 1: Run the KCL Migration Tool
<a name="kcl-migration-rollback-tool"></a>

When you need to roll back to the previous KCL version, you must run the KCL Migration Tool. The KCL Migration Tool does two important tasks:
+ It removes the worker metrics table (a metadata table) and the global secondary index on the lease table in DynamoDB. These two artifacts are created by KCL 3.x but are not needed when you roll back to the previous version.
+ It makes all workers run in a mode compatible with KCL 2.x and start using the load balancing algorithm used in previous KCL versions. If you have issues with the new load balancing algorithm in KCL 3.x, this will mitigate the issue immediately.

**Important**  
The coordinator state table in DynamoDB must exist and must not be deleted during the migration, rollback, and rollforward process. 

**Note**  
It's important that all workers in your consumer application use the same load balancing algorithm at a given time. The KCL Migration Tool makes sure that all workers in your KCL 3.x consumer application switch to the KCL 2.x compatible mode so that all workers run the same load balancing algorithm during the rolling deployment back to your previous KCL version.

You can download the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) from the scripts directory of the [KCL GitHub repository](https://github.com/awslabs/amazon-kinesis-client/tree/master). You can run the script from any of your workers or from any host that has the required permissions to write to the coordinator state table, delete the worker metrics table, and update the lease table. For the IAM permissions required to run the script, see [IAM permissions required for KCL consumer applications](kcl-iam-permissions.md). Run the script only once per KCL application. You can run the KCL Migration Tool with the following command: 

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**Parameters**
+ --region: Replace `<region>` with your AWS Region.
+ --application\_name: This parameter is required if you're using default names for your DynamoDB metadata tables (lease table, coordinator state table, and worker metrics table). If you have specified custom names for these tables, you can omit this parameter. Replace `<applicationName>` with your actual KCL application name. The tool uses this name to derive the default table names if custom names are not provided.
+ --lease\_table\_name (optional): This parameter is needed when you have set a custom name for the lease table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<leaseTableName>` with the custom table name you specified for your lease table.
+ --coordinator\_state\_table\_name (optional): This parameter is needed when you have set a custom name for the coordinator state table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<coordinatorStateTableName>` with the custom table name you specified for your coordinator state table. 
+ --worker\_metrics\_table\_name (optional): This parameter is needed when you have set a custom name for the worker metrics table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<workerMetricsTableName>` with the custom table name you specified for your worker metrics table. 
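
If you trigger the rollback from Java-based operational tooling, the parameter rules above could be encoded roughly as follows. This is a sketch with a hypothetical class and method. It only assembles the argument list shown in the command above, and it assumes that the application name may be omitted only when all three custom table names are supplied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative builder for the KCL Migration Tool rollback command line. */
final class RollbackCommand {

    private RollbackCommand() {}

    /** Pass null for any optional value that you don't use. */
    static List<String> build(String region, String applicationName, String leaseTableName,
                              String coordinatorStateTableName, String workerMetricsTableName) {
        boolean allCustomNames = leaseTableName != null
                && coordinatorStateTableName != null
                && workerMetricsTableName != null;
        if (applicationName == null && !allCustomNames) {
            throw new IllegalArgumentException(
                    "application name is required unless all table names are customized");
        }
        List<String> command = new ArrayList<>(Arrays.asList(
                "python3", "./KclMigrationTool.py", "--region", region, "--mode", "rollback"));
        addOption(command, "--application_name", applicationName);
        addOption(command, "--lease_table_name", leaseTableName);
        addOption(command, "--coordinator_state_table_name", coordinatorStateTableName);
        addOption(command, "--worker_metrics_table_name", workerMetricsTableName);
        return command;
    }

    /** Appends the flag and its value only when a value was provided. */
    private static void addOption(List<String> command, String flag, String value) {
        if (value != null) {
            command.add(flag);
            command.add(value);
        }
    }

    public static void main(String[] args) {
        // Example values only.
        System.out.println(String.join(" ", build("us-east-1", "MyKclApp", null, null, null)));
    }
}
```

The resulting list can be passed to `java.lang.ProcessBuilder` if you want to launch the script from Java.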

## Step 2: Redeploy the code with the previous KCL version (optional)
<a name="kcl-migration-rollback-redeploy"></a>

 After running the KCL Migration Tool for a rollback, you'll see one of these messages:
+ **Message 1:** “Rollback completed. Your KCL application was running the KCL 2.x compatible mode. If you don't see mitigation of any regression, please rollback to your previous application binaries by deploying the code with your previous KCL version.”
  + **Required action:** This means that your workers were running in the KCL 2.x compatible mode. If the issue persists, redeploy the code with the previous KCL version to your workers.
+ **Message 2:** “Rollback completed. Your KCL application was running the KCL 3.x functionality mode. Rollback to the previous application binaries is not necessary, unless you don’t see any mitigation for the issue within 5 minutes. If you still have an issue, please rollback to your previous application binaries by deploying the code with your previous KCL version.”
  + **Required action:** This means that your workers were running in KCL 3.x mode and the KCL Migration Tool switched all workers to KCL 2.x compatible mode. If the issue is resolved, you don't need to redeploy the code with the previous KCL version. If the issue persists, redeploy the code with the previous KCL version to your workers.

 

# Roll forward to KCL 3.x after a rollback
<a name="kcl-migration-rollforward"></a>

This topic explains how to roll your consumer forward to KCL 3.x after a rollback. Rolling forward is a two-step process: 

1. Run the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py). 

1. Deploy the code with KCL 3.x.

## Step 1: Run the KCL Migration Tool
<a name="kcl-migration-rollback-tool"></a>

Run the KCL Migration Tool with the following command to roll forward to KCL 3.x:

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**Parameters**
+ --region: Replace `<region>` with your AWS Region.
+ --application\_name: This parameter is required if you're using the default name for your coordinator state table. If you have specified a custom name for the coordinator state table, you can omit this parameter. Replace `<applicationName>` with your actual KCL application name. The tool uses this name to derive the default table names if custom names are not provided.
+ --coordinator\_state\_table\_name (optional): This parameter is needed when you have set a custom name for the coordinator state table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<coordinatorStateTableName>` with the custom table name you specified for your coordinator state table. 

After you run the migration tool in roll-forward mode, KCL creates the following DynamoDB resources required for KCL 3.x:
+ A global secondary index on the lease table
+ A worker metrics table

## Step 2: Deploy the code with KCL 3.x
<a name="kcl-migration-rollback-redeploy"></a>

After running the KCL Migration Tool for a roll forward, deploy your code with KCL 3.x to your workers. Follow [Step 8: Complete the migration](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) to complete your migration.

# Best practices for the lease table with provisioned capacity mode
<a name="kcl-migration-lease-table"></a>

If the lease table of your KCL application was switched to provisioned capacity mode, KCL 3.x creates a global secondary index on the lease table with the provisioned billing mode and the same read capacity units (RCU) and write capacity units (WCU) as the base lease table. When the global secondary index is created, we recommend that you monitor the actual usage on the global secondary index in the DynamoDB console and adjust the capacity units if needed. For a more detailed guide about switching the capacity mode of DynamoDB metadata tables created by KCL, see [DynamoDB capacity mode for metadata tables created by KCL](kcl-dynamoDB.md#kcl-capacity-mode). 

**Note**  
By default, KCL creates metadata tables such as the lease table, worker metrics table, and coordinator state table, and the global secondary index on the lease table using the on-demand capacity mode. We recommend that you use the on-demand capacity mode to automatically adjust the capacity based on your usage changes. 

# Migrating from KCL 1.x to KCL 3.x
<a name="kcl-migration-1-3"></a>

This topic explains how to migrate your consumer from KCL 1.x to KCL 3.x. KCL 1.x uses different classes and interfaces than KCL 2.x and KCL 3.x. You must first migrate the record processor, record processor factory, and worker classes to the KCL 2.x/3.x compatible format, and then follow the steps for migrating from KCL 2.x to KCL 3.x. You can upgrade directly from KCL 1.x to KCL 3.x.
+ **Step 1: Migrate the record processor**

  Follow the [Migrate the record processor](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 2: Migrate the record processor factory**

  Follow the [Migrate the record processor factory](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 3: Migrate the worker**

  Follow the [Migrate the worker](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 4: Migrate KCL 1.x configuration**

  Follow the [Configure the Amazon Kinesis client](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 5: Check idle time removal and client configuration removals**

  Follow the [Idle time removal](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal) and [Client configuration removals](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals) sections in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 6: Follow the step-by-step instructions in the KCL 2.x to KCL 3.x migration guide**

  Follow instructions on the [Migrate from KCL 2.x to KCL 3.x](kcl-migration-from-2-3.md) page to complete the migration. If you need to roll back to the previous KCL version or roll forward to KCL 3.x after a rollback, refer to [Roll back to the previous KCL version](kcl-migration-rollback.md) and [Roll forward to KCL 3.x after a rollback](kcl-migration-rollforward.md).

**Important**  
Do not use AWS SDK for Java version 2.27.19 to 2.27.23 with KCL 3.x. These versions include an issue that causes an exception error related to KCL's DynamoDB usage. We recommend that you use the AWS SDK for Java version 2.28.0 or later to avoid this issue. 

# Previous KCL version documentation
<a name="kcl-archive"></a>

The following topics have been archived. To see current Kinesis Client Library documentation, see [Use Kinesis Client Library](kcl.md).

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

**Topics**
+ [KCL 1.x and 2.x information](shared-throughput-kcl-consumers.md)
+ [Develop custom consumers with shared throughput](shared-throughput-consumers.md)
+ [Migrate consumers from KCL 1.x to KCL 2.x](kcl-migration.md)

# KCL 1.x and 2.x information
<a name="shared-throughput-kcl-consumers"></a>

One way to develop custom consumer applications that process data from Kinesis data streams is to use the Kinesis Client Library (KCL).

**Topics**
+ [About KCL (previous versions)](#shared-throughput-kcl-consumers-overview)
+ [KCL previous versions](#shared-throughput-kcl-consumers-versions)
+ [KCL concepts (previous versions)](#shared-throughput-kcl-consumers-concepts)
+ [Use a lease table to track the shards processed by the KCL consumer application](#shared-throughput-kcl-consumers-leasetable)
+ [Process multiple data streams with the same KCL 2.x for Java consumer application](#shared-throughput-kcl-multistream)
+ [Use the KCL with the AWS Glue Schema Registry](#shared-throughput-kcl-consumers-glue-schema-registry)

**Note**  
For both KCL 1.x and KCL 2.x, it is recommended that you upgrade to the latest KCL 1.x version or KCL 2.x version, depending on your usage scenario. Both KCL 1.x and KCL 2.x are regularly updated with newer releases that include the latest dependency and security patches, bug fixes, and backward-compatible new features. For more information, see [https://github.com/awslabs/amazon-kinesis-client/releases](https://github.com/awslabs/amazon-kinesis-client/releases).

## About KCL (previous versions)
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing. These include load balancing across multiple consumer application instances, responding to consumer application instance failures, checkpointing processed records, and reacting to resharding. The KCL takes care of all of these subtasks so that you can focus your efforts on writing your custom record-processing logic.

The KCL is different from the Kinesis Data Streams APIs that are available in the AWS SDKs. The Kinesis Data Streams APIs help you manage many aspects of Kinesis Data Streams, including creating streams, resharding, and putting and getting records. The KCL provides a layer of abstraction around all these subtasks, specifically so that you can focus on your consumer application’s custom data processing logic. For information about the Kinesis Data Streams API, see the [Amazon Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html).

**Important**  
The KCL is a Java library. Support for languages other than Java is provided using a multi-language interface called the MultiLangDaemon. This daemon is Java-based and runs in the background when you use the KCL with a language other than Java. For example, if you install the KCL for Python and write your consumer application entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings that you might need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon, see the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) on GitHub.

The KCL acts as an intermediary between your record processing logic and Kinesis Data Streams. 

## KCL previous versions
<a name="shared-throughput-kcl-consumers-versions"></a>

Currently, you can use either of the following supported versions of KCL to build your custom consumer applications:
+ **KCL 1.x**

  For more information, see [Develop KCL 1.x consumers](developing-consumers-with-kcl.md)
+ **KCL 2.x**

  For more information, see [Develop KCL 2.x Consumers](developing-consumers-with-kcl-v2.md)

You can use either KCL 1.x or KCL 2.x to build consumer applications that use shared throughput. For more information, see [Develop custom consumers with shared throughput using KCL](custom-kcl-consumers.md).

To build consumer applications that use dedicated throughput (enhanced fan-out consumers), you can only use KCL 2.x. For more information, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

For information about the differences between KCL 1.x and KCL 2.x, and instructions on how to migrate from KCL 1.x to KCL 2.x, see [Migrate consumers from KCL 1.x to KCL 2.x](kcl-migration.md).

## KCL concepts (previous versions)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **KCL consumer application** – an application that is custom-built using KCL and designed to read and process records from data streams. 
+ **Consumer application instance** - KCL consumer applications are typically distributed, with one or more application instances running simultaneously in order to coordinate on failures and dynamically load balance data record processing.
+ **Worker** – a high level class that a KCL consumer application instance uses to start processing data. 
**Important**  
Each KCL consumer application instance has one worker. 

  The worker initializes and oversees various tasks, including syncing shard and lease information, tracking shard assignments, and processing data from the shards. A worker provides KCL with the configuration information for the consumer application, such as the name of the data stream whose data records this KCL consumer application is going to process and the AWS credentials that are needed to access this data stream. The worker also kick starts that specific KCL consumer application instance to deliver data records from the data stream to the record processors.
**Important**  
In KCL 1.x, this class is called **Worker**. For more information, see [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java) in the Java KCL repository. In KCL 2.x, this class is called **Scheduler**. Scheduler’s purpose in KCL 2.x is identical to Worker’s purpose in KCL 1.x. For more information about the Scheduler class in KCL 2.x, see [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java). 
+ **Lease** – data that defines the binding between a worker and a shard. Distributed KCL consumer applications use leases to partition data record processing across a fleet of workers. At any given time, each shard of data records is bound to a particular worker by a lease identified by the **leaseKey** variable. 

  By default, a worker can hold one or more leases (subject to the value of the **maxLeasesForWorker** variable) at the same time. 
**Important**  
Every worker will contend to hold all available leases for all available shards in a data stream. But only one worker will successfully hold each lease at any one time. 

  For example, if you have a consumer application instance A with worker A that is processing a data stream with 4 shards, worker A can hold leases to shards 1, 2, 3, and 4 at the same time. But if you have two consumer application instances: A and B with worker A and worker B, and these instances are processing a data stream with 4 shards, worker A and worker B cannot both hold the lease to shard 1 at the same time. One worker holds the lease to a particular shard until it is ready to stop processing this shard’s data records or until it fails. When one worker stops holding the lease, another worker takes up and holds the lease. 

  For more information, see the Java KCL repositories: [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) for KCL 1.x and [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java) for KCL 2.x.
+ **Lease table** - a unique Amazon DynamoDB table that is used to keep track of the shards in a KDS data stream that are being leased and processed by the workers of the KCL consumer application. The lease table must remain in sync (within a worker and across all workers) with the latest shard information from the data stream while the KCL consumer application is running. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](#shared-throughput-kcl-consumers-leasetable).
+ **Record processor** – the logic that defines how your KCL consumer application processes the data that it gets from the data streams. At runtime, a KCL consumer application instance instantiates a worker, and this worker instantiates one record processor for every shard to which it holds a lease. 
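
The lease behavior described above (one holder per shard lease at any time, with another worker taking over once the lease is given up) can be illustrated with a minimal in-memory sketch. The `ToyLeaseRegistry` class and its method names are hypothetical and exist only for illustration; KCL's actual lease handling is implemented in the `Lease` classes linked above and is backed by the lease table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy in-memory lease registry; illustrative only, not KCL's lease implementation. */
class ToyLeaseRegistry {
    // leaseKey (here: the shard ID) -> ID of the worker that currently holds the lease
    private final Map<String, String> ownerByLeaseKey = new ConcurrentHashMap<>();

    /** Returns true if the worker holds the lease after the call, false if another worker holds it. */
    boolean tryAcquire(String leaseKey, String workerId) {
        String current = ownerByLeaseKey.putIfAbsent(leaseKey, workerId);
        return current == null || current.equals(workerId);
    }

    /** Releases the lease only if the given worker is the current holder. */
    boolean release(String leaseKey, String workerId) {
        return ownerByLeaseKey.remove(leaseKey, workerId);
    }

    /** Returns the current holder of the lease, or null if nobody holds it. */
    String ownerOf(String leaseKey) {
        return ownerByLeaseKey.get(leaseKey);
    }
}
```

With this sketch, worker B's call to `tryAcquire` for shard 1 fails while worker A holds the lease, and succeeds after worker A releases it.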

## Use a lease table to track the shards processed by the KCL consumer application
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [What is a lease table](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [Throughput](#shared-throughput-kcl-leasetable-throughput)
+ [How a lease table is synchronized with the shards in a Kinesis data stream](#shared-throughput-kcl-consumers-leasetable-sync)

### What is a lease table
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

For each Amazon Kinesis Data Streams application, KCL uses a unique lease table (stored in an Amazon DynamoDB table) to keep track of the shards in a KDS data stream that are being leased and processed by the workers of the KCL consumer application.

**Important**  
KCL uses the name of the consumer application to create the name of the lease table that this consumer application uses, therefore each consumer application name must be unique.

You can view the lease table using the [Amazon DynamoDB console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) while the consumer application is running.

If the lease table for your KCL consumer application does not exist when the application starts up, one of the workers creates the lease table for this application. 

**Important**  
 Your account is charged for the costs associated with the DynamoDB table, in addition to the costs associated with Kinesis Data Streams itself. 

Each row in the lease table represents a shard that is being processed by the workers of your consumer application. If your KCL consumer application processes only one data stream, then `leaseKey`, which is the hash key for the lease table, is the shard ID. If you process multiple data streams with the same consumer application (see [Process multiple data streams with the same KCL 2.x for Java consumer application](#shared-throughput-kcl-multistream)), then the `leaseKey` has the following structure: `account-id:StreamName:streamCreationTimestamp:ShardId`. For example, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
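
As a minimal sketch of the two `leaseKey` formats, the following helper builds a multistream key and extracts the shard ID from either format. The `LeaseKeyFormat` class and its methods are hypothetical names introduced for illustration and are not part of KCL. The sketch assumes that none of the four components contains a colon.

```java
/** Illustrative helper for the lease key formats described above; not part of KCL. */
final class LeaseKeyFormat {
    private LeaseKeyFormat() {}

    /** Builds a multistream lease key: account-id:StreamName:streamCreationTimestamp:ShardId. */
    static String multiStreamKey(String accountId, String streamName,
                                 long streamCreationTimestamp, String shardId) {
        return accountId + ":" + streamName + ":" + streamCreationTimestamp + ":" + shardId;
    }

    /** Extracts the shard ID from either a single-stream or a multistream lease key. */
    static String shardIdOf(String leaseKey) {
        int lastColon = leaseKey.lastIndexOf(':');
        return lastColon < 0 ? leaseKey : leaseKey.substring(lastColon + 1);
    }

    /** Returns true if the key has the four colon-separated parts of the multistream format. */
    static boolean isMultiStreamKey(String leaseKey) {
        return leaseKey.split(":", -1).length == 4;
    }
}
```

For a single-stream application the key is just the shard ID, so `shardIdOf` returns its input unchanged and `isMultiStreamKey` returns `false`.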

In addition to the shard ID, each row also includes the following data:
+ **checkpoint:** The most recent checkpoint sequence number for the shard. This value is unique across all shards in the data stream.
+ **checkpointSubSequenceNumber:** When using the Kinesis Producer Library's aggregation feature, this is an extension to **checkpoint** that tracks individual user records within the Kinesis record.
+ **leaseCounter:** Used for lease versioning so that workers can detect that their lease has been taken by another worker.
+ **leaseKey:** A unique identifier for a lease. Each lease is particular to a shard in the data stream and is held by one worker at a time.
+ **leaseOwner:** The worker that is holding this lease.
+ **ownerSwitchesSinceCheckpoint:** How many times this lease has changed workers since the last time a checkpoint was written.
+ **parentShardId:** Used to ensure that the parent shard is fully processed before processing starts on the child shards. This ensures that records are processed in the same order they were put into the stream.
+ **hashrange:** Used by the `PeriodicShardSyncManager` to run periodic syncs to find missing shards in the lease table and create leases for them if required. 
**Note**  
This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3. For more information about `PeriodicShardSyncManager` and periodic synchronization between leases and shards, see [How a lease table is synchronized with the shards in a Kinesis data stream](#shared-throughput-kcl-consumers-leasetable-sync).
+ **childshards:** Used by the `LeaseCleanupManager` to review the child shard's processing status and decide whether the parent shard can be deleted from the lease table.
**Note**  
This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3.
+ **shardID:** The ID of the shard.
**Note**  
This data is only present in the lease table if you process multiple data streams with the same consumer application (see [Process multiple data streams with the same KCL 2.x for Java consumer application](#shared-throughput-kcl-multistream)). This is supported only in KCL 2.x for Java, starting with KCL 2.3 for Java. 
+ **stream name:** The identifier of the data stream in the following format: `account-id:StreamName:streamCreationTimestamp`.
**Note**  
This data is only present in the lease table if you process multiple data streams with the same consumer application (see [Process multiple data streams with the same KCL 2.x for Java consumer application](#shared-throughput-kcl-multistream)). This is supported only in KCL 2.x for Java, starting with KCL 2.3 for Java. 
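
To illustrate how a counter such as **leaseCounter** lets a worker detect that its lease has been taken, the following sketch models a lease row in memory and only applies an update when the caller's expected counter matches the stored one. The `ToyVersionedLeaseTable` class, its `Row` type, and the method names are hypothetical; this shows the general versioning idea under simplified assumptions, not KCL's actual lease table code.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy lease table that uses a counter for versioning; illustrative only. */
class ToyVersionedLeaseTable {
    /** Simplified stand-in for a lease table row. */
    static final class Row {
        String leaseOwner;
        long leaseCounter;

        Row(String leaseOwner, long leaseCounter) {
            this.leaseOwner = leaseOwner;
            this.leaseCounter = leaseCounter;
        }
    }

    private final Map<String, Row> rows = new HashMap<>();

    synchronized void create(String leaseKey, String owner) {
        rows.put(leaseKey, new Row(owner, 0L));
    }

    /** Renews the lease if the caller still owns it and its view of the counter is current. */
    synchronized boolean renew(String leaseKey, String owner, long expectedCounter) {
        Row row = rows.get(leaseKey);
        if (row == null || !row.leaseOwner.equals(owner) || row.leaseCounter != expectedCounter) {
            return false; // the lease was taken or changed since the caller last read it
        }
        row.leaseCounter++;
        return true;
    }

    /** Transfers the lease to a new owner if the caller's view of the counter is current. */
    synchronized boolean take(String leaseKey, String newOwner, long expectedCounter) {
        Row row = rows.get(leaseKey);
        if (row == null || row.leaseCounter != expectedCounter) {
            return false;
        }
        row.leaseOwner = newOwner;
        row.leaseCounter++;
        return true;
    }

    synchronized long counterOf(String leaseKey) {
        return rows.get(leaseKey).leaseCounter;
    }

    synchronized String ownerOf(String leaseKey) {
        return rows.get(leaseKey).leaseOwner;
    }
}
```

In this sketch, a worker that tries to renew with a stale counter gets `false` back, which is its signal that another worker now holds the lease.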

### Throughput
<a name="shared-throughput-kcl-leasetable-throughput"></a>

If your Amazon Kinesis Data Streams application receives provisioned-throughput exceptions, you should increase the provisioned throughput for the DynamoDB table. The KCL creates the table with a provisioned throughput of 10 reads per second and 10 writes per second, but this might not be sufficient for your application. For example, if your Amazon Kinesis Data Streams application does frequent checkpointing or operates on a stream that is composed of many shards, you might need more throughput.
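
As a rough back-of-the-envelope aid, the following sketch estimates the write rate that checkpointing alone places on the lease table and compares it with the default of 10 writes per second. It assumes that each checkpoint results in one write request per shard, which is a simplification made for illustration; other lease table activity adds to this load and is not modeled. The `LeaseTableWriteEstimate` class and its methods are hypothetical.

```java
/** Rough estimate of checkpoint-driven lease table writes; a simplification, not a sizing tool. */
final class LeaseTableWriteEstimate {
    /** Default write throughput that the KCL provisions for the lease table, per the text above. */
    static final double DEFAULT_PROVISIONED_WRITES_PER_SECOND = 10.0;

    private LeaseTableWriteEstimate() {}

    /** Assumes one write request per shard per checkpoint (an assumption for illustration). */
    static double checkpointWritesPerSecond(int shardCount, double checkpointIntervalSeconds) {
        if (shardCount < 0 || checkpointIntervalSeconds <= 0) {
            throw new IllegalArgumentException("shardCount must be >= 0 and interval must be > 0");
        }
        return shardCount / checkpointIntervalSeconds;
    }

    /** Returns true if checkpointing alone would exceed the default provisioned write throughput. */
    static boolean exceedsDefault(int shardCount, double checkpointIntervalSeconds) {
        return checkpointWritesPerSecond(shardCount, checkpointIntervalSeconds)
                > DEFAULT_PROVISIONED_WRITES_PER_SECOND;
    }
}
```

Under these assumptions, 100 shards that each checkpoint every 5 seconds produce about 20 writes per second, which is above the default, while 20 shards that checkpoint once a minute stay well below it.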

For information about provisioned throughput in DynamoDB, see [Read/Write Capacity Mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) and [Working with Tables and Data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html) in the *Amazon DynamoDB Developer Guide*.

### How a lease table is synchronized with the shards in a Kinesis data stream
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

Workers in KCL consumer applications use leases to process shards from a given data stream. The information about which worker is leasing which shard at any given time is stored in a lease table. The lease table must remain in sync with the latest shard information from the data stream while the KCL consumer application is running. KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service during the consumer application's bootstrapping (when the consumer application is initialized or restarted) and whenever a shard that is being processed reaches its end (resharding). In other words, the workers of a KCL consumer application are synchronized with the data stream that they are processing during the initial consumer application bootstrap and whenever the consumer application encounters a data stream reshard event.

**Topics**
+ [Synchronization in KCL 1.0 - 1.13 and KCL 2.0 - 2.2](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [Synchronization in KCL 2.x, starting with KCL 2.3 and later](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [Synchronization in KCL 1.x, starting with KCL 1.14 and later](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### Synchronization in KCL 1.0 - 1.13 and KCL 2.0 - 2.2
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

In KCL 1.0 - 1.13 and KCL 2.0 - 2.2, during the consumer application's bootstrapping and during each data stream reshard event, KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service by invoking the `ListShards` or the `DescribeStream` discovery APIs. In all the KCL versions listed above, each worker of a KCL consumer application completes the following steps to perform the lease/shard synchronization process during the consumer application's bootstrapping and at each stream reshard event:
+ Fetches all the shards for the data stream that is being processed
+ Fetches all the shard leases from the lease table
+ Filters out each open shard that does not have a lease in the lease table
+ Iterates over all found open shards and for each open shard with no open parent:
  + Traverses the hierarchy tree through its ancestors path to determine if the shard is a descendant. A shard is considered a descendant, if an ancestor shard is being processed (lease entry for ancestor shard exists in the lease table) or if an ancestor shard should be processed (for example, if the initial position is `TRIM_HORIZON` or `AT_TIMESTAMP`)
  + If the open shard in context is a descendant, KCL checkpoints the shard based on initial position and creates leases for its parents, if required
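
The steps above can be sketched in simplified form as follows. The `ShardSyncSketch` class, its method names, and the `ancestorsShouldBeProcessed` flag are hypothetical and introduced only for illustration. The flag stands in for the initial-position check (for example, `TRIM_HORIZON`), and the real decision for `AT_TIMESTAMP` depends on details that this sketch does not model. This is not the KCL shard syncer implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified illustration of the lease/shard synchronization steps; not KCL's implementation. */
final class ShardSyncSketch {
    private ShardSyncSketch() {}

    /** Returns the open shards that do not yet have a lease in the lease table. */
    static List<String> openShardsWithoutLease(List<String> openShardIds, Set<String> leasedShardIds) {
        List<String> result = new ArrayList<>();
        for (String shardId : openShardIds) {
            if (!leasedShardIds.contains(shardId)) {
                result.add(shardId);
            }
        }
        return result;
    }

    /**
     * Walks the ancestor path of a shard. The shard counts as a descendant if an ancestor
     * has a lease, or if ancestors should be processed because of the initial position.
     */
    static boolean isDescendant(String shardId, Map<String, List<String>> parentsByShard,
                                Set<String> leasedShardIds, boolean ancestorsShouldBeProcessed) {
        List<String> directParents = parentsByShard.getOrDefault(shardId, Collections.emptyList());
        Deque<String> toVisit = new ArrayDeque<>(directParents);
        Set<String> seen = new HashSet<>();
        while (!toVisit.isEmpty()) {
            String ancestor = toVisit.pop();
            if (!seen.add(ancestor)) {
                continue; // already visited (a merged shard can be reached through two paths)
            }
            if (leasedShardIds.contains(ancestor) || ancestorsShouldBeProcessed) {
                return true;
            }
            toVisit.addAll(parentsByShard.getOrDefault(ancestor, Collections.emptyList()));
        }
        return false;
    }
}
```

A shard with no ancestors is never reported as a descendant in this sketch, regardless of the flag.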

#### Synchronization in KCL 2.x, starting with KCL 2.3 and later
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

Starting with KCL 2.3, the library supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application. 
+ During the application's bootstrapping, if the lease table is empty, KCL uses the `ListShards` API's filtering option (the `ShardFilter` optional request parameter) to retrieve and create leases only for a snapshot of shards open at the time specified by the `ShardFilter` parameter. The `ShardFilter` parameter enables you to filter the response of the `ListShards` API. The only required property of the `ShardFilter` parameter is `Type`. KCL uses the `Type` filter property and the following of its valid values to identify and return a snapshot of open shards that might require new leases:
  + `AT_TRIM_HORIZON` - the response includes all the shards that were open at `TRIM_HORIZON`. 
  + `AT_LATEST` - the response includes only the currently open shards of the data stream. 
  + `AT_TIMESTAMP` - the response includes all shards whose start timestamp is less than or equal to the given timestamp and that either are still open or have an end timestamp greater than or equal to the given timestamp.

  `ShardFilter` is used when creating leases for an empty lease table to initialize leases for a snapshot of shards specified at `RetrievalConfig#initialPositionInStreamExtended`.

  For more information about `ShardFilter`, see [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Instead of all workers performing the lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard synchronization.
+ KCL 2.3 uses the `ChildShards` return parameter of the `GetRecords` and the `SubscribeToShard` APIs to perform the lease/shard synchronization that happens at `SHARD_END` for closed shards, allowing a KCL worker to create leases only for the child shards of the shard it finished processing. For shared throughput consumer applications, this optimization of the lease/shard synchronization uses the `ChildShards` parameter of the `GetRecords` API. For the dedicated throughput (enhanced fan-out) consumer applications, this optimization of the lease/shard synchronization uses the `ChildShards` parameter of the `SubscribeToShard` API. For more information, see [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html), and [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ With the above changes, the behavior of KCL moves from a model in which all workers learn about all existing shards to a model in which each worker learns only about the child shards of the shards that it owns. Therefore, in addition to the synchronization that happens during consumer application bootstrapping and reshard events, KCL now also performs periodic shard/lease scans to identify any potential holes in the lease table (in other words, to learn about all new shards), to ensure that the complete hash range of the data stream is being processed, and to create leases for missing shards if required. `PeriodicShardSyncManager` is the component that is responsible for running periodic lease/shard scans. 

  For more information about `PeriodicShardSyncManager` in KCL 2.3, see [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213).

  In KCL 2.3, new configuration options are available to configure `PeriodicShardSyncManager` in `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)

  New CloudWatch metrics are also now emitted to monitor the health of the `PeriodicShardSyncManager`. For more information, see [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 2.3 also includes an optimization to `HierarchicalShardSyncer` to create leases for only one layer of shards.
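
The shard snapshots that the `AT_LATEST` and `AT_TIMESTAMP` filter types describe can be modeled with a small in-memory sketch. The `ShardSnapshotSketch` class and its `ShardInfo` type are hypothetical stand-ins introduced for illustration. The sketch does not call the `ListShards` API and omits `AT_TRIM_HORIZON`, which depends on the stream's retention window.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of the shard snapshots described above; not the ListShards implementation. */
final class ShardSnapshotSketch {
    private ShardSnapshotSketch() {}

    /** Hypothetical stand-in for a shard; endEpochMillis is null while the shard is still open. */
    static final class ShardInfo {
        final String shardId;
        final long startEpochMillis;
        final Long endEpochMillis;

        ShardInfo(String shardId, long startEpochMillis, Long endEpochMillis) {
            this.shardId = shardId;
            this.startEpochMillis = startEpochMillis;
            this.endEpochMillis = endEpochMillis;
        }

        boolean isOpen() {
            return endEpochMillis == null;
        }
    }

    /** AT_LATEST: only the currently open shards. */
    static List<String> atLatest(List<ShardInfo> shards) {
        List<String> result = new ArrayList<>();
        for (ShardInfo shard : shards) {
            if (shard.isOpen()) {
                result.add(shard.shardId);
            }
        }
        return result;
    }

    /** AT_TIMESTAMP: started at or before the timestamp, and still open or ended at or after it. */
    static List<String> atTimestamp(List<ShardInfo> shards, long timestampMillis) {
        List<String> result = new ArrayList<>();
        for (ShardInfo shard : shards) {
            boolean startedByThen = shard.startEpochMillis <= timestampMillis;
            boolean notEndedBefore = shard.isOpen() || shard.endEpochMillis >= timestampMillis;
            if (startedByThen && notEndedBefore) {
                result.add(shard.shardId);
            }
        }
        return result;
    }
}
```

For a parent shard that was split into two children, a timestamp before the split selects only the parent, and a timestamp after the split selects only the children.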

#### Synchronization in KCL 1.x, starting with KCL 1.14 and later
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

Starting with KCL 1.14, the library supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application. 
+ During the application's bootstrapping, if the lease table is empty, KCL uses the `ListShards` API's filtering option (the `ShardFilter` optional request parameter) to retrieve and create leases only for a snapshot of shards open at the time specified by the `ShardFilter` parameter. The `ShardFilter` parameter enables you to filter the response of the `ListShards` API. The only required property of the `ShardFilter` parameter is `Type`. KCL uses the `Type` filter property and the following of its valid values to identify and return a snapshot of open shards that might require new leases:
  + `AT_TRIM_HORIZON` - the response includes all the shards that were open at `TRIM_HORIZON`. 
  + `AT_LATEST` - the response includes only the currently open shards of the data stream. 
  + `AT_TIMESTAMP` - the response includes all shards whose start timestamp is less than or equal to the given timestamp and that either are still open or have an end timestamp greater than or equal to the given timestamp.

  `ShardFilter` is used when creating leases for an empty lease table to initialize leases for a snapshot of shards specified at `KinesisClientLibConfiguration#initialPositionInStreamExtended`.

  For more information about `ShardFilter`, see [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Instead of all workers performing the lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard synchronization.
+ KCL 1.14 uses the `ChildShards` return parameter of the `GetRecords` and the `SubscribeToShard` APIs to perform lease/shard synchronization that happens at `SHARD_END` for closed shards, allowing a KCL worker to only create leases for the child shards of the shard it finished processing. For more information, see [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) and [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ With the above changes, the behavior of KCL moves from a model in which all workers learn about all existing shards to a model in which each worker learns only about the child shards of the shards that it owns. Therefore, in addition to the synchronization that happens during consumer application bootstrapping and reshard events, KCL now also performs periodic shard/lease scans to identify any potential holes in the lease table (in other words, to learn about all new shards), to ensure that the complete hash range of the data stream is being processed, and to create leases for missing shards if required. `PeriodicShardSyncManager` is the component that is responsible for running periodic lease/shard scans. 

  When `KinesisClientLibConfiguration#shardSyncStrategyType` is set to `ShardSyncStrategyType.SHARD_END`, `PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` is used to determine the threshold for number of consecutive scans containing holes in the lease table after which to enforce a shard synchronization. When `KinesisClientLibConfiguration#shardSyncStrategyType` is set to `ShardSyncStrategyType.PERIODIC`, `leasesRecoveryAuditorInconsistencyConfidenceThreshold` is ignored.

  For more information about `PeriodicShardSyncManager` in KCL 1.14, see [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999).

  In KCL 1.14, a new configuration option is available to configure `PeriodicShardSyncManager` in `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)

  New CloudWatch metrics are also now emitted to monitor the health of the `PeriodicShardSyncManager`. For more information, see [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 1.14 now also supports deferred lease cleanup. Leases are deleted asynchronously by `LeaseCleanupManager` upon reaching `SHARD_END`, when a shard has either expired past the data stream’s retention period or been closed as the result of a resharding operation.

  New configuration options are available to configure `LeaseCleanupManager`.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ KCL 1.14 also includes an optimization to `KinesisShardSyncer` to create leases for only one layer of shards.
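
The idea of scanning the lease table for holes in the hash range, and enforcing a shard synchronization only after a number of consecutive scans found holes, can be sketched as follows. The `HashRangeAuditSketch` class, its `HashRange` type, and the method names are hypothetical. The sketch assumes that the threshold is reached when the count of consecutive scans with holes equals the configured value, and that the count resets after a synchronization is triggered. It uses the Kinesis hash key space of 0 to 2^128 - 1. It is not the `PeriodicShardSyncManager` implementation.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative lease table audit; not KCL's PeriodicShardSyncManager. */
class HashRangeAuditSketch {
    static final BigInteger MIN_HASH = BigInteger.ZERO;
    static final BigInteger MAX_HASH = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);

    /** Closed interval [start, end] of hash keys covered by one lease. */
    static final class HashRange {
        final BigInteger start;
        final BigInteger end;

        HashRange(BigInteger start, BigInteger end) {
            this.start = start;
            this.end = end;
        }
    }

    private final int confidenceThreshold;
    private int consecutiveScansWithHoles = 0;

    HashRangeAuditSketch(int confidenceThreshold) {
        if (confidenceThreshold < 1) {
            throw new IllegalArgumentException("confidenceThreshold must be at least 1");
        }
        this.confidenceThreshold = confidenceThreshold;
    }

    /** Returns true if the ranges leave any part of [MIN_HASH, MAX_HASH] uncovered. */
    static boolean hasHole(List<HashRange> leasedRanges) {
        List<HashRange> sorted = new ArrayList<>(leasedRanges);
        sorted.sort(Comparator.comparing((HashRange range) -> range.start));
        BigInteger nextExpected = MIN_HASH;
        for (HashRange range : sorted) {
            if (range.start.compareTo(nextExpected) > 0) {
                return true; // gap between the covered prefix and this range
            }
            BigInteger afterEnd = range.end.add(BigInteger.ONE);
            if (afterEnd.compareTo(nextExpected) > 0) {
                nextExpected = afterEnd;
            }
        }
        return nextExpected.compareTo(MAX_HASH) <= 0;
    }

    /** Records one scan; returns true when a shard synchronization should be enforced. */
    boolean recordScan(List<HashRange> leasedRanges) {
        consecutiveScansWithHoles = hasHole(leasedRanges) ? consecutiveScansWithHoles + 1 : 0;
        if (consecutiveScansWithHoles >= confidenceThreshold) {
            consecutiveScansWithHoles = 0;
            return true;
        }
        return false;
    }
}
```

Requiring several consecutive scans with holes before acting avoids reacting to a lease table that is only briefly incomplete, for example while child shard leases are still being created.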

## Process multiple data streams with the same KCL 2.x for Java consumer application
<a name="shared-throughput-kcl-multistream"></a>

This section describes the following changes in KCL 2.x for Java that enable you to create KCL consumer applications that can process more than one data stream at the same time. 

**Important**  
Multistream processing is only supported in KCL 2.x for Java, starting with KCL 2.3 for Java and later.   
Multistream processing is NOT supported for any other languages in which KCL 2.x can be implemented.  
Multistream processing is NOT supported in any versions of KCL 1.x.
+ **MultiStreamTracker interface**

  To build a consumer application that can process multiple streams at the same time, you must implement a new interface called [MultiStreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). This interface includes the `streamConfigList` method that returns the list of data streams and their configurations to be processed by the KCL consumer application. Note that the data streams that are being processed can be changed during the consumer application runtime. `streamConfigList` is called periodically by the KCL to learn about the changes in data streams to process.

  The `streamConfigList` method populates the [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) list. 

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  Note that the `StreamIdentifier` and `InitialPositionInStreamExtended` are required fields, while `consumerArn` is optional. You must provide the `consumerArn` only if you are using KCL 2.x to implement an enhanced fan-out consumer application.

  For more information about `StreamIdentifier`, see [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). To create a `StreamIdentifier`, we recommend that you create a multistream instance from the `streamArn` and the `streamCreationEpoch`, which are available in v2.5.0 and later. In KCL v2.3 and v2.4, which don't support `streamArn`, create a multistream instance by using the format `account-id:StreamName:streamCreationTimestamp`. This format will be deprecated and no longer supported starting with the next major release.

  `MultiStreamTracker` also includes a strategy for deleting leases of old streams in the lease table (`formerStreamsLeasesDeletionStrategy`). Note that the strategy CANNOT be changed during the consumer application runtime. For more information, see [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java).
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) is an application-wide class that you can use to specify all of the KCL 2.x configuration settings to be used when building your KCL consumer application. The `ConfigsBuilder` class now supports the `MultiStreamTracker` interface. You can initialize `ConfigsBuilder` with the name of the single data stream to consume records from:

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  Or you can initialize ConfigsBuilder with `MultiStreamTracker` if you want to implement a KCL consumer application that processes multiple streams at the same time.

  ```
      /**
       * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ With multistream support implemented for your KCL consumer application, each row of the application's lease table now contains the shard ID and the stream name of the multiple data streams that this application processes. 
+ When multistream support for your KCL consumer application is implemented, the leaseKey takes the following structure: `account-id:StreamName:streamCreationTimestamp:ShardId`. For example, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
**Important**  
When your existing KCL consumer application is configured to process only one data stream, the `leaseKey` (which is the hash key for the lease table) is the shard ID. If you reconfigure this existing KCL consumer application to process multiple data streams, it breaks your lease table, because with multistream support, the `leaseKey` structure must be as follows: `account-id:StreamName:streamCreationTimestamp:ShardId`.
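
Because the list of streams can change between periodic calls to `streamConfigList`, it can help to think of each call as producing a new set of stream identifiers that is compared with the previous one. The following sketch shows that comparison on plain strings in the `account-id:StreamName:streamCreationTimestamp` format. The `StreamListDiffSketch` class and its methods are hypothetical and illustrate only the set difference, not how KCL tracks streams internally or how it applies the configured lease deletion strategy.

```java
import java.util.Set;
import java.util.TreeSet;

/** Illustrative comparison of two successive stream lists; not KCL's internal tracking. */
final class StreamListDiffSketch {
    private StreamListDiffSketch() {}

    /** Streams that appear in the current list but were not in the previous one. */
    static Set<String> newlyAdded(Set<String> previous, Set<String> current) {
        Set<String> added = new TreeSet<>(current);
        added.removeAll(previous);
        return added;
    }

    /** Streams from the previous list that the current list no longer contains. */
    static Set<String> noLongerListed(Set<String> previous, Set<String> current) {
        Set<String> removed = new TreeSet<>(previous);
        removed.removeAll(current);
        return removed;
    }
}
```

Streams returned by `noLongerListed` correspond to the old streams whose leases the deletion strategy described above is meant to handle.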

## Use the KCL with the AWS Glue Schema Registry
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry lets you improve end-to-end data quality and data governance within your streaming applications. For more information, see [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). One of the ways to set up this integration is through the KCL in Java.

**Important**  
Currently, Kinesis Data Streams and AWS Glue Schema Registry integration is only supported for the Kinesis data streams that use KCL 2.3 consumers implemented in Java. Multi-language support is not provided. KCL 1.0 consumers are not supported. KCL 2.x consumers prior to KCL 2.3 are not supported.

For detailed instructions on how to set up integration of Kinesis Data Streams with Schema Registry using the KCL, see the "Interacting with Data Using the KPL/KCL Libraries" section in [Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds).

# Develop custom consumers with shared throughput
<a name="shared-throughput-consumers"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

If you don't need dedicated throughput when receiving data from Kinesis Data Streams, and if you don't need read propagation delays under 200 ms, you can build consumer applications as described in the following topics. You can use the Kinesis Client Library (KCL) or the AWS SDK for Java.

**Topics**
+ [Develop custom consumers with shared throughput using KCL](custom-kcl-consumers.md)

For information about building consumers that can receive records from Kinesis data streams with dedicated throughput, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

# Develop custom consumers with shared throughput using KCL
<a name="custom-kcl-consumers"></a>

One of the methods of developing a custom consumer application with shared throughput is to use the Kinesis Client Library (KCL). 

Choose from the following topics for the KCL version you are using.

**Topics**
+ [Develop KCL 1.x consumers](developing-consumers-with-kcl.md)
+ [Develop KCL 2.x Consumers](developing-consumers-with-kcl-v2.md)

# Develop KCL 1.x consumers
<a name="developing-consumers-with-kcl"></a>

You can develop a consumer application for Amazon Kinesis Data Streams using the Kinesis Client Library (KCL). 

For more information about KCL, see [About KCL (previous versions)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Choose from the following topics depending on the option you want to use.

**Topics**
+ [Develop a Kinesis Client Library consumer in Java](kinesis-record-processor-implementation-app-java.md)
+ [Develop a Kinesis Client Library consumer in Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Develop a Kinesis Client Library consumer in .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Develop a Kinesis Client Library consumer in Python](kinesis-record-processor-implementation-app-py.md)
+ [Develop a Kinesis Client Library consumer in Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Develop a Kinesis Client Library consumer in Java
<a name="kinesis-record-processor-implementation-app-java"></a>

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Java. To view the Javadoc reference, see the [AWS Javadoc topic for Class AmazonKinesisClient](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html).

To download the Java KCL from GitHub, go to [Kinesis Client Library (Java)](https://github.com/awslabs/amazon-kinesis-client). To locate the Java KCL on Apache Maven, go to the [KCL search results](https://search.maven.org/#search|ga|1|amazon-kinesis-client) page. To download sample code for a Java KCL consumer application from GitHub, go to the [KCL for Java sample project](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) page on GitHub. 

The sample application uses [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). You can change the logging configuration in the static `configure` method defined in the `AmazonKinesisApplicationSample.java` file. For more information about how to use Apache Commons Logging with Log4j and AWS Java applications, see [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) in the *AWS SDK for Java Developer Guide*.

You must complete the following tasks when implementing a KCL consumer application in Java:

**Topics**
+ [Implement the IRecordProcessor methods](#kinesis-record-processor-implementation-interface-java)
+ [Implement a class factory for the IRecordProcessor interface](#kinesis-record-processor-implementation-factory-java)
+ [Create a worker](#kcl-java-worker)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-java)
+ [Migrate to Version 2 of the record processor interface](#kcl-java-v2-migration)

## Implement the IRecordProcessor methods
<a name="kinesis-record-processor-implementation-interface-java"></a>

The KCL currently supports two versions of the `IRecordProcessor` interface: the original interface is available with the first version of the KCL, and version 2 is available starting with KCL version 1.5.0. Both interfaces are fully supported. Your choice depends on your specific scenario requirements. Refer to your locally built Javadocs or the source code to see all the differences. The following sections outline the minimal implementation for getting started.

**Topics**
+ [Original Interface (Version 1)](#kcl-java-interface-original)
+ [Updated interface (Version 2)](#kcl-java-interface-v2)

### Original Interface (Version 1)
<a name="kcl-java-interface-original"></a>

The original `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) exposes the following record processor methods that your consumer must implement. The sample provides implementations that you can use as a starting point (see `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialize**  
The KCL calls the `initialize` method when the record processor is instantiated, passing a specific shard ID as a parameter. This record processor processes only this shard and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard may be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```
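
Because a record can be delivered more than once, one way to make processing tolerant of replays is to remember the highest sequence number already handled for each shard. The following is a minimal sketch under stated assumptions, not part of the KCL: `SequenceNumberDeduplicator` is a hypothetical class, it assumes records within a shard arrive in increasing sequence-number order, and it keeps state in memory only.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a KCL class) that skips records already handled for a shard. */
final class SequenceNumberDeduplicator {
    // Highest sequence number handled so far, keyed by shard ID.
    private final Map<String, BigInteger> lastHandled = new HashMap<>();

    /**
     * Returns true and remembers the sequence number if the record is new for this shard;
     * returns false if it is a replay of a record at or before the last one handled.
     */
    boolean markIfNew(String shardId, String sequenceNumber) {
        // Sequence numbers are long decimal strings, so compare them as BigInteger.
        BigInteger current = new BigInteger(sequenceNumber);
        BigInteger previous = lastHandled.get(shardId);
        if (previous != null && current.compareTo(previous) <= 0) {
            return false;
        }
        lastHandled.put(shardId, current);
        return true;
    }
}
```

In-memory state is lost when a shard moves to another worker, so a production consumer would more likely make the downstream write itself idempotent, for example by keying stored results on the sequence number.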

**processRecords**  
The KCL calls the `processRecords` method, passing a list of data records from the shard specified by the `initialize(shardId)` method. The record processor processes the data in these records according to the semantics of the consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket.

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

In addition to the data itself, the record also contains a sequence number and partition key. The worker can use these values when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `Record` class exposes the following methods that provide access to the record's data, sequence number, and partition key. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

In the sample, the private method `processRecordsWithRetries` has code that shows how a worker can access the record's data, sequence number, and partition key.
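
In KCL 1.x, `record.getData()` returns a `java.nio.ByteBuffer`. The sketch below shows one way to turn such a payload into text. It assumes the producer wrote UTF-8 strings, which may not hold for your stream, and `RecordPayloads` is a hypothetical helper name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not a KCL class) for reading a record payload as text. */
final class RecordPayloads {
    private RecordPayloads() {}

    /** Decodes the payload as UTF-8 without moving the caller's buffer position. */
    static String asUtf8(ByteBuffer data) {
        // A read-only view has its own position and limit, so the original buffer is untouched
        // and can still be read again, for example during a retry.
        return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString();
    }
}
```

Decoding through a view matters when the same record is handled more than once: reading the original buffer directly would advance its position, and a second read would then see an empty payload.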

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you by passing a checkpointer (`IRecordProcessorCheckpointer`) to `processRecords`. The record processor calls the `checkpoint` method on this interface to inform the KCL of how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

For a split or merge operation, the KCL won't start processing the new shards until the processors for the original shards have called `checkpoint` to signal that all processing on the original shards is complete.

If you don't pass a parameter, the KCL assumes that the call to `checkpoint` means that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `checkpoint` only after it has processed all the records in the list that was passed to it. Record processors do not need to call `checkpoint` on each call to `processRecords`. A processor could, for example, call `checkpoint` on every third call to `processRecords`. You can optionally specify the exact sequence number of a record as a parameter to `checkpoint`. In this case, the KCL assumes that all records have been processed up to that record only.
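
The "checkpoint on every third call" idea can be sketched as a small counter. To keep the example self-contained, it uses a hypothetical `SimpleCheckpointer` interface in place of `IRecordProcessorCheckpointer`; the real `checkpoint` method declares checked exceptions that this stand-in omits.

```java
/** Hypothetical stand-in for the KCL checkpointer so the sketch compiles on its own. */
interface SimpleCheckpointer {
    void checkpoint();
}

/** Counts processRecords calls and checkpoints on every Nth one. */
final class EveryNthCallCheckpointPolicy {
    private final int interval;
    private int callsSinceCheckpoint = 0;

    EveryNthCallCheckpointPolicy(int interval) {
        if (interval < 1) {
            throw new IllegalArgumentException("interval must be >= 1");
        }
        this.interval = interval;
    }

    /**
     * Call at the end of processRecords, after every record in the batch has been handled.
     * Returns true if a checkpoint was taken on this call.
     */
    boolean afterBatch(SimpleCheckpointer checkpointer) {
        callsSinceCheckpoint++;
        if (callsSinceCheckpoint < interval) {
            return false;
        }
        checkpointer.checkpoint();
        callsSinceCheckpoint = 0;
        return true;
    }
}
```

With `new EveryNthCallCheckpointPolicy(3)`, the first two calls to `afterBatch` return without checkpointing and the third one checkpoints. A larger interval means fewer checkpoint writes but more records replayed after a worker failure.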

In the sample, the private method `checkpoint` shows how to call `IRecordProcessorCheckpointer.checkpoint` using the appropriate exception handling and retry logic.

The KCL relies on `processRecords` to handle any exceptions that arise from processing the data records. If an exception is thrown from `processRecords`, the KCL skips over the data records that were passed before the exception. That is, these records are not re-sent to the record processor that threw the exception or to any other record processor in the consumer.
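
Since records are skipped when an exception escapes `processRecords`, a consumer typically catches failures per record and decides what to do with them. The following sketch shows one possible shape using bounded retries. `RetryingBatchProcessor` is a hypothetical class that is generic over the record type so it runs without the KCL, and the retry count is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not a KCL class) that handles each record with bounded retries. */
final class RetryingBatchProcessor<T> {
    private final int maxAttempts;
    private final Consumer<T> handler;

    RetryingBatchProcessor(int maxAttempts, Consumer<T> handler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
        this.handler = handler;
    }

    /**
     * Handles every record and never lets a handler exception escape.
     * Returns the records that still failed after maxAttempts tries.
     */
    List<T> process(List<T> records) {
        List<T> failed = new ArrayList<>();
        for (T record : records) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    handler.accept(record);
                    done = true;
                } catch (RuntimeException e) {
                    // Swallowed so the loop can retry; a real consumer would log and back off here.
                }
            }
            if (!done) {
                failed.add(record);
            }
        }
        return failed;
    }
}
```

The returned list gives the caller a place to decide what happens to records that never succeeded, for example writing them to a dead-letter location before checkpointing.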

**shutdown**  
The KCL calls the `shutdown` method either when processing ends (the shutdown reason is `TERMINATE`) or the worker is no longer responding (the shutdown reason is `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Processing ends when the record processor does not receive any further records from the shard, because either the shard was split or merged, or the stream was deleted.

The KCL also passes an `IRecordProcessorCheckpointer` interface to `shutdown`. If the shutdown reason is `TERMINATE`, the record processor should finish processing any data records, and then call the `checkpoint` method on this interface.
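
The shutdown behavior described above can be sketched as follows. The types `SketchShutdownReason` and `SketchCheckpointer` are hypothetical stand-ins for the KCL's `ShutdownReason` and `IRecordProcessorCheckpointer`, so the example runs on its own. Because the text above only calls for a checkpoint on `TERMINATE`, the sketch skips the checkpoint for `ZOMBIE`.

```java
/** Hypothetical stand-ins so the sketch compiles without the KCL on the classpath. */
enum SketchShutdownReason { TERMINATE, ZOMBIE }

interface SketchCheckpointer {
    void checkpoint();
}

final class ShutdownHandler {
    private ShutdownHandler() {}

    /**
     * On TERMINATE, finishes outstanding work and then checkpoints.
     * Returns true if a checkpoint was taken.
     */
    static boolean onShutdown(SketchCheckpointer checkpointer,
                              SketchShutdownReason reason,
                              Runnable finishPendingWork) {
        if (reason != SketchShutdownReason.TERMINATE) {
            // ZOMBIE: no checkpoint is attempted in this sketch.
            return false;
        }
        finishPendingWork.run();   // drain anything still buffered for this shard
        checkpointer.checkpoint(); // signal that the shard is fully processed
        return true;
    }
}
```

Finishing pending work before the checkpoint preserves the ordering the text requires: the checkpoint is the signal that all processing on the shard is complete.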

### Updated interface (Version 2)
<a name="kcl-java-interface-v2"></a>

The updated `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) exposes the following record processor methods that your consumer must implement: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

All of the arguments from the original version of the interface are accessible through get methods on the container objects. For example, to retrieve the list of records in `processRecords()`, you can use `processRecordsInput.getRecords()`.

As of version 2 of this interface (KCL 1.5.0 and later), the following new inputs are available in addition to the inputs provided by the original interface:

starting sequence number  
In the `InitializationInput` object passed to the `initialize()` operation, the starting sequence number from which records would be provided to the record processor instance. This is the sequence number that was last checkpointed by the record processor instance previously processing the same shard. This is provided in case your application needs this information. 

pending checkpoint sequence number  
In the `InitializationInput` object passed to the `initialize()` operation, the pending checkpoint sequence number (if any) that could not be committed before the previous record processor instance stopped.

## Implement a class factory for the IRecordProcessor interface
<a name="kinesis-record-processor-implementation-factory-java"></a>

You also need to implement a factory for the class that implements the record processor methods. When your consumer instantiates the worker, it passes a reference to this factory.

The sample implements the factory class in the file `AmazonKinesisApplicationSampleRecordProcessorFactory.java` using the original record processor interface. If you want the class factory to create version 2 record processors, use the package name `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Create a worker
<a name="kcl-java-worker"></a>

As discussed in [Implement the IRecordProcessor methods](#kinesis-record-processor-implementation-interface-java), there are two versions of the KCL record processor interface to choose from, which affects how you would create a worker. The original record processor interface uses the following code structure to create a worker:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

With version 2 of the record processor interface, you can use `Worker.Builder` to create a worker without needing to worry about which constructor to use and the order of the arguments. The updated record processor interface uses the following code structure to create a worker:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-java"></a>

The sample provides default values for configuration properties. This configuration data for the worker is then consolidated in a `KinesisClientLibConfiguration` object. This object and a reference to the class factory for `IRecordProcessor` are passed in the call that instantiates the worker. You can override any of these properties with your own values using a Java properties file (see `AmazonKinesisApplicationSample.java`).

### Application name
<a name="configuration-property-application-name"></a>

The KCL requires an application name that is unique across your applications, and across Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers associated with this application name are assumed to be working together on the same stream. These workers may be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-cred-java"></a>

You must make your AWS credentials available to one of the credential providers in the default credential providers chain. For example, if you are running your consumer on an EC2 instance, we recommend that you launch the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer running on an EC2 instance.

The sample application first attempts to retrieve IAM credentials from instance metadata: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

If the sample application cannot obtain credentials from the instance metadata, it attempts to retrieve credentials from a properties file:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

For more information about instance metadata, see [Instance Metadata](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) in the *Amazon EC2 User Guide*.

### Use the worker ID for multiple instances
<a name="kinesis-record-processor-workerid-java"></a>

The sample initialization code creates an ID for the worker, `workerId`, using the name of the local computer and appending a globally unique identifier as shown in the following code snippet. This approach supports the scenario of multiple instances of the consumer application running on a single computer.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```
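
`InetAddress.getLocalHost()` throws a checked `UnknownHostException`, so the one-line snippet needs surrounding error handling to compile. The following self-contained variant is a sketch: the `WorkerIds` class and the `unknown-host` fallback are assumptions for illustration, not something the sample application does.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

/** Hypothetical helper (not part of the sample) that builds worker IDs. */
final class WorkerIds {
    private WorkerIds() {}

    /** Builds "<host>:<uuid>"; uses a placeholder host name if the lookup fails. */
    static String newWorkerId() {
        String host;
        try {
            host = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            host = "unknown-host"; // assumed fallback so the worker can still start
        }
        // The random UUID keeps IDs distinct when several workers share one machine.
        return host + ":" + UUID.randomUUID();
    }
}
```

Two calls on the same machine return different IDs because only the host part is shared.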

## Migrate to Version 2 of the record processor interface
<a name="kcl-java-v2-migration"></a>

If you want to migrate code that uses the original interface, in addition to the steps described previously, the following steps are required:

1. Change your record processor class to import the version 2 record processor interface:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Change the references to inputs to use `get` methods on the container objects. For example, in the `shutdown()` operation, change "`checkpointer`" to "`shutdownInput.getCheckpointer()`".

1. Change your record processor factory class to import the version 2 record processor factory interface:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Change the construction of the worker to use `Worker.Builder`. For example:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Develop a Kinesis Client Library consumer in Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Node.js.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Node.js and write your consumer app entirely in Node.js, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the Node.js KCL from GitHub, go to [Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs).

**Sample Code Downloads**

There are two code samples available for KCL in Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Used in the following sections to illustrate the fundamentals of building a KCL consumer application in Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Slightly more advanced and based on a real-world scenario. Try it after you have familiarized yourself with the basic sample code. This sample is not discussed here but has a README file with more information.

You must complete the following tasks when implementing a KCL consumer application in Node.js:

**Topics**
+ [Implement the record processor](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-nodejs)

## Implement the record processor
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

The simplest possible consumer using the KCL for Node.js must implement a `recordProcessor` function, which in turn contains the functions `initialize`, `processRecords`, and `shutdown`. The sample provides an implementation that you can use as a starting point (see `sample_kcl_app.js`).

```
function recordProcessor() {
  // Return an object that implements the initialize, processRecords, and shutdown functions.
}
```

**initialize**  
The KCL calls the `initialize` function when the record processor starts. This record processor processes only the shard ID passed as `initializeInput.shardId`, and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. This is because Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard might be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 The KCL calls this function with input that contains a list of data records from the shard specified to the `initialize` function. The record processor that you implement processes the data in these records according to the semantics of your consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket. 

```
processRecords: function(processRecordsInput, completeCallback)
```

In addition to the data itself, the record also contains a sequence number and partition key, which the worker can use when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `record` dictionary exposes the following key-value pairs to access the record's data, sequence number, and partition key:

```
record.data
record.sequenceNumber
record.partitionKey
```

Note that the data is Base64-encoded.
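
The sketch below illustrates the Base64 round trip in Java, the language of the other examples in this guide, using only the standard `java.util.Base64` class. `Base64Payloads` is a hypothetical helper, and the sketch assumes the original payload is UTF-8 text. In a Node.js record processor, the decode step is typically written as `Buffer.from(record.data, 'base64')`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper that shows the encode and decode sides of a Base64 payload. */
final class Base64Payloads {
    private Base64Payloads() {}

    /** Encodes text the way a Base64-encoded record payload would carry it. */
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Reverses the encoding to recover the original UTF-8 text. */
    static String decodeToUtf8(String base64Data) {
        return new String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8);
    }
}
```

For example, the encoded value `aGVsbG8=` decodes to `hello`.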

In the basic sample, the function `processRecords` has code that shows how a worker can access the record's data, sequence number, and partition key.

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you with a `checkpointer` object passed as `processRecordsInput.checkpointer`. Your record processor calls the `checkpointer.checkpoint` function to inform the KCL how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information when processing of the shard restarts, so that it continues from the last known processed record.

For a split or merge operation, the KCL doesn't start processing the new shards until the processors for the original shards have called `checkpoint` to signal that all processing on the original shards is complete.

If you don't pass the sequence number to the `checkpoint` function, the KCL assumes that the call to `checkpoint` means that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `checkpoint` **only** after it has processed all the records in the list that was passed to it. Record processors do not need to call `checkpoint` on each call to `processRecords`. A processor could, for example, call `checkpoint` on every third call, or in response to some event external to your record processor, such as a custom verification or validation service that you've implemented.

You can optionally specify the exact sequence number of a record as a parameter to `checkpoint`. In this case, the KCL assumes that all records have been processed up to that record only.

The basic sample application shows the simplest possible call to the `checkpointer.checkpoint` function. You can add other checkpointing logic you need for your consumer at this point in the function.

**shutdown**  
The KCL calls the `shutdown` function either when processing ends (`shutdownInput.reason` is `TERMINATE`) or the worker is no longer responding (`shutdownInput.reason` is `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

Processing ends when the record processor does not receive any further records from the shard, because either the shard was split or merged, or the stream was deleted.

The KCL also passes a `shutdownInput.checkpointer` object to `shutdown`. If the shutdown reason is `TERMINATE`, you should make sure that the record processor has finished processing any data records, and then call the `checkpoint` function on this interface.

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-nodejs"></a>

The sample provides default values for the configuration properties. You can override any of these properties with your own values (see `sample.properties` in the basic sample).

### Application name
<a name="kinesis-record-processor-application-name-nodejs"></a>

The KCL requires an application name that is unique among your applications, and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers associated with this application name are assumed to be working together on the same stream. These workers may be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-credentials-nodejs"></a>

You must make your AWS credentials available to one of the credentials providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property in the `sample.properties` file to set a credentials provider. If you are running your consumer on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer application running on an EC2 instance.

The following example configures KCL to process a Kinesis data stream named `kclnodejssample` using the record processor supplied in `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Develop a Kinesis Client Library consumer in .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses .NET.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for .NET and write your consumer app entirely in .NET, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the .NET KCL from GitHub, go to [Kinesis Client Library (.NET)](https://github.com/awslabs/amazon-kinesis-client-net). To download sample code for a .NET KCL consumer application, go to the [KCL for .NET sample consumer project](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) page on GitHub.

You must complete the following tasks when implementing a KCL consumer application in .NET:

**Topics**
+ [Implement the IRecordProcessor class methods](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-dotnet)

## Implement the IRecordProcessor class methods
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

The consumer must implement the following methods for `IRecordProcessor`. The sample consumer provides implementations that you can use as a starting point (see the `SampleRecordProcessor` class in `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialize**  
The KCL calls this method when the record processor is instantiated, passing a specific shard ID in the `input` parameter (`input.ShardId`). This record processor processes only this shard, and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. This is because Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard might be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
The KCL calls this method, passing a list of data records in the `input` parameter (`input.Records`) from the shard specified by the `Initialize` method. The record processor that you implement processes the data in these records according to the semantics of your consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket.

```
public void ProcessRecords(ProcessRecordsInput input)
```

In addition to the data itself, the record also contains a sequence number and partition key. The worker can use these values when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `Record` class exposes the following to access the record's data, sequence number, and partition key:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

In the sample, the method `ProcessRecordsWithRetries` has code that shows how a worker can access the record's data, sequence number, and partition key.

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you by passing a `Checkpointer` object to `ProcessRecords` (`input.Checkpointer`). The record processor calls the `Checkpointer.Checkpoint` method to inform the KCL of how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

For a split or merge operation, the KCL doesn't start processing the new shards until the processors for the original shards have called `Checkpointer.Checkpoint` to signal that all processing on the original shards is complete.

If you don't pass a parameter, the KCL assumes that the call to `Checkpointer.Checkpoint` signifies that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `Checkpointer.Checkpoint` only after it has processed all the records in the list that was passed to it. Record processors do not need to call `Checkpointer.Checkpoint` on each call to `ProcessRecords`. A processor could, for example, call `Checkpointer.Checkpoint` on every third or fourth call. You can optionally specify the exact sequence number of a record as a parameter to `Checkpointer.Checkpoint`. In this case, the KCL assumes that records have been processed only up to that record.

In the sample, the private method `Checkpoint(Checkpointer checkpointer)` shows how to call the `Checkpointer.Checkpoint` method using appropriate exception handling and retry logic.

The KCL for .NET handles exceptions differently from other KCL language libraries in that it does not handle any exceptions that arise from processing the data records. Any uncaught exception from user code crashes the program.

**Shutdown**  
The KCL calls the `Shutdown` method either when processing ends (the shutdown `input.Reason` value is `TERMINATE`) or when the worker is no longer responding (the shutdown `input.Reason` value is `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

Processing ends when the record processor does not receive any further records from the shard, because the shard was split or merged, or the stream was deleted.

The KCL also passes a `Checkpointer` object to `Shutdown`. If the shutdown reason is `TERMINATE`, the record processor should finish processing any data records, and then call the `Checkpointer.Checkpoint` method.

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-dotnet"></a>

The sample consumer provides default values for the configuration properties. You can override any of these properties with your own values (see `SampleConsumer/kcl.properties`).

### Application name
<a name="modify-kinesis-record-processor-application-name"></a>

The KCL requires an application name that is unique among your applications, and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers associated with this application name are assumed to be working together on the same stream. These workers may be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-creds-dotnet"></a>

You must make your AWS credentials available to one of the credentials providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property in the properties file (`SampleConsumer/kcl.properties` in the sample) to set a credentials provider. If you are running your consumer application on an EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer running on an EC2 instance.

The sample's properties file configures KCL to process a Kinesis data stream called "words" using the record processor supplied in `AmazonKinesisSampleConsumer.cs`. 

# Develop a Kinesis Client Library consumer in Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Python.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Python and write your consumer app entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the Python KCL from GitHub, go to [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python). To download sample code for a Python KCL consumer application, go to the [KCL for Python sample project](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) page on GitHub.

You must complete the following tasks when implementing a KCL consumer application in Python:

**Topics**
+ [Implement the RecordProcessor class methods](#kinesis-record-processor-implementation-interface-py)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-py)

## Implement the RecordProcessor class methods
<a name="kinesis-record-processor-implementation-interface-py"></a>

The `RecordProcessor` class must extend `RecordProcessorBase` and implement the following methods. The sample provides implementations that you can use as a starting point (see `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
The KCL calls the `initialize` method when the record processor is instantiated, passing a specific shard ID as a parameter. This record processor processes only this shard, and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. This is because Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard may be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\_records**  
The KCL calls this method, passing a list of data records from the shard specified by the `initialize` method. The record processor that you implement processes the data in these records according to the semantics of your consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket.

```
def process_records(self, records, checkpointer) 
```

In addition to the data itself, the record also contains a sequence number and partition key. The worker can use these values when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `record` dictionary exposes the following key-value pairs to access the record's data, sequence number, and partition key:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Note that the data is Base64-encoded.
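As a minimal sketch of reading these fields, the following helper decodes the Base64 payload and returns it together with the sequence number and partition key. The helper name `decode_record` and the sample record values are hypothetical and used only for illustration. The record is assumed to be a dictionary shaped as described above.

```python
import base64


def decode_record(record):
    """Return (data_bytes, sequence_number, partition_key) for one record dict."""
    # The 'data' value is Base64-encoded, so decode it back to raw bytes.
    data = base64.b64decode(record.get('data'))
    return data, record.get('sequenceNumber'), record.get('partitionKey')


# Hypothetical record, shaped like the dictionary described above.
sample_record = {
    'data': base64.b64encode(b'hello').decode('ascii'),
    'sequenceNumber': '1001',
    'partitionKey': 'pk-1',
}

data, sequence_number, partition_key = decode_record(sample_record)
```

After decoding, `data` holds the raw bytes that the producer wrote. Interpreting those bytes (for example, as UTF-8 text or JSON) depends on your producer.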

In the sample, the method `process_records` has code that shows how a worker can access the record's data, sequence number, and partition key.

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you by passing a `Checkpointer` object to `process_records`. The record processor calls the `checkpoint` method on this object to inform the KCL of how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

For a split or merge operation, the KCL doesn't start processing the new shards until the processors for the original shards have called `checkpoint` to signal that all processing on the original shards is complete.

If you don't pass a parameter, the KCL assumes that the call to `checkpoint` means that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `checkpoint` only after it has processed all the records in the list that was passed to it. Record processors do not need to call `checkpoint` on each call to `process_records`. A processor could, for example, call `checkpoint` on every third call. You can optionally specify the exact sequence number of a record as a parameter to `checkpoint`. In this case, the KCL assumes that all records have been processed up to that record only.
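The checkpoint cadence described above can be sketched as follows. This is an illustration under stated assumptions, not the sample's implementation: `EveryNthCallProcessor`, `CHECKPOINT_EVERY`, and `RecordingCheckpointer` are hypothetical names, and the stand-in checkpointer only records calls so that the sketch runs without the KCL. A real record processor would extend `RecordProcessorBase` and receive the checkpointer from the KCL.

```python
class EveryNthCallProcessor:
    """Checkpoints on every third call to process_records (illustrative only)."""

    CHECKPOINT_EVERY = 3  # Assumption: chosen for this sketch.

    def __init__(self):
        self._calls = 0
        self._last_sequence_number = None

    def process_records(self, records, checkpointer):
        for record in records:
            # Application-specific processing of the record would go here.
            self._last_sequence_number = record.get('sequenceNumber')
        self._calls += 1
        if (self._calls % self.CHECKPOINT_EVERY == 0
                and self._last_sequence_number is not None):
            # Passing a sequence number checkpoints up to that record only.
            checkpointer.checkpoint(self._last_sequence_number)


class RecordingCheckpointer:
    """Hypothetical stand-in for the KCL checkpointer; it only records calls."""

    def __init__(self):
        self.calls = []

    def checkpoint(self, sequence_number=None):
        self.calls.append(sequence_number)


processor = EveryNthCallProcessor()
checkpointer = RecordingCheckpointer()
for seq in ('1', '2', '3'):
    processor.process_records([{'sequenceNumber': seq}], checkpointer)
```

Checkpointing less often reduces calls to the checkpoint store, at the cost of reprocessing more records after a worker failure.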

In the sample, the private method `checkpoint` shows how to call the `Checkpointer.checkpoint` method using appropriate exception handling and retry logic.
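A retry wrapper of this kind might look like the following sketch. It is not the sample's code. The `CheckpointError` class here is a local stand-in, and the error names `ShutdownException` and `ThrottlingException` are assumptions used for illustration, so check the library's own exception type and values before relying on them. `FlakyCheckpointer` is a hypothetical test double.

```python
import time


class CheckpointError(Exception):
    """Local stand-in for the library's checkpoint exception (assumption)."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


def checkpoint_with_retries(checkpointer, sequence_number=None,
                            retries=5, sleep_seconds=5.0):
    """Try to checkpoint, retrying on throttling. Returns True on success."""
    for attempt in range(retries):
        try:
            checkpointer.checkpoint(sequence_number)
            return True
        except CheckpointError as error:
            if error.value == 'ShutdownException':
                # Another worker may own the shard now; retrying cannot help.
                return False
            if error.value == 'ThrottlingException' and attempt < retries - 1:
                # Back off, then try again.
                time.sleep(sleep_seconds)
                continue
            # Unknown error, or throttled on the final attempt.
            return False
    return False


class FlakyCheckpointer:
    """Hypothetical checkpointer that is throttled a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.attempts = 0

    def checkpoint(self, sequence_number=None):
        self.attempts += 1
        if self.attempts <= self.failures:
            raise CheckpointError('ThrottlingException')


flaky = FlakyCheckpointer(failures=2)
succeeded = checkpoint_with_retries(flaky, '42', retries=5, sleep_seconds=0)
```

The sketch gives up immediately on a shutdown error and retries only on throttling, which keeps a worker that has lost its shard from blocking on repeated checkpoint attempts.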

The KCL relies on `process_records` to handle any exceptions that arise from processing the data records. If an exception is thrown from `process_records`, the KCL skips over the data records that were passed to `process_records` before the exception. That is, these records are not re-sent to the record processor that threw the exception or to any other record processor in the consumer.
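Because records are skipped when an exception escapes `process_records`, one option is to catch exceptions per record inside the method. The following sketch shows that pattern with a hypothetical helper, `process_batch_safely`, and a hypothetical `handle_record` callback. How failed records are handled afterward (retry, dead-letter storage, or logging only) is an application decision that this sketch does not make.

```python
import logging

log = logging.getLogger(__name__)


def process_batch_safely(records, handle_record):
    """Apply handle_record to each record without letting exceptions escape.

    Returns two lists of sequence numbers: (processed, failed).
    """
    processed, failed = [], []
    for record in records:
        sequence_number = record.get('sequenceNumber')
        try:
            handle_record(record)
            processed.append(sequence_number)
        except Exception:
            # Log and keep going so one bad record does not drop the batch.
            log.exception("Failed to process record %s", sequence_number)
            failed.append(sequence_number)
    return processed, failed


def handle_record(record):
    """Hypothetical handler that rejects one record to show the failure path."""
    if record.get('sequenceNumber') == '2':
        raise ValueError("malformed record")


batch = [{'sequenceNumber': '1'}, {'sequenceNumber': '2'}, {'sequenceNumber': '3'}]
processed, failed = process_batch_safely(batch, handle_record)
```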

**shutdown**  
The KCL calls the `shutdown` method either when processing ends (the shutdown `reason` is `TERMINATE`) or when the worker is no longer responding (the shutdown `reason` is `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

Processing ends when the record processor does not receive any further records from the shard, because either the shard was split or merged, or the stream was deleted.

The KCL also passes a `Checkpointer` object to `shutdown`. If the shutdown `reason` is `TERMINATE`, the record processor should finish processing any data records, and then call the `checkpoint` method on this interface.
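The shutdown behavior described above can be sketched as follows. `ShutdownAwareProcessor` and `CountingCheckpointer` are hypothetical names, and the stand-in checkpointer exists only so that the sketch runs on its own. Skipping the checkpoint for any reason other than `TERMINATE` is a choice made in this sketch.

```python
class ShutdownAwareProcessor:
    """Checkpoints at shutdown only when the shard has been fully processed."""

    def shutdown(self, checkpointer, reason):
        if reason == 'TERMINATE':
            # Finish any in-flight work first, then checkpoint with no
            # argument to mark everything passed to this processor as done.
            checkpointer.checkpoint()


class CountingCheckpointer:
    """Hypothetical stand-in that counts checkpoint calls."""

    def __init__(self):
        self.count = 0

    def checkpoint(self, sequence_number=None):
        self.count += 1


terminate_checkpointer = CountingCheckpointer()
ShutdownAwareProcessor().shutdown(terminate_checkpointer, 'TERMINATE')

zombie_checkpointer = CountingCheckpointer()
ShutdownAwareProcessor().shutdown(zombie_checkpointer, 'ZOMBIE')
```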

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-py"></a>

The sample provides default values for the configuration properties. You can override any of these properties with your own values (see `sample.properties`).

### Application name
<a name="kinesis-record-processor-application-name-py"></a>

The KCL requires an application name that is unique among your applications, and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers that are associated with this application name are assumed to be working together on the same stream. These workers can be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-creds-py"></a>

You must make your AWS credentials available to one of the credentials providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property in [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) to set a credentials provider. If you are running your consumer application on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer application running on an EC2 instance.

The sample's properties file configures KCL to process a Kinesis data stream called "words" using the record processor supplied in `sample_kclpy_app.py`. 

# Develop a Kinesis Client Library consumer in Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Ruby.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Ruby and write your consumer app entirely in Ruby, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the Ruby KCL from GitHub, go to [Kinesis Client Library (Ruby)](https://github.com/awslabs/amazon-kinesis-client-ruby). To download sample code for a Ruby KCL consumer application, go to the [KCL for Ruby sample project](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) page on GitHub.

For more information about the KCL Ruby support library, see [KCL Ruby Gems Documentation](http://www.rubydoc.info/gems/aws-kclrb).

# Develop KCL 2.x Consumers
<a name="developing-consumers-with-kcl-v2"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

This topic shows you how to use version 2.0 of the Kinesis Client Library (KCL). 

For more information about the KCL, see the overview provided in [Developing Consumers Using the Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Choose from the following topics depending on the option you want to use.

**Topics**
+ [Develop a Kinesis Client Library consumer in Java](kcl2-standard-consumer-java-example.md)
+ [Develop a Kinesis Client Library consumer in Python](kcl2-standard-consumer-python-example.md)
+ [Develop enhanced fan-out consumers with KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Develop a Kinesis Client Library consumer in Java
<a name="kcl2-standard-consumer-java-example"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

The following code shows an example implementation in Java of `ProcessorFactory` and `RecordProcessor`. If you want to take advantage of the enhanced fan-out feature, see [Using Consumers with Enhanced Fan-Out](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.", t);
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end, checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Develop a Kinesis Client Library consumer in Python
<a name="kcl2-standard-consumer-python-example"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Python.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Python and write your consumer app entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.
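
Because the MultiLangDaemon needs a Java runtime, it can help to confirm that one is available before you start a Python consumer. The following is a minimal sketch of such a preflight check. `require_java` is a hypothetical helper and is not part of `amazon_kclpy`. It only checks that a `java` executable is on the `PATH`; it does not verify the Java version.

```python
import shutil


def require_java(which=shutil.which):
    """Return the path of the `java` executable, or raise if none is on PATH.

    `which` is injectable so the check can be exercised without Java installed.
    """
    java_path = which("java")
    if java_path is None:
        raise RuntimeError(
            "Java was not found on PATH; the MultiLangDaemon needs a Java runtime."
        )
    return java_path


if __name__ == "__main__":
    try:
        print("Found Java at", require_java())
    except RuntimeError as err:
        print(err)
```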

To download the Python KCL from GitHub, go to [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python). To download sample code for a Python KCL consumer application, go to the [KCL for Python sample project](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) page on GitHub.

You must complete the following tasks when implementing a KCL consumer application in Python:

**Topics**
+ [Implement the RecordProcessor class methods](#kinesis-record-processor-implementation-interface-py)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-py)

## Implement the RecordProcessor class methods
<a name="kinesis-record-processor-implementation-interface-py"></a>

The `RecordProcessor` class must extend the `RecordProcessorBase` class to implement the following methods:

```
initialize
process_records
lease_lost
shard_ended
shutdown_requested
```

This sample provides implementations that you can use as a starting point.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * lease_lost will be called if this MultiLangDaemon instance loses the lease to this shard
    * shard_ended will be called if the shard ends due to a scaling change
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is overburdened, e.g. too many
                    # DynamoDB writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds})\n"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended, checkpointing.")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```
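
The retry behavior of the sample's `checkpoint` method can be tried out without a running MultiLangDaemon. The following is a minimal sketch under stated assumptions, not code from `amazon_kclpy`: `FakeCheckpointError`, `FlakyCheckpointer`, and `checkpoint_with_retries` are hypothetical names that stand in for `kcl.CheckpointError` and for the checkpointer that the daemon passes to the record processor. As in the sample, the error is assumed to expose the exception name in a `value` attribute. As a simplification, the sketch treats every error other than `ShutdownException` as retryable.

```python
import time


class FakeCheckpointError(Exception):
    """Hypothetical stand-in for kcl.CheckpointError; carries the error name in .value."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


class FlakyCheckpointer:
    """Hypothetical checkpointer that raises the given errors in order, then succeeds."""

    def __init__(self, errors):
        self._errors = list(errors)
        self.checkpointed = None

    def checkpoint(self, sequence_number=None, sub_sequence_number=None):
        if self._errors:
            raise FakeCheckpointError(self._errors.pop(0))
        self.checkpointed = (sequence_number, sub_sequence_number)


def checkpoint_with_retries(checkpointer, sequence_number, sub_sequence_number,
                            retries=5, sleep_seconds=5, sleep=time.sleep):
    """Return True if the checkpoint succeeded, False if the caller should give up."""
    for attempt in range(retries):
        try:
            checkpointer.checkpoint(sequence_number, sub_sequence_number)
            return True
        except FakeCheckpointError as e:
            if e.value == 'ShutdownException':
                # The lease has moved to another worker, so retrying cannot succeed.
                return False
            if attempt == retries - 1:
                # Out of attempts; give up without sleeping again.
                return False
        # Back off before the next attempt.
        sleep(sleep_seconds)
    return False


# Usage: two throttled attempts, then success. Sleeping is stubbed out.
flaky = FlakyCheckpointer(['ThrottlingException', 'ThrottlingException'])
succeeded = checkpoint_with_retries(flaky, '100', 0, sleep=lambda seconds: None)
print(succeeded, flaky.checkpointed)
```

Passing `sleep` as a parameter keeps the back-off out of tests. In a real consumer you would leave it at the default so that throttled calls have time to recover.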

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-py"></a>

The sample provides default values for the configuration properties, as shown in the following script. You can override any of these properties with your own values.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options are TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew its lease within this time interval
# will be regarded as having problems and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases.
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Lets applications flush or checkpoint when they have some data "in progress" but don't receive new data for a while.
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```
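
In the properties file above, lines that start with `#` are comments, so a commented-out property such as `#regionName` is inactive until you remove the `#`. The following is a minimal sketch of a way to list the properties that are actually set before you launch the consumer. `parse_properties` is a hypothetical helper, not part of the KCL or `amazon_kclpy`, and it handles only simple `key = value` lines rather than the full Java properties syntax (no escapes, `:` separators, or line continuations).

```python
def parse_properties(text):
    """Parse simple `key = value` lines, skipping blank lines and `#` comments."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        key, sep, value = line.partition('=')
        if not sep:
            continue  # Not a key/value line; ignored in this sketch.
        props[key.strip()] = value.strip()
    return props


# A shortened copy of the sample configuration shown above.
sample = """
# The name of an Amazon Kinesis stream to process.
streamName = words
applicationName = PythonKCLSample
initialPositionInStream = TRIM_HORIZON
#regionName = us-east-1
"""

active = parse_properties(sample)
print(active['streamName'])
print(active.get('regionName', 'regionName is not set'))
```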

### Application name
<a name="kinesis-record-processor-application-name-py"></a>

The KCL requires an application name that is unique among your applications and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers that are associated with this application name are assumed to be working together on the same stream. These workers can be distributed across multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Credentials
<a name="kinesis-record-processor-creds-py"></a>

You must make your AWS credentials available to one of the credential providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property to set a credentials provider. If you run your consumer application on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer application running on an EC2 instance.

# Develop enhanced fan-out consumers with KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

Consumers that use *enhanced fan-out* in Amazon Kinesis Data Streams can receive records from a data stream with dedicated throughput of up to 2 MB of data per second per shard. This type of consumer doesn't have to contend with other consumers that are receiving data from the stream. For more information, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).
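
The effect of dedicated throughput is easiest to see with a little arithmetic. The following is a rough sketch, not a capacity-planning tool. It assumes that shared-throughput consumers split a shard's 2 MB/sec read limit evenly, which real workloads may not do, and the function names are hypothetical.

```python
SHARD_READ_LIMIT_MB_PER_SEC = 2.0  # per-shard read throughput


def shared_throughput_per_consumer(consumer_count):
    """Shared-throughput consumers divide one shard's 2 MB/sec between them."""
    if consumer_count < 1:
        raise ValueError("consumer_count must be at least 1")
    return SHARD_READ_LIMIT_MB_PER_SEC / consumer_count


def enhanced_fan_out_throughput_per_consumer(consumer_count):
    """Each enhanced fan-out consumer gets up to 2 MB/sec per shard of its own."""
    if consumer_count < 1:
        raise ValueError("consumer_count must be at least 1")
    return SHARD_READ_LIMIT_MB_PER_SEC


for consumers in (1, 2, 5):
    print(
        consumers,
        "consumer(s): shared",
        shared_throughput_per_consumer(consumers),
        "MB/sec each, enhanced fan-out up to",
        enhanced_fan_out_throughput_per_consumer(consumers),
        "MB/sec each",
    )
```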

You can use version 2.0 or later of the Kinesis Client Library (KCL) to develop applications that use enhanced fan-out to receive data from streams. The KCL automatically subscribes your application to all the shards of a stream, and ensures that your consumer application can read with a throughput value of 2 MB/sec per shard. If you want to use the KCL without turning on enhanced fan-out, see [Developing Consumers Using the Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Develop enhanced fan-out consumers using KCL 2.x in Java](building-enhanced-consumers-kcl-java.md)

# Develop enhanced fan-out consumers using KCL 2.x in Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use version 2.0 or later of the Kinesis Client Library (KCL) to develop Amazon Kinesis Data Streams applications that receive data from streams using enhanced fan-out. The following code shows an example implementation in Java of `ShardRecordProcessorFactory` and `ShardRecordProcessor`.

We recommend that you use `KinesisClientUtil` to create `KinesisAsyncClient` and to configure `maxConcurrency` in `KinesisAsyncClient`.

**Important**  
The Amazon Kinesis Client might see significantly increased latency, unless you configure `KinesisAsyncClient` to have a `maxConcurrency` high enough to allow all leases plus additional usages of `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.", t);
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end, checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Migrate consumers from KCL 1.x to KCL 2.x
<a name="kcl-migration"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

This topic explains the differences between versions 1.x and 2.x of the Kinesis Client Library (KCL). It also shows you how to migrate your consumer from version 1.x to version 2.x of the KCL. After you migrate your client, it will start processing records from the last checkpointed location.

Version 2.0 of the KCL introduces the following interface changes:


**KCL Interface Changes**  

| KCL 1.x Interface | KCL 2.0 Interface | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Folded into software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrate the record processor](#recrod-processor-migration)
+ [Migrate the record processor factory](#recrod-processor-factory-migration)
+ [Migrate the worker](#worker-migration)
+ [Configure the Amazon Kinesis client](#client-configuration)
+ [Idle time removal](#idle-time-removal)
+ [Client configuration removals](#client-configuration-removals)

## Migrate the record processor
<a name="recrod-processor-migration"></a>

The following example shows a record processor implemented for KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**To migrate the record processor class**

1. Change the interfaces from `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` and `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` to `software.amazon.kinesis.processor.ShardRecordProcessor`, as follows:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Update the `import` statements for the `initialize` and `processRecords` methods.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Replace the `shutdown` method with the following new methods: `leaseLost`, `shardEnded`, and `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdown(ShutdownInput shutdownInput) {
   //        //
   //        // The TERMINATE checkpoint logic is moved to shardEnded(...)
   //        //
   //        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
   //            try {
   //                shutdownInput.getCheckpointer().checkpoint();
   //            } catch (ShutdownException | InvalidStateException e) {
   //                throw new RuntimeException(e);
   //            }
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownRequestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

The following is the updated version of the record processor class.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```
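In KCL 1.x, the single `shutdown(ShutdownInput)` callback carried a shutdown reason. In KCL 2.x, each reason has its own callback. The following plain-Java sketch illustrates that routing. It uses hypothetical stand-in types (`LegacyShutdownReason`, `ModernCallbacks`, `ShutdownMapping`) rather than KCL classes, and it assumes the 1.x reasons `TERMINATE` (the shard was fully consumed) and `ZOMBIE` (the lease was lost to another worker).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: none of these types are part of KCL.
final class ShutdownMapping {
    private ShutdownMapping() { }

    /** Stand-in for the KCL 1.x shutdown reasons passed to shutdown(ShutdownInput). */
    enum LegacyShutdownReason { TERMINATE, ZOMBIE }

    /** Stand-in for the shutdown-related callbacks of a KCL 2.x record processor. */
    interface ModernCallbacks {
        void leaseLost();
        void shardEnded();
    }

    /** Routes a 1.x-style shutdown reason to the 2.x-style callback that replaces it. */
    static void dispatch(LegacyShutdownReason reason, ModernCallbacks callbacks) {
        switch (reason) {
            case TERMINATE:
                // Shard fully consumed: the checkpoint logic belongs in shardEnded.
                callbacks.shardEnded();
                break;
            case ZOMBIE:
                // Lease taken by another worker: checkpointing is not possible here.
                callbacks.leaseLost();
                break;
            default:
                throw new IllegalArgumentException("Unknown reason: " + reason);
        }
    }

    /** Records which callbacks fire for the given reasons, in order. */
    static List<String> trace(LegacyShutdownReason... reasons) {
        List<String> calls = new ArrayList<>();
        ModernCallbacks recorder = new ModernCallbacks() {
            @Override public void leaseLost() { calls.add("leaseLost"); }
            @Override public void shardEnded() { calls.add("shardEnded"); }
        };
        for (LegacyShutdownReason reason : reasons) {
            dispatch(reason, recorder);
        }
        return calls;
    }

    public static void main(String[] args) {
        // Prints [shardEnded, leaseLost]
        System.out.println(trace(LegacyShutdownReason.TERMINATE, LegacyShutdownReason.ZOMBIE));
    }
}
```

When you port a 1.x processor, any code that ran under `ShutdownReason.TERMINATE` typically moves into `shardEnded`, and any cleanup that ran for a lost lease moves into `leaseLost`, where no checkpoint call should be attempted.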

## Migrate the record processor factory
<a name="recrod-processor-factory-migration"></a>

The record processor factory is responsible for creating record processors when a lease is acquired. The following is an example of a KCL 1.x factory.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**To migrate the record processor factory**

1. Change the implemented interface from `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` to `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, as follows.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Rename the `createProcessor` method to `shardRecordProcessor`, and change its return type from `IRecordProcessor` to `ShardRecordProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

The following is an example of the record processor factory in 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrate the worker
<a name="worker-migration"></a>

In version 2.0 of the KCL, a new class, called `Scheduler`, replaces the `Worker` class. The following is an example of a KCL 1.x worker.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**To migrate the worker**

1. Change the `import` statement for the `Worker` class to the import statements for the `Scheduler` and `ConfigsBuilder` classes.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Create the `ConfigsBuilder` and a `Scheduler` as shown in the following example.

   We recommend that you use `KinesisClientUtil` to create the `KinesisAsyncClient` and to configure `maxConcurrency` on it.

   **Important**  
   The Amazon Kinesis Client might see significantly increased latency unless you configure `KinesisAsyncClient` with a `maxConcurrency` high enough to allow all leases plus additional usages of `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configure the Amazon Kinesis client
<a name="client-configuration"></a>

With the 2.0 release of the Kinesis Client Library, the configuration of the client moved from a single configuration class (`KinesisClientLibConfiguration`) to six configuration classes. The following table describes the migration.


**Configuration Fields and Their New Classes**  

| Original Field | New Configuration Class | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | The name for this KCL application. Used as the default for the tableName and consumerName. | 
| tableName | ConfigsBuilder | Allows overriding the table name used for the Amazon DynamoDB lease table. | 
| streamName | ConfigsBuilder | The name of the stream that this application processes records from. | 
| kinesisEndpoint | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| dynamoDBEndpoint | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| initialPositionInStreamExtended | RetrievalConfig | The location in the shard from which the KCL begins fetching records, starting with the application's initial run. | 
| kinesisCredentialsProvider | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| failoverTimeMillis | LeaseManagementConfig | The number of milliseconds that must pass before you can consider a lease owner to have failed. | 
| workerIdentifier | ConfigsBuilder | A unique identifier that represents this instantiation of the application processor. This must be unique. | 
| shardSyncIntervalMillis | LeaseManagementConfig | The time between shard sync calls. | 
| maxRecords | PollingConfig | Allows setting the maximum number of records that Kinesis returns. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | This option has been removed. See Idle Time Removal. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | When set, the record processor is called even when no records were provided from Kinesis. | 
| parentShardPollIntervalMillis | CoordinatorConfig | How often a record processor should poll to see if the parent shard has been completed. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | When set, leases are removed as soon as the child leases have started processing. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | When set, child shards that have an open shard are ignored. This is primarily for DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| dynamoDBClientConfig | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| cloudWatchClientConfig | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| taskBackoffTimeMillis | LifecycleConfig | The time to wait to retry failed tasks. | 
| metricsBufferTimeMillis | MetricsConfig | Controls CloudWatch metric publishing. | 
| metricsMaxQueueSize | MetricsConfig | Controls CloudWatch metric publishing. | 
| metricsLevel | MetricsConfig | Controls CloudWatch metric publishing. | 
| metricsEnabledDimensions | MetricsConfig | Controls CloudWatch metric publishing. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | This option has been removed. See Checkpoint Sequence Number Validation. | 
| regionName | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| maxLeasesForWorker | LeaseManagementConfig | The maximum number of leases a single instance of the application should accept. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | The maximum number of leases an application should attempt to steal at one time. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | The DynamoDB read IOPs that are used if the Kinesis Client Library needs to create a new DynamoDB lease table. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | The DynamoDB write IOPs that are used if the Kinesis Client Library needs to create a new DynamoDB lease table. | 
| initialPositionInStreamExtended | LeaseManagementConfig | The initial position in the stream that the application should start at. This is only used during initial lease creation. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Disables synchronizing shard data if the lease table contains existing leases. | 
| shardPrioritization | CoordinatorConfig | Which shard prioritization to use. | 
| shutdownGraceMillis | N/A | This option has been removed. See MultiLang Removals. | 
| timeoutInSeconds | N/A | This option has been removed. See MultiLang Removals. | 
| retryGetRecordsInSeconds | PollingConfig | Configures the delay between GetRecords attempts for failures. | 
| maxGetRecordsThreadPool | PollingConfig | The thread pool size used for GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Controls the size of the lease renewer thread pool. The more leases that your application could take, the larger this pool should be. | 
| recordsFetcherFactory | PollingConfig | Allows replacing the factory used to create fetchers that retrieve from streams. | 
| logWarningForTaskAfterMillis | LifecycleConfig | How long to wait before a warning is logged if a task hasn't completed. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | The number of milliseconds to wait between calls to ListShards when failures occur. | 
| maxListShardsRetryAttempts | RetrievalConfig | The maximum number of times that ListShards retries before giving up. | 

## Idle time removal
<a name="idle-time-removal"></a>

In the 1.x version of the KCL, the `idleTimeBetweenReadsInMillis` setting corresponded to two quantities:
+ The amount of time between task dispatching checks. You can now configure this time between tasks by setting `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ The amount of time to sleep when no records were returned from Kinesis Data Streams. In version 2.0, with enhanced fan-out, records are pushed from their respective retriever. Activity on the shard consumer occurs only when a pushed request arrives.
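
As a minimal sketch of the first point, the dispatch interval can be set on the `CoordinatorConfig` that `ConfigsBuilder` returns. This assumes a `ConfigsBuilder` built as in the worker migration example and the KCL 2.x dependency on the classpath. The helper class name and the 500 ms value are illustrative assumptions, not recommendations.

```java
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.CoordinatorConfig;

// Hypothetical helper class, for illustration only.
final class DispatchIntervalExample {
    private DispatchIntervalExample() { }

    // Returns a CoordinatorConfig with the task-dispatch polling interval applied.
    static CoordinatorConfig withDispatchInterval(ConfigsBuilder configsBuilder) {
        return configsBuilder.coordinatorConfig()
                .shardConsumerDispatchPollIntervalMillis(500L); // illustrative value
    }
}
```

Pass the returned `CoordinatorConfig` to the `Scheduler` constructor in place of a fresh `configsBuilder.coordinatorConfig()` call, so that the adjusted value is the one the scheduler uses.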

## Client configuration removals
<a name="client-configuration-removals"></a>

In version 2.0, the KCL no longer creates clients. It depends on the user to supply a valid client. With this change, all configuration parameters that controlled client creation have been removed. If you need these parameters, you can set them on the clients before providing the clients to `ConfigsBuilder`.


| Removed Field | Equivalent Configuration | 
| --- | --- | 
| kinesisEndpoint | Configure the SDK `KinesisAsyncClient` with the preferred endpoint: `KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()`. | 
| dynamoDBEndpoint | Configure the SDK `DynamoDbAsyncClient` with the preferred endpoint: `DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()`. | 
| kinesisClientConfig | Configure the SDK `KinesisAsyncClient` with the needed configuration: `KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()`. | 
| dynamoDBClientConfig | Configure the SDK `DynamoDbAsyncClient` with the needed configuration: `DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()`. | 
| cloudWatchClientConfig | Configure the SDK `CloudWatchAsyncClient` with the needed configuration: `CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()`. | 
| regionName | Configure the SDK with the preferred Region. This is the same for all SDK clients. For example, `KinesisAsyncClient.builder().region(Region.US_WEST_2).build()`. | 