

# Read data from Amazon Kinesis Data Streams
<a name="building-consumers"></a>

A *consumer* is an application that processes all data from a Kinesis data stream. When a consumer uses *enhanced fan-out*, it gets its own 2 MB/sec allotment of read throughput, allowing multiple consumers to read data from the same stream in parallel, without contending for read throughput with other consumers. To use the enhanced fan-out capability of shards, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

You can build consumers for Kinesis Data Streams using the Kinesis Client Library (KCL) or the AWS SDK for Java. You can also develop consumers using other AWS services such as AWS Lambda, Amazon Managed Service for Apache Flink, and Amazon Data Firehose. Kinesis Data Streams supports integrations with other AWS services such as Amazon EMR, Amazon EventBridge, AWS Glue, and Amazon Redshift. It also supports third-party integrations including Apache Flink, Adobe Experience Platform, Apache Druid, Apache Spark, Databricks, Confluent Platform, Kinesumer, and Talend.

**Topics**
+ [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md)
+ [Use the Data Viewer in the Kinesis console](data-viewer.md)
+ [Query your data streams in the Kinesis console](querying-data.md)
+ [Use Kinesis Client Library](kcl.md)
+ [Develop consumers with the AWS SDK for Java](develop-consumers-sdk.md)
+ [Develop consumers using AWS Lambda](lambda-consumer.md)
+ [Develop consumers using Amazon Managed Service for Apache Flink](kda-consumer.md)
+ [Develop consumers using Amazon Data Firehose](kdf-consumer.md)
+ [Read data from Kinesis Data Streams using other AWS services](using-other-services-read.md)
+ [Read from Kinesis Data Streams using third-party integrations](using-services-third-party-read.md)
+ [Troubleshoot Kinesis Data Streams consumers](troubleshooting-consumers.md)
+ [Optimize Amazon Kinesis Data Streams consumers](advanced-consumers.md)

# Develop enhanced fan-out consumers with dedicated throughput
<a name="enhanced-consumers"></a>

In Amazon Kinesis Data Streams, you can build consumers that use a feature called *enhanced fan-out*. This feature lets consumers receive records from a stream with throughput of up to 2 MB of data per second per shard. This throughput is dedicated, which means that consumers that use enhanced fan-out don't have to contend with other consumers that are receiving data from the stream. Kinesis Data Streams pushes data records from the stream to consumers that use enhanced fan-out. Therefore, these consumers don't need to poll for data.

**Important**  
With On-demand Advantage mode, you can register up to 50 consumers per stream to use enhanced fan-out. With On-demand Standard and Provisioned streams, you can register up to 20 consumers per stream to use enhanced fan-out. 

The following diagram shows the enhanced fan-out architecture. If you use version 2.0 or later of the Amazon Kinesis Client Library (KCL) to build a consumer, the KCL sets up the consumer to use enhanced fan-out to receive data from all the shards of the stream. If you use the API to build a consumer that uses enhanced fan-out, then you can subscribe to individual shards.

![\[Workflow diagram showing enhanced fan-out architecture with two shards and two consumers. Each of the two consumers is using enhanced fan-out to receive data from both shards of the stream.\]](http://docs.aws.amazon.com/streams/latest/dev/images/enhanced_fan-out.png)


The diagram shows the following: 
+ A stream with two shards.
+ Two consumers that are using enhanced fan-out to receive data from the stream: Consumer X and Consumer Y. Each of the two consumers is subscribed to all of the shards and all of the records of the stream. If you use version 2.0 or later of the KCL to build a consumer, the KCL automatically subscribes that consumer to all the shards of the stream. On the other hand, if you use the API to build a consumer, you can subscribe to individual shards. 
+ Arrows representing the enhanced fan-out pipes that the consumers use to receive data from the stream. An enhanced fan-out pipe provides up to 2 MB/sec of data per shard, independently of any other pipes or of the total number of consumers.

**Topics**
+ [Differences between shared throughput consumer and enhanced fan-out consumer](#enhanced-consumers-differences)
+ [Regions supported for up to 50 enhanced fan-out consumers (On-demand Advantage only)](#supported-regions)
+ [Manage enhanced fan-out consumers with the AWS CLI or APIs](building-enhanced-consumers-console.md)

## Differences between shared throughput consumer and enhanced fan-out consumer
<a name="enhanced-consumers-differences"></a>

The following table compares default shared-throughput consumers to enhanced fan-out consumers. Message propagation delay is defined as the time taken in milliseconds for a payload sent using the payload-dispatching APIs (like `PutRecord` and `PutRecords`) to reach the consumer application through the payload-consuming APIs (like `GetRecords` and `SubscribeToShard`).


**This table compares shared-throughput consumers to enhanced fan-out consumers**  

| Characteristics | Shared throughput consumers without enhanced fan-out | Enhanced fan-out consumers | 
| --- | --- | --- | 
| Read throughput |  Fixed at a total of 2 MB/sec per shard. If there are multiple consumers reading from the same shard, they all share this throughput. The sum of the throughputs they receive from the shard doesn't exceed 2 MB/sec.  |  Scales as consumers register to use enhanced fan-out. Each consumer registered to use enhanced fan-out receives its own read throughput per shard, up to 2 MB/sec, independently of other consumers.  | 
| Message propagation delay |  An average of around 200 ms if you have one consumer reading from the stream. This average goes up to around 1000 ms if you have five consumers.  |  Typically an average of 70 ms whether you have one consumer or five consumers.  | 
| Cost | Not applicable |  There is a data retrieval cost and a consumer-shard hour cost. For more information, see [Amazon Kinesis Data Streams Pricing](https://aws.amazon.com/kinesis/data-streams/pricing/?nc=sn&loc=3).  | 
| Record delivery model |  Pull model over HTTP using GetRecords.  |  Kinesis Data Streams pushes the records to you over HTTP/2 using SubscribeToShard.  | 
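
The read throughput row can be made concrete with a little arithmetic. The following is a minimal sketch, not an AWS API: the function names are hypothetical, and the even split among shared-throughput consumers is a simplifying assumption (the table only states that their combined throughput doesn't exceed 2 MB/sec per shard).

```python
# Per-shard read limit stated in the comparison table.
SHARD_READ_LIMIT_MB_PER_SEC = 2.0


def shared_throughput_per_consumer(num_consumers: int) -> float:
    """Upper bound per consumer, per shard, when consumers share 2 MB/sec.

    Assumes an even split between consumers, which is a simplification.
    """
    if num_consumers < 1:
        raise ValueError("num_consumers must be at least 1")
    return SHARD_READ_LIMIT_MB_PER_SEC / num_consumers


def enhanced_fan_out_per_consumer(num_consumers: int) -> float:
    """Each registered consumer has its own pipe of up to 2 MB/sec per shard."""
    if num_consumers < 1:
        raise ValueError("num_consumers must be at least 1")
    return SHARD_READ_LIMIT_MB_PER_SEC


def total_read_capacity(num_shards: int, num_consumers: int, enhanced: bool) -> float:
    """Maximum aggregate read throughput across all consumers, in MB/sec."""
    per_shard = SHARD_READ_LIMIT_MB_PER_SEC
    if enhanced:
        # Dedicated throughput scales with the number of registered consumers.
        per_shard *= num_consumers
    return per_shard * num_shards
```

For example, with five consumers on one shard, `shared_throughput_per_consumer(5)` gives at most 0.4 MB/sec each, while `enhanced_fan_out_per_consumer(5)` stays at 2 MB/sec each.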

## Regions supported for up to 50 enhanced fan-out consumers (On-demand Advantage only)
<a name="supported-regions"></a>

 Support for up to 50 enhanced fan-out consumers in On-demand Advantage mode is available only in the following AWS Regions:


| AWS Region | Region Name | 
| --- | --- | 
|  eu-north-1 | Europe (Stockholm) | 
|  me-south-1 | Middle East (Bahrain) | 
|  ap-south-1 | Asia Pacific (Mumbai) | 
|  eu-west-3 | Europe (Paris) | 
|  ap-southeast-3 | Asia Pacific (Jakarta) | 
|  us-east-2 | US East (Ohio) | 
|  af-south-1 | Africa (Cape Town) | 
|  eu-west-1 | Europe (Ireland) | 
|  me-central-1 | Middle East (UAE) | 
|  eu-central-1 | Europe (Frankfurt) | 
|  sa-east-1 | South America (São Paulo) | 
|  ap-east-1 | Asia Pacific (Hong Kong) | 
|  ap-south-2 | Asia Pacific (Hyderabad) | 
|  us-east-1 | US East (N. Virginia) | 
|  ap-northeast-2 | Asia Pacific (Seoul) | 
|  ap-northeast-3 | Asia Pacific (Osaka) | 
|  eu-west-2 | Europe (London) | 
|  ap-southeast-4 | Asia Pacific (Melbourne) | 
|  ap-northeast-1 | Asia Pacific (Tokyo) | 
|  us-west-2 | US West (Oregon) | 
|  us-west-1 | US West (N. California) | 
|  ap-southeast-1 | Asia Pacific (Singapore) | 
|  ap-southeast-2 | Asia Pacific (Sydney) | 
|  il-central-1 | Israel (Tel Aviv) | 
|  ca-central-1 | Canada (Central) | 
|  ca-west-1 | Canada West (Calgary) | 
|  eu-south-2 | Europe (Spain) | 
|  cn-northwest-1 | China (Ningxia) | 
|  eu-central-2 | Europe (Zurich) | 
| us-gov-east-1 | AWS GovCloud (US-East) | 
| us-gov-west-1 | AWS GovCloud (US-West) | 

# Manage enhanced fan-out consumers with the AWS CLI or APIs
<a name="building-enhanced-consumers-console"></a>

Consumers that use *enhanced fan-out* in Amazon Kinesis Data Streams can receive records from a data stream with dedicated throughput of up to 2 MB of data per second per shard. For more information, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

You can use AWS CLI or Kinesis Data Streams APIs to register, describe, list, and deregister a consumer that uses enhanced fan-out in Kinesis Data Streams.

## Manage consumers using the AWS CLI
<a name="manage-consumers-cli"></a>

You can register, describe, list, and deregister enhanced fan-out consumers using the AWS CLI. For examples, see the following documentation.

[register-stream-consumer](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/register-stream-consumer.html)  
Registers a consumer for a Kinesis data stream. You can apply tags while registering the consumer. 

[describe-stream-consumer](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/describe-stream-consumer.html)  
Gets the description of a registered consumer. You identify the consumer by its consumer ARN, or by the combination of its consumer name and the stream ARN.

[list-stream-consumers](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/list-stream-consumers.html)  
Lists the consumers registered to receive data from a stream using enhanced fan-out.

[deregister-stream-consumer](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/deregister-stream-consumer.html)  
Deregisters a consumer. You identify the consumer by its consumer ARN, or by the combination of its consumer name and the stream ARN.

## Manage consumers using the Kinesis Data Streams APIs
<a name="manage-consumers-api"></a>

You can register, describe, list, and deregister enhanced fan-out consumers using the Kinesis Data Streams APIs. For examples, see the following documentation.

[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)  
Registers a consumer for a Kinesis data stream. You can apply tags while registering the consumer.

[DescribeStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html)  
Gets the description of a registered consumer. You identify the consumer by its consumer ARN, or by the combination of its consumer name and the stream ARN.

[ListStreamConsumers](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreamConsumers.html)  
Lists the consumers registered to receive data from a stream using enhanced fan-out.

[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)  
Deregisters a consumer. You identify the consumer by its consumer ARN, or by the combination of its consumer name and the stream ARN.
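
The two ways of identifying a consumer for `DescribeStreamConsumer` and `DeregisterStreamConsumer` can be sketched as a small helper. This is an illustrative sketch that assumes the AWS SDK for Python (boto3) is installed and that credentials are configured. The helper names `consumer_identifier` and `describe_consumer` are hypothetical. Letting the consumer ARN take precedence when both forms are supplied is a choice made for this example.

```python
from typing import Optional


def consumer_identifier(
    consumer_arn: Optional[str] = None,
    stream_arn: Optional[str] = None,
    consumer_name: Optional[str] = None,
) -> dict:
    """Build request parameters that identify one enhanced fan-out consumer."""
    if consumer_arn:
        # The consumer ARN alone is enough to identify the consumer.
        return {"ConsumerARN": consumer_arn}
    if stream_arn and consumer_name:
        # Otherwise both the stream ARN and the consumer name are needed.
        return {"StreamARN": stream_arn, "ConsumerName": consumer_name}
    raise ValueError(
        "Provide a consumer ARN, or both a stream ARN and a consumer name"
    )


def describe_consumer(**identifier) -> dict:
    """Call DescribeStreamConsumer through boto3 and return the description."""
    import boto3  # imported lazily so the helper above has no dependencies

    kinesis = boto3.client("kinesis")
    response = kinesis.describe_stream_consumer(**consumer_identifier(**identifier))
    return response["ConsumerDescription"]
```

The same parameters would apply to `deregister_stream_consumer`, because both operations accept either identification form.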

## Tagging consumers
<a name="tag-consumers"></a>

You can assign your own metadata to streams and enhanced fan-out consumers you create in Kinesis Data Streams in the form of tags. You can use tags to categorize and track costs of your consumers. You can also control access to consumers using tags with [attribute-based access control (ABAC)](https://docs.aws.amazon.com/IAM/latest/UserGuide/introduction_attribute-based-access-control.html). For more information, see [Tag your Amazon Kinesis Data Streams resources](tagging.md).

# Use the Data Viewer in the Kinesis console
<a name="data-viewer"></a>

 The Data Viewer in the Kinesis Management Console lets you view data records within the specified shard of your data stream without having to develop a consumer application. To use the Data Viewer, follow these steps: 

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose the active data stream whose records you want to view with the Data Viewer and then choose the **Data Viewer** tab.

1. In the **Data Viewer** tab for the selected active data stream, choose the shard whose records you want to view, choose the **Starting Position**, and then choose **Get records**. You can set the starting position to one of the following values:
   + **At sequence number**: Show records from the position denoted by the sequence number specified in the sequence number field.
   + **After sequence number**: Show records right after the position denoted by the sequence number specified in the sequence number field.
   + **At timestamp**: Show records from the position denoted by the timestamp specified in the timestamp field.
   + **Trim horizon**: Show records at the last untrimmed record in the shard, which is the oldest data record in the shard.
   + **Latest**: Show records just after the most recent record in the shard, so that you always read the most recent data in the shard.

     The data records that match the specified shard ID and starting position are then displayed in a records table in the console. A maximum of 50 records are displayed at a time. To view the next set of records, choose **Next**.

1. Click any individual record to view that record payload in raw data or JSON format in a separate window.

Note that each time you choose **Get records** or **Next** in the **Data Viewer**, the console invokes the `GetRecords` API. These calls count toward the `GetRecords` API limit of 5 transactions per second.
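
If you later want to read the same records from your own code, the starting positions listed in step 3 line up with the `ShardIteratorType` values of the `GetShardIterator` API. The following sketch builds the request parameters. The mapping from console labels to API values is an assumption based on their matching names, and the function name `shard_iterator_request` is hypothetical.

```python
# Console labels from the Data Viewer mapped to GetShardIterator values.
# Assumption: the console labels correspond one-to-one to these API values.
STARTING_POSITIONS = {
    "At sequence number": "AT_SEQUENCE_NUMBER",
    "After sequence number": "AFTER_SEQUENCE_NUMBER",
    "At timestamp": "AT_TIMESTAMP",
    "Trim horizon": "TRIM_HORIZON",
    "Latest": "LATEST",
}


def shard_iterator_request(stream_name, shard_id, position,
                           sequence_number=None, timestamp=None):
    """Build keyword arguments for a GetShardIterator call."""
    iterator_type = STARTING_POSITIONS[position]
    request = {
        "StreamName": stream_name,
        "ShardId": shard_id,
        "ShardIteratorType": iterator_type,
    }
    if iterator_type in ("AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER"):
        if sequence_number is None:
            raise ValueError("sequence_number is required for this position")
        request["StartingSequenceNumber"] = sequence_number
    elif iterator_type == "AT_TIMESTAMP":
        if timestamp is None:
            raise ValueError("timestamp is required for this position")
        request["Timestamp"] = timestamp
    return request
```

With boto3, the result could be passed as `client.get_shard_iterator(**request)`, followed by `client.get_records(ShardIterator=..., Limit=50)` to fetch a page of the same size that the console displays.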

# Query your data streams in the Kinesis console
<a name="querying-data"></a>

 The **Data analytics** tab in the Kinesis console lets you query your data streams using SQL. To use this capability, follow these steps: 

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose the active data stream that you want to query with SQL and then choose the **Data analytics** tab.

1. In the **Data analytics** tab, you can inspect and visualize your stream with an Amazon Managed Service for Apache Flink Studio notebook. You can run ad-hoc SQL queries against your data stream and view results in seconds using Apache Zeppelin. Choose **I agree** and then choose **Create notebook** to create a notebook.

1. After the notebook is created, choose **Open in Apache Zeppelin**. This will open your notebook in a new tab. A notebook is an interactive interface where you can submit your SQL queries. Choose the note that contains the name of your stream. 

1. The note contains a sample `SELECT` query that is already running and that outputs the data in the stream. This lets you view the schema for your data stream.

1. To try out other queries such as tumbling or sliding windows, choose **View sample queries** in the **Data analytics** tab. Copy the query, modify it to suit your data stream schema, and then run it in a new paragraph in your Zeppelin note. 

# Use Kinesis Client Library
<a name="kcl"></a>

## What is Kinesis Client Library?
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) is a standalone Java software library designed to simplify the process of consuming and processing data from Amazon Kinesis Data Streams. KCL handles many of the complex tasks associated with distributed computing, letting developers focus on implementing their business logic for processing data. It manages activities such as load balancing across multiple workers, responding to worker failures, checkpointing processed records, and responding to changes in the number of shards in the stream.

KCL is frequently updated to incorporate newer versions of underlying libraries, security improvements, and bug fixes. We recommend that you use the latest version of KCL to avoid known issues and benefit from the latest improvements. To find the latest KCL version, see [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client).

**Important**  
We recommend that you use the latest KCL version to avoid known bugs and issues. If you are using KCL 2.6.0 or earlier, upgrade to KCL 2.6.1 or later to avoid a rare condition that can block shard processing when stream capacity changes.  
KCL is a Java library. Support for languages other than Java is provided using a Java-based daemon called MultiLangDaemon. MultiLangDaemon interacts with the KCL application over STDIN and STDOUT. For more information about MultiLangDaemon, see [Develop consumers with KCL in non-Java languages](develop-kcl-consumers-non-java.md).  
Do not use AWS SDK for Java versions 2.27.19 through 2.27.23 with KCL 3.x. These versions include an issue that causes an exception related to KCL's DynamoDB usage. We recommend that you use AWS SDK for Java version 2.28.0 or later to avoid this issue.

## KCL key features and benefits
<a name="kcl-benefits"></a>

The following are the key features and related benefits of KCL:
+ **Scalability**: KCL enables applications to scale dynamically by distributing the processing load across multiple workers. You can scale your application in or out, manually or with auto-scaling, without worrying about load redistribution.
+ **Load balancing**: KCL automatically balances the processing load across available workers, resulting in an even distribution of work across workers.
+ **Checkpointing**: KCL manages checkpointing of processed records, enabling applications to resume processing from their last successfully processed position.
+ **Fault tolerance**: KCL provides built-in fault tolerance mechanisms, making sure that data processing continues even if individual workers fail. KCL also provides at-least-once delivery.
+ **Handling stream-level changes**: KCL adapts to shard splits and merges that might occur due to changes in data volume. It maintains ordering by making sure that child shards are processed only after their parent shard is completed and checkpointed.
+ **Monitoring**: KCL integrates with Amazon CloudWatch for consumer-level monitoring.
+ **Multi-language support**: KCL natively supports Java and enables multiple non-Java programming languages through MultiLangDaemon.
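
The ordering guarantee described under **Handling stream-level changes** can be illustrated with a small sketch. This is not KCL code. It only shows the rule that a child shard becomes eligible for processing after all of its parents are complete. The function name, the shard IDs, and the dictionary-based shard lineage are all hypothetical.

```python
def shards_ready_to_process(parents: dict, completed: set) -> list:
    """Return shards that are not yet completed and whose parents all are.

    parents maps each shard ID to a list of its parent shard IDs.
    A shard created by a split has one parent; a merged shard has two.
    """
    ready = []
    for shard, shard_parents in parents.items():
        if shard in completed:
            continue
        if all(parent in completed for parent in shard_parents):
            ready.append(shard)
    return sorted(ready)


# Hypothetical lineage: shard-0 was split into shard-2 and shard-3,
# then shard-1 and shard-2 were merged into shard-4.
lineage = {
    "shard-0": [],
    "shard-1": [],
    "shard-2": ["shard-0"],
    "shard-3": ["shard-0"],
    "shard-4": ["shard-1", "shard-2"],
}
```

With nothing completed, only `shard-0` and `shard-1` are ready. After `shard-0` completes, `shard-2` and `shard-3` become ready, and `shard-4` waits until both of its parents are done.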

# KCL concepts
<a name="kcl-concepts"></a>

This section explains the core concepts and interactions of Kinesis Client Library (KCL). These concepts are fundamental to developing and managing KCL consumer applications.
+ **KCL consumer application** – a custom-built application designed to read and process records from Kinesis data streams using the Kinesis Client Library.
+ **Worker** – KCL consumer applications are typically distributed, with one or more workers running simultaneously. KCL coordinates workers to consume data from the stream in a distributed manner and balances the load evenly across multiple workers.
+ **Scheduler** – a high-level class that a KCL worker uses to start processing data. Each KCL worker has one scheduler. The scheduler initializes and oversees various tasks, including syncing shard information from Kinesis data streams, tracking shard assignments among workers, and processing data from the stream based on the shards assigned to the worker. The scheduler accepts various configurations that affect its behavior, such as the name of the stream to process and AWS credentials. The scheduler initiates the delivery of data records from the stream to the record processors.
+ **Record processor** – defines the logic for how your KCL consumer application processes the data it receives from the data streams. You must implement your own custom data processing logic in the record processor. A KCL worker instantiates a scheduler. The scheduler then instantiates one record processor for every shard to which it holds a lease. A worker can run multiple record processors.
+ **Lease** – defines the assignment between a worker and a shard. KCL consumer applications use leases to distribute data record processing across multiple workers. Each shard is bound to only one worker by a lease at any given time and each worker can hold one or more leases simultaneously. When a worker stops holding a lease due to stopping or failing, KCL assigns another worker to take the lease. To learn more about the lease, see [Github documentation: Lease Lifecycle](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle).
+ **Lease table** – a unique Amazon DynamoDB table used to track all leases for the KCL consumer application. Each KCL consumer application creates its own lease table. The lease table is used to maintain state across all workers to coordinate data processing. For more information, see [DynamoDB metadata tables and load balancing in KCL](kcl-dynamoDB.md).
+ **Checkpointing** – the process of persistently storing the position of the last successfully processed record in a shard. KCL manages checkpointing to make sure that processing can be resumed from the last checkpointed position if a worker fails or the application restarts. Checkpoints are stored in the DynamoDB lease table as part of the metadata of the lease. This allows workers to continue processing from where the previous worker stopped.

# DynamoDB metadata tables and load balancing in KCL
<a name="kcl-dynamoDB"></a>

KCL manages metadata such as leases and CPU utilization metrics from workers. KCL tracks this metadata using DynamoDB tables. For each Amazon Kinesis Data Streams application, KCL creates three DynamoDB tables to manage the metadata: the lease table, the worker metrics table, and the coordinator state table.

**Note**  
KCL 3.x introduced two new metadata tables: *worker metrics* and *coordinator state* tables.

**Important**  
 You must add proper permissions for KCL applications to create and manage metadata tables in DynamoDB. For details, see [IAM permissions required for KCL consumer applications](kcl-iam-permissions.md).  
A KCL consumer application does not automatically remove these three DynamoDB metadata tables. When you decommission your consumer application, make sure that you remove the DynamoDB metadata tables that it created to prevent unnecessary cost.

## Lease table
<a name="kcl-leasetable"></a>

A lease table is a unique Amazon DynamoDB table used to track the shards being leased and processed by the schedulers of the KCL consumer application. Each KCL consumer application creates its own lease table. By default, KCL uses the name of the consumer application as the name of the lease table. You can set a custom table name using configuration. KCL also creates a [global secondary index](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html) on the lease table with leaseOwner as the partition key for efficient lease discovery. The global secondary index mirrors the leaseKey attribute from the base lease table. If the lease table for your KCL consumer application does not exist when the application starts up, one of the workers creates the lease table for your application.

You can view the lease table using the [Amazon DynamoDB console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) while the consumer application is running.

**Important**  
Each KCL consumer application name must be unique to prevent a duplicate lease table name.  
Your account is charged for the costs associated with the DynamoDB table, in addition to the costs associated with Kinesis Data Streams itself.

Each row in the lease table represents a shard that is being processed by the schedulers of your consumer application. Key fields include the following:
+ **leaseKey:** For single-stream processing, this is the shard ID. For multi-stream processing with KCL, it's structured as `account-id:StreamName:streamCreationTimestamp:ShardId`. leaseKey is the partition key of the lease table. For more information about multi-stream processing, see [Multi-stream processing with KCL](kcl-multi-stream.md).
+ **checkpoint:** The most recent checkpoint sequence number for the shard. 
+ **checkpointSubSequenceNumber:** When using the Kinesis Producer Library's aggregation feature, this is an extension to **checkpoint** that tracks individual user records within the Kinesis record.
+ **leaseCounter:** Used for checking if a worker is currently processing the lease actively. leaseCounter increases if the lease ownership is transferred to another worker.
+ **leaseOwner:** The current worker that is holding this lease.
+ **ownerSwitchesSinceCheckpoint:** How many times this lease has changed workers since the last checkpoint.
+ **parentShardId:** ID of this shard's parent. Makes sure that the parent shard is fully processed before processing starts on the child shards, maintaining the correct record processing order.
+ **childShardId:** List of child shard IDs resulting from this shard's split or merge. Used to track shard lineage and manage processing order during resharding operations.
+ **startingHashKey:** The lower bound of the hash key range for this shard.
+ **endingHashKey:** The upper bound of the hash key range for this shard.

If you use multi-stream processing with KCL, you see the following two additional fields in the lease table. For more information, see [Multi-stream processing with KCL](kcl-multi-stream.md).
+ **shardID:** The ID of the shard.
+ **streamName:** The identifier of the data stream in the following format: `account-id:StreamName:streamCreationTimestamp`.
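
For multi-stream processing, the leaseKey format `account-id:StreamName:streamCreationTimestamp:ShardId` can be split into its parts when you inspect the lease table. The following is a minimal sketch with hypothetical names (`MultiStreamLeaseKey`, `parse_multi_stream_lease_key`). It assumes that none of the four components contains a colon, and it keeps every component as a string.

```python
from typing import NamedTuple


class MultiStreamLeaseKey(NamedTuple):
    account_id: str
    stream_name: str
    stream_creation_timestamp: str
    shard_id: str

    @property
    def stream_identifier(self) -> str:
        """The streamName field format: account-id:StreamName:streamCreationTimestamp."""
        return ":".join(
            (self.account_id, self.stream_name, self.stream_creation_timestamp)
        )


def parse_multi_stream_lease_key(lease_key: str) -> MultiStreamLeaseKey:
    """Split a multi-stream leaseKey into its four colon-separated parts."""
    parts = lease_key.split(":")
    if len(parts) != 4:
        raise ValueError(f"Expected 4 colon-separated parts, got {len(parts)}")
    return MultiStreamLeaseKey(*parts)
```

A single-stream leaseKey is only the shard ID, so it has no colons and this parser rejects it with a `ValueError`.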

## Worker metrics table
<a name="kcl-worker-metrics-table"></a>

The worker metrics table is a unique Amazon DynamoDB table for each KCL application and is used to record CPU utilization metrics from each worker. KCL uses these metrics to assign leases efficiently, which results in balanced resource utilization across workers. KCL uses `KCLApplicationName-WorkerMetricStats` for the name of the worker metrics table by default.

## Coordinator state table
<a name="kcl-coordinator-state-table"></a>

A coordinator state table is a unique Amazon DynamoDB table for each KCL application and is used to store internal state information for workers. For example, the coordinator state table stores data regarding the leader election or metadata associated with the in-place migration from KCL 2.x to KCL 3.x. KCL uses `KCLApplicationName-CoordinatorState` for the name of the coordinator state table by default.
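
The default naming rules for the three metadata tables can be collected in one helper, for example to find the tables that you need to delete when you decommission an application. This is a sketch with a hypothetical function name. It covers only the defaults described in this topic and does not apply if you configured custom table names.

```python
def default_metadata_table_names(application_name: str) -> dict:
    """Return the default DynamoDB table names for a KCL 3.x application."""
    if not application_name:
        raise ValueError("application_name must not be empty")
    return {
        # The lease table takes the application name itself.
        "lease": application_name,
        "worker_metrics": f"{application_name}-WorkerMetricStats",
        "coordinator_state": f"{application_name}-CoordinatorState",
    }
```

For an application named `orders-consumer` (a made-up name), this yields `orders-consumer`, `orders-consumer-WorkerMetricStats`, and `orders-consumer-CoordinatorState`.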

## DynamoDB capacity mode for metadata tables created by KCL
<a name="kcl-capacity-mode"></a>

By default, the Kinesis Client Library (KCL) creates DynamoDB metadata tables such as the lease table, worker metrics table, and coordinator state table using the [on-demand capacity mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html). This mode automatically scales read and write capacity to accommodate traffic without requiring capacity planning. We strongly recommend that you keep these metadata tables in on-demand capacity mode for more efficient operation.

If you decide to switch the lease table to [provisioned capacity mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html), follow these best practices:
+ Analyze usage patterns:
  + Monitor your application's read and write patterns and usages (RCU, WCU) using Amazon CloudWatch metrics.
  + Understand peak and average throughput requirements.
+ Calculate the required capacity:
  + Estimate read capacity units (RCUs) and write capacity units (WCUs) based on your analysis.
  + Consider factors like the number of shards, checkpoint frequency, and worker count.
+ Implement auto scaling:
  + Use [DynamoDB auto scaling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) to automatically adjust provisioned capacity and set appropriate minimum and maximum capacity limits. 
  + DynamoDB auto scaling helps prevent your KCL metadata table from reaching its capacity limit and being throttled.
+ Regular monitoring and optimization:
  + Continuously monitor CloudWatch metrics for `ThrottledRequests`.
  + Adjust capacity as your workload changes over time.

If you experience a `ProvisionedThroughputExceededException` in metadata DynamoDB tables for your KCL consumer application, you must increase the provisioned throughput capacity of the DynamoDB table. If you set a certain level of read capacity units (RCU) and write capacity units (WCU) when you first create your consumer application, it might not be sufficient as your usage grows. For example, if your KCL consumer application does frequent checkpointing or operates on a stream with many shards, you might need more capacity units. For information about provisioned throughput in DynamoDB, see [DynamoDB throughput capacity](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html) and [updating a table](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable) in the Amazon DynamoDB Developer Guide.

## How KCL assigns leases to workers and balances the load
<a name="kcl-assign-leases"></a>

KCL continuously gathers and monitors CPU utilization metrics from compute hosts running the workers to ensure even workload distribution. These CPU utilization metrics are stored in the worker metrics table in DynamoDB. If KCL detects that some workers are showing higher CPU utilization rates compared to others, it will reassign leases among workers to lower the load on highly used workers. The goal is to balance the workload more evenly across the consumer application fleet, preventing any single worker from becoming overloaded. As KCL distributes CPU utilization across the consumer application fleet, you can right-size your consumer application fleet capacity by choosing the right number of workers or use auto scaling to efficiently manage the computing capacity to achieve lower cost.

**Important**  
KCL can collect CPU utilization metrics from workers only if certain prerequisites are met. For details, see [Prerequisites](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites). If KCL cannot collect CPU utilization metrics from workers, KCL will fall back to using throughput per worker to assign leases and balance the load across workers in the fleet. KCL will monitor the throughput that each worker receives at a given time and reassign leases to make sure that each worker gets a similar total throughput level from its assigned leases.
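
To make the throughput-based fallback more tangible, the following sketch distributes leases so that each worker ends up with a similar total throughput. It is an illustration only and is not KCL's actual algorithm. It uses a simple greedy heuristic that recomputes the whole assignment from scratch, and the function name, worker IDs, and throughput figures are hypothetical.

```python
import heapq


def balance_leases_by_throughput(lease_throughput: dict, workers: list) -> dict:
    """Assign each lease to the currently least-loaded worker.

    lease_throughput maps a lease key to its observed throughput.
    Leases are placed heaviest first, which tends to even out the totals.
    """
    if not workers:
        raise ValueError("At least one worker is required")
    # Min-heap of (total throughput so far, worker ID).
    heap = [(0.0, worker) for worker in workers]
    heapq.heapify(heap)
    assignment = {worker: [] for worker in workers}
    # Sort by descending throughput, then by lease key for a stable order.
    ordered = sorted(lease_throughput.items(), key=lambda item: (-item[1], item[0]))
    for lease, throughput in ordered:
        load, worker = heapq.heappop(heap)
        assignment[worker].append(lease)
        heapq.heappush(heap, (load + throughput, worker))
    return assignment
```

A real load balancer would typically move only a few leases at a time, because each reassignment interrupts processing on the affected shard. The sketch ignores that cost to keep the balancing idea visible.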

# Develop consumers with KCL
<a name="develop-kcl-consumers"></a>

You can use the Kinesis Client Library (KCL) to build consumer applications that process data from your Kinesis data streams.

KCL is available in multiple languages. This topic covers how to develop KCL consumers in Java and non-Java languages.
+ To view the Kinesis Client Library Javadoc reference, see the [Amazon Kinesis Client Library Javadoc](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html).
+ To download KCL for Java from GitHub, see [Amazon Kinesis Client Library for Java](https://github.com/awslabs/amazon-kinesis-client).
+ To locate the KCL for Java on Apache Maven, see the [KCL Maven Central Repository](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client).

**Topics**
+ [Develop consumers with KCL in Java](develop-kcl-consumers-java.md)
+ [Develop consumers with KCL in non-Java languages](develop-kcl-consumers-non-java.md)

# Develop consumers with KCL in Java
<a name="develop-kcl-consumers-java"></a>

## Prerequisites
<a name="develop-kcl-consumers-java-prerequisites"></a>

Before you start using KCL 3.x, make sure that you have the following:
+ Java Development Kit (JDK) 8 or later
+ AWS SDK for Java 2.x
+ Maven or Gradle for dependency management

KCL collects CPU utilization metrics from the compute host that workers run on so that it can balance the load and achieve an even level of resource utilization across workers. To enable KCL to collect CPU utilization metrics from workers, you must meet the following prerequisites:

 **Amazon Elastic Compute Cloud (Amazon EC2)**
+ Your operating system must be Linux OS.
+ You must enable [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html) in your EC2 instance.

 **Amazon Elastic Container Service (Amazon ECS) on Amazon EC2**
+ Your operating system must be Linux OS.
+ You must enable [ECS task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html).
+ Your Amazon ECS container agent version must be 1.39.0 or later.

 **Amazon ECS on AWS Fargate**
+ You must enable [Fargate task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). If you use Fargate platform version 1.4.0 or later, this is enabled by default.
+ Fargate platform version 1.4.0 or later.

 **Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2** 
+ Your operating system must be Linux OS.

 **Amazon EKS on AWS Fargate**
+ Fargate platform 1.3.0 or later.

**Important**  
If KCL cannot collect CPU utilization metrics from workers, KCL will fall back to using throughput per worker to assign leases and balance the load across workers in the fleet. For more information, see [How KCL assigns leases to workers and balances the load](kcl-dynamoDB.md#kcl-assign-leases).

## Install and add dependencies
<a name="develop-kcl-consumers-java-installation"></a>

If you're using Maven, add the following dependency to your `pom.xml` file. Make sure that you replace 3.x.x with the latest KCL version.

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

If you're using Gradle, add the following to your `build.gradle` file. Make sure that you replace 3.x.x with the latest KCL version.

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

You can check for the latest version of the KCL on the [Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Implement the consumer
<a name="develop-kcl-consumers-java-implemetation"></a>

A KCL consumer application consists of the following key components:

**Topics**
+ [

### RecordProcessor
](#implementation-recordprocessor)
+ [

### RecordProcessorFactory
](#implementation-recordprocessorfactory)
+ [

### Scheduler
](#implementation-scheduler)
+ [

### Main Consumer Application
](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor is the core component where your business logic for processing Kinesis data stream records resides. It defines how your application processes the data it receives from the Kinesis stream.

Key responsibilities:
+ Initialize processing for a shard
+ Process batches of records from the Kinesis stream
+ Shut down processing for a shard (for example, when the shard splits or merges, or the lease is handed over to another host)
+ Handle checkpointing to track progress

The following shows an implementation example:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end, checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

The following is a detailed explanation of each method used in the example:

**initialize(InitializationInput initializationInput)**
+ Purpose: Set up any necessary resources or state for processing records.
+ When it's called: Once, when KCL assigns a shard to this record processor.
+ Key points:
  + `initializationInput.shardId()`: The ID of the shard this processor will handle.
  + `initializationInput.extendedSequenceNumber()`: The sequence number to start processing from.

**processRecords(ProcessRecordsInput processRecordsInput)**
+ Purpose: Process the incoming records and optionally checkpoint progress.
+ When it's called: Repeatedly, as long as the record processor holds the lease for the shard.
+ Key points:
  + `processRecordsInput.records()`: List of records to process.
  + `processRecordsInput.checkpointer()`: Used to checkpoint the progress.
  + Make sure that you handle any exceptions during processing to prevent KCL from failing.
  + This method should be idempotent, as the same record may be processed more than once in some scenarios, such as data that has not been checkpointed before unexpected worker crashes or restarts.
  + Always flush any buffered data before checkpointing to ensure data consistency.

**leaseLost(LeaseLostInput leaseLostInput)**
+ Purpose: Clean up any resources specific to processing this shard.
+ When it's called: When another Scheduler takes over the lease for this shard.
+ Key points:
  + Checkpointing is not allowed in this method.

**shardEnded(ShardEndedInput shardEndedInput)**
+ Purpose: Finish processing for this shard and checkpoint.
+ When it's called: When the shard splits or merges, indicating all data for this shard has been processed.
+ Key points:
  + `shardEndedInput.checkpointer()`: Used to perform the final checkpointing.
  + Checkpointing in this method is mandatory to complete processing.
  + Failing to flush data and checkpoint here may result in data loss or duplicate processing when the shard is reopened.

**shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)**
+ Purpose: Checkpoint and clean up resources when KCL is shutting down.
+ When it's called: When KCL is shutting down (for example, when the application is terminating).
+ Key points:
  + `shutdownRequestedInput.checkpointer()`: Used to perform checkpointing before shutdown.
  + Make sure that you implement checkpointing in this method so that progress is saved before the application stops.
  + Failure to flush data and checkpoint here may result in data loss or reprocessing of records when the application restarts.

**Important**  
KCL 3.x reduces data reprocessing when a lease is handed over from one worker to another by checkpointing before the previous worker is shut down. If you don't implement checkpointing logic in the `shutdownRequested()` method, you won't see this benefit. Make sure that you implement checkpointing logic inside the `shutdownRequested()` method.
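
Two of the key points above, idempotent processing and flushing buffered data before checkpointing, can be sketched without any KCL dependency. The class `IdempotentShardWriter`, its methods, and the map-based store below are hypothetical stand-ins and not KCL types. The `Runnable` stands in for a call such as `checkpointer().checkpoint()`, which in KCL throws checked exceptions and would need a `try`/`catch` around it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative only: buffers records, writes them with an idempotent upsert,
 * and checkpoints only after the flush. All names are hypothetical, not KCL types.
 */
final class IdempotentShardWriter {
    // Stand-in for a downstream store keyed by Kinesis sequence number.
    private final Map<String, String> store;
    private final Map<String, String> buffer = new LinkedHashMap<>();

    IdempotentShardWriter(Map<String, String> store) {
        this.store = store;
    }

    /** Buffers one record; a replayed sequence number replaces the earlier copy. */
    void buffer(String sequenceNumber, String data) {
        buffer.put(sequenceNumber, data);
    }

    /**
     * Flushes buffered records, then runs the checkpoint action.
     * If the flush throws, the checkpoint action is never reached.
     */
    void flushThenCheckpoint(Runnable checkpointAction) {
        store.putAll(buffer);   // upsert: reprocessing the same record does not duplicate it
        buffer.clear();
        checkpointAction.run(); // stand-in for checkpointer().checkpoint()
    }
}
```

Keying the write by sequence number is one way to make reprocessing harmless: if a worker crashes after the flush but before the checkpoint, the replayed records overwrite the copies that were already written.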

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory is responsible for creating new RecordProcessor instances. KCL uses this factory to create a new RecordProcessor for each shard that the application needs to process.

Key responsibilities:
+ Create new RecordProcessor instances on demand
+ Make sure that each RecordProcessor is properly initialized

The following is an implementation example:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

In this example, the factory creates a new SampleRecordProcessor each time shardRecordProcessor() is called. You can extend this to include any necessary initialization logic.
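
As a rough illustration of such initialization logic, the following sketch passes shared configuration from the factory into each processor that it creates. To stay self-contained it does not use KCL: `ProcessorFactorySketch`, `ConfiguredProcessor`, and `ConfiguredProcessorFactory` are hypothetical names, and the interface is only a local stand-in with the same no-argument shape as `shardRecordProcessor()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Local stand-in with the same shape as a no-argument processor factory; not a KCL type. */
interface ProcessorFactorySketch<P> {
    P create();
}

/** Hypothetical processor that receives its dependencies through the constructor. */
final class ConfiguredProcessor {
    final String sinkName;
    final int instanceNumber;

    ConfiguredProcessor(String sinkName, int instanceNumber) {
        this.sinkName = sinkName;
        this.instanceNumber = instanceNumber;
    }
}

/** Factory that holds shared configuration and hands it to every new processor. */
final class ConfiguredProcessorFactory implements ProcessorFactorySketch<ConfiguredProcessor> {
    private final String sinkName;
    private final AtomicInteger created = new AtomicInteger();

    ConfiguredProcessorFactory(String sinkName) {
        this.sinkName = sinkName;
    }

    @Override
    public ConfiguredProcessor create() {
        // A new instance per call, so processors for different shards never share mutable state.
        return new ConfiguredProcessor(sinkName, created.incrementAndGet());
    }
}
```

The same pattern applies to a class that implements `ShardRecordProcessorFactory`: keep shared, thread-safe dependencies in the factory and return a fresh processor from each call.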

### Scheduler
<a name="implementation-scheduler"></a>

Scheduler is a high-level component that coordinates all the activities of the KCL application. It's responsible for the overall orchestration of data processing.

Key responsibilities:
+ Manage the lifecycle of RecordProcessors
+ Handle lease management for shards
+ Coordinate checkpointing
+ Balance shard processing load across multiple workers of your application
+ Handle graceful shutdown and application termination signals

Scheduler is typically created and started in the Main Application. You can check the implementation example of Scheduler in the following section, Main Consumer Application. 

### Main Consumer Application
<a name="implementation-main"></a>

Main Consumer Application ties all the components together. It's responsible for setting up the KCL consumer, creating necessary clients, configuring the Scheduler, and managing the application's lifecycle.

Key responsibilities:
+ Set up AWS service clients (Kinesis, DynamoDB, CloudWatch)
+ Configure the KCL application
+ Create and start the Scheduler
+ Handle application shutdown

The following is an implementation example:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```
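
The sample above starts the scheduler on a daemon thread and then returns from `run()`. A long-running consumer usually also needs to keep the JVM alive and request a graceful stop when the process is terminated. The following is a minimal sketch of that pattern using only the JDK. `StoppableWorker` and `ConsumerLifecycleSketch` are hypothetical stand-ins and not KCL classes, and the 20-second timeout is an arbitrary assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a long-running worker such as a scheduler; not a KCL class. */
final class StoppableWorker implements Runnable {
    private final CountDownLatch stopRequested = new CountDownLatch(1);
    private final CountDownLatch finished = new CountDownLatch(1);

    @Override
    public void run() {
        try {
            stopRequested.await(); // stand-in for the record-processing loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            finished.countDown();
        }
    }

    /** Requests a stop and waits up to the timeout; returns true if the worker finished in time. */
    boolean shutdownGracefully(long timeout, TimeUnit unit) {
        stopRequested.countDown();
        try {
            return finished.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class ConsumerLifecycleSketch {
    private ConsumerLifecycleSketch() {}

    /** Starts the worker on a daemon thread and registers a JVM shutdown hook that stops it. */
    static Thread startWithShutdownHook(final StoppableWorker worker) {
        Thread workerThread = new Thread(worker, "consumer-worker");
        workerThread.setDaemon(true);
        workerThread.start();
        Runtime.getRuntime().addShutdownHook(new Thread(
                () -> worker.shutdownGracefully(20, TimeUnit.SECONDS)));
        return workerThread;
    }
}
```

In a `main` method, calling `join()` on the returned thread keeps the JVM running until the worker stops. In a KCL application, the shutdown hook would typically call the scheduler's own graceful shutdown method (`startGracefulShutdown()` in KCL 2.x and later) and wait on the returned future with a timeout; check the Javadoc for the KCL version that you use.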

KCL creates an enhanced fan-out (EFO) consumer with dedicated throughput by default. For more information about enhanced fan-out, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md). If you have fewer than two consumers or don't need read propagation delays under 200 ms, you must set the following configuration in the scheduler object to use shared-throughput consumers:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

The following code is an example of creating a scheduler object that uses shared-throughput consumers:

**Imports**:

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Code**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );
```

# Develop consumers with KCL in non-Java languages
<a name="develop-kcl-consumers-non-java"></a>

This section covers the implementation of consumers using Kinesis Client Library (KCL) in Python, Node.js, .NET, and Ruby.

KCL is a Java library. Support for languages other than Java is provided through a multi-language interface called the `MultiLangDaemon`. This daemon is Java-based and runs in the background when you use KCL with a language other than Java. Therefore, even if you install KCL for a non-Java language and write your consumer application entirely in that language, you still need Java installed on your system because of the `MultiLangDaemon`. In addition, the `MultiLangDaemon` has some default settings that you might need to customize for your use case (for example, the AWS Region that it connects to). For more information about the `MultiLangDaemon` on GitHub, see [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

While the core concepts remain the same across languages, there are some language-specific considerations and implementations. For the core concepts about the KCL consumer development, see [Develop consumers with KCL in Java](develop-kcl-consumers-java.md). For more detailed information about how to develop KCL consumers in Python, Node.js, .NET, and Ruby and latest updates, please refer to the following GitHub repositories:
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**Important**  
Don't use the following non-Java KCL library versions if you're using JDK 8. These versions contain a dependency (logback) that is incompatible with JDK 8.
+ KCL Python 3.0.2 and 2.2.0
+ KCL Node.js 2.3.0
+ KCL .NET 3.1.0
+ KCL Ruby 2.2.0

We recommend that you use versions released either before or after these affected versions when working with JDK 8.

# Multi-stream processing with KCL
<a name="kcl-multi-stream"></a>

This section describes the required changes in KCL that allow you to create KCL consumer applications that can process more than one data stream at the same time.

**Important**  
Multi-stream processing is only supported in KCL 2.3 or later.  
Multi-stream processing is *not* supported for KCL consumers written in non-Java languages that run with `MultiLangDaemon`.  
Multi-stream processing is *not* supported in any versions of KCL 1.x.

+ **MultiStreamTracker interface**
  + To build a consumer application that can process multiple streams at the same time, you must implement an interface called [MultiStreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). This interface includes the `streamConfigList` method, which returns the list of data streams and their configurations to be processed by the KCL consumer application. The data streams that are being processed can change during the consumer application runtime. KCL calls `streamConfigList` periodically to learn about changes in the data streams to process.
  + The `streamConfigList` populates the [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) list.

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + The `StreamIdentifier` and `InitialPositionInStreamExtended` are required fields, while `consumerArn` is optional. You must provide the `consumerArn` only if you are using KCL to implement an enhanced fan-out consumer application.
  + For more information about `StreamIdentifier`, see [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). To create a `StreamIdentifier`, we recommend that you create a multistream instance from the `streamArn` and the `streamCreationEpoch`, which is available in KCL 2.5.0 or later. In KCL v2.3 and v2.4, which don't support `streamArn`, create a multistream instance by using the format `account-id:StreamName:streamCreationTimestamp`. This format will be deprecated and no longer supported starting with the next major release.
  + `MultiStreamTracker` also includes a strategy for deleting leases of old streams in the lease table (`formerStreamsLeasesDeletionStrategy`). The strategy cannot be changed during the consumer application runtime. For more information, see [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java).
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) is an application-wide class that you can use to specify all of the KCL configuration settings to be used when building your KCL consumer application for KCL version 2.x or later. The `ConfigsBuilder` class now supports the `MultiStreamTracker` interface. You can initialize `ConfigsBuilder` with the name of a single data stream to consume records from:

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

Or you can initialize ConfigsBuilder with `MultiStreamTracker` if you want to implement a KCL consumer application that processes multiple streams at the same time.

```
    /**
     * Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ With multi-stream support implemented for your KCL consumer application, each row of the application's lease table now contains the shard ID and the stream name of the multiple data streams that this application processes.
+ When multi-stream support for your KCL consumer application is implemented, the leaseKey takes the following structure: `account-id:StreamName:streamCreationTimestamp:ShardId`. For example, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

**Important**  
When your existing KCL consumer application is configured to process only one data stream, the `leaseKey` (which is the partition key for the lease table) is the shard ID. If you reconfigure an existing KCL consumer application to process multiple data streams, it breaks your lease table, because the `leaseKey` structure must be as follows: `account-id:StreamName:StreamCreationTimestamp:ShardId` to support multi-stream.
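
To show how the two `leaseKey` shapes differ, the following sketch splits a multi-stream lease key into its four parts and tells it apart from a single-stream key. It is illustrative only: `MultiStreamLeaseKey` and its methods are hypothetical names and not part of KCL, and the sketch assumes that none of the four parts contains a colon.

```java
/** Illustrative parser for the multi-stream lease key format described above; not part of KCL. */
final class MultiStreamLeaseKey {
    final String accountId;
    final String streamName;
    final long streamCreationTimestamp;
    final String shardId;

    private MultiStreamLeaseKey(String accountId, String streamName,
                                long streamCreationTimestamp, String shardId) {
        this.accountId = accountId;
        this.streamName = streamName;
        this.streamCreationTimestamp = streamCreationTimestamp;
        this.shardId = shardId;
    }

    /** Returns true when the key has the four colon-separated parts of a multi-stream lease key. */
    static boolean isMultiStream(String leaseKey) {
        return leaseKey != null && leaseKey.split(":", -1).length == 4;
    }

    /** Parses account-id:StreamName:streamCreationTimestamp:ShardId, or throws if the shape differs. */
    static MultiStreamLeaseKey parse(String leaseKey) {
        String[] parts = leaseKey.split(":", -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not a multi-stream lease key: " + leaseKey);
        }
        return new MultiStreamLeaseKey(parts[0], parts[1], Long.parseLong(parts[2]), parts[3]);
    }
}
```

For the example key `111111111:multiStreamTest-1:12345:shardId-000000000336`, `parse` yields the shard ID `shardId-000000000336`, while a single-stream key such as `shardId-000000000336` is rejected because it contains only the shard ID.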

# Use the AWS Glue Schema registry with KCL
<a name="kcl-glue-schema"></a>

You can integrate Kinesis Data Streams with the AWS Glue Schema registry. The AWS Glue Schema registry lets you centrally discover, control, and evolve schemas, while ensuring data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema registry lets you improve end-to-end data quality and data governance within your streaming applications. For more information, see [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). One of the ways to set up this integration is through KCL for Java.

**Important**  
AWS Glue Schema registry integration for Kinesis Data Streams is only supported in KCL 2.3 or later.  
AWS Glue Schema registry integration for Kinesis Data Streams is *not* supported for KCL consumers written in non-Java languages that run with `MultiLangDaemon`.  
AWS Glue Schema registry integration for Kinesis Data Streams is *not* supported in any versions of KCL 1.x.

For detailed instructions on how to set up integration of Kinesis Data Streams with AWS Glue Schema registry using KCL, see the "Interacting with Data Using the KPL/KCL Libraries" section in [Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds).

# IAM permissions required for KCL consumer applications
<a name="kcl-iam-permissions"></a>

 You must add the following permissions to the IAM role or user associated with your KCL consumer application. 

 Security best practices for AWS dictate the use of fine-grained permissions to control access to different resources. AWS Identity and Access Management (IAM) lets you manage users and user permissions in AWS. An IAM policy explicitly lists actions that are allowed and the resources on which the actions are applicable.

The following table shows the minimum IAM permissions generally required for KCL consumer applications:


**Minimum IAM permissions for KCL consumer applications**  

| Service | Actions | Resources (ARNs) | Purpose | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  Kinesis data stream from which your KCL application will process the data.`arn:aws:kinesis:region:account:stream/StreamName`  |  Before attempting to read records, the consumer checks if the data stream exists, if it's active, and if the shards are contained in the data stream. Registers consumers to a shard.  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | Kinesis data stream from which your KCL application will process the data.`arn:aws:kinesis:region:account:stream/StreamName` |  Reads records from a shard.  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  Kinesis data stream from which your KCL application will process the data. Add this action only if you use enhanced fan-out (EFO) consumers. `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  Subscribes to a shard for enhanced fan-out (EFO) consumers.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Lease table (metadata table in DynamoDB) created by KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  These actions are required for KCL to manage the lease table created in DynamoDB.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Worker metrics and coordinator state table (metadata tables in DynamoDB) created by KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  These actions are required for KCL to manage the worker metrics and coordinator state metadata tables in DynamoDB.  | 
| Amazon DynamoDB | `Query` |  Global secondary index on the lease table. `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  This action is required for KCL to read the global secondary index of the lease table created in DynamoDB.  | 
| Amazon CloudWatch | `PutMetricData` |  \*  |  Upload metrics to CloudWatch that are useful for monitoring the application. The asterisk (\*) is used because there is no specific resource in CloudWatch on which the `PutMetricData` action is invoked.   | 

**Note**  
Replace "region," "account," "StreamName," and "KCLApplicationName" in the ARNs with your own AWS Region, AWS account number, Kinesis data stream name, and KCL application name respectively. KCL 3.x creates two more metadata tables in DynamoDB. For details about DynamoDB metadata tables created by KCL, see [DynamoDB metadata tables and load balancing in KCL](kcl-dynamoDB.md). If you use configurations to customize names of the metadata tables created by KCL, use those specified table names instead of KCL application name. 

The following is an example policy document for a KCL consumer application. 

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Before you use this example policy, check the following items:
+ Replace `us-east-1` with your AWS Region.
+ Replace `123456789012` with your AWS account ID.
+ Replace `STREAM_NAME` with the name of your Kinesis data stream.
+ The `consumer/*` resource matches every consumer registered on the stream. If you scope it to a single consumer, use the name of your consumer, which is typically your application name when using KCL.
+ Replace `KCL_APPLICATION_NAME` with the name of your KCL application.
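
The substitutions in the checklist can also be scripted. The following Python sketch shows one way to derive the resource ARNs that appear in the example policy from your own values. The function name `kcl_policy_resources` and the dictionary keys are illustrative only and are not part of KCL or any AWS SDK. The table name suffixes assume that you use the default metadata table names.

```python
def kcl_policy_resources(region, account_id, stream_name, application_name):
    """Return the resource ARNs used in the example policy.

    Assumes the default DynamoDB metadata table names derived from the
    KCL application name.
    """
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    table_arn = f"arn:aws:dynamodb:{region}:{account_id}:table/{application_name}"
    return {
        "stream": stream_arn,
        "consumers": f"{stream_arn}/consumer/*",
        "lease_table": table_arn,
        "lease_table_indexes": f"{table_arn}/index/*",
        "worker_metrics_table": f"{table_arn}-WorkerMetricStats",
        "coordinator_state_table": f"{table_arn}-CoordinatorState",
    }


if __name__ == "__main__":
    # Example values only; replace them with your own.
    resources = kcl_policy_resources("us-west-2", "111122223333", "my-stream", "my-kcl-app")
    for name, arn in resources.items():
        print(f"{name}: {arn}")
```

If you customize the metadata table names in your KCL configuration, substitute those names instead of deriving them from the application name.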

# KCL configurations
<a name="kcl-configuration"></a>

You can set configuration properties to customize Kinesis Client Library's functionality to meet your specific requirements. The following table describes configuration properties and classes.

**Important**  
In KCL 3.x, the load balancing algorithm aims to achieve even CPU utilization across workers, not an equal number of leases per worker. If you set `maxLeasesForWorker` too low, you might limit KCL's ability to balance the workload effectively. If you use the `maxLeasesForWorker` configuration, consider increasing its value to allow for the best possible load distribution.


**This table shows the configuration properties for KCL**  

| Configuration property | Configuration class | Description | Default value | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | The name of this KCL application. Used as the default for the tableName and consumerName. | Not applicable | 
| tableName | ConfigsBuilder |  Allows overriding the table name used for the Amazon DynamoDB lease table.  | Not applicable | 
| streamName | ConfigsBuilder |  The name of the stream that this application processes records from.  | Not applicable | 
| workerIdentifier | ConfigsBuilder |  A unique identifier that represents this instantiation of the application processor. This must be unique.  | Not applicable | 
| failoverTimeMillis | LeaseManagementConfig |  The number of milliseconds that must pass before you can consider a lease owner to have failed. For applications that have a large number of shards, this may be set to a higher number to reduce the number of DynamoDB IOPS required for tracking leases.  | 10,000 (10 seconds) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  The time between shard sync calls.  | 60,000 (60 seconds) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  When set, leases are removed as soon as the child leases have started processing.  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  When set, child shards that have an open shard are ignored. This is primarily for DynamoDB Streams.  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  The maximum number of leases a single worker should accept. Setting it too low may cause data loss if workers can't process all shards, and lead to a suboptimal lease assignment among workers. Consider total shard count, number of workers, and worker processing capacity when configuring it.  | Unlimited | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  Controls the size of the lease renewer thread pool. The more leases that your application could take, the larger this pool should be.  | 20 | 
| billingMode | LeaseManagementConfig |  Determines the capacity mode of the lease table created in DynamoDB. There are two options: on-demand mode (PAY_PER_REQUEST) and provisioned mode. We recommend using the default setting of on-demand mode because it automatically scales to accommodate your workload without the need for capacity planning.  | PAY_PER_REQUEST (on-demand mode) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | The DynamoDB read capacity that is used if the Kinesis Client Library needs to create a new DynamoDB lease table with provisioned capacity mode. You can ignore this configuration if you are using the default on-demand capacity mode in billingMode configuration. | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | The DynamoDB write capacity that is used if the Kinesis Client Library needs to create a new DynamoDB lease table with provisioned capacity mode. You can ignore this configuration if you are using the default on-demand capacity mode in billingMode configuration. | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  The initial position in the stream that the application should start at. This is only used during initial lease creation.  |  InitialPositionInStream.TRIM_HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  A percentage value that determines when the load balancing algorithm should consider reassigning shards among workers. This is a new configuration introduced in KCL 3.x.  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  A percentage value that is used to dampen the amount of load that will be moved from the overloaded worker in a single rebalance operation. This is a new configuration introduced in KCL 3.x.  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  Determines whether an additional lease should still be taken from the overloaded worker even if doing so causes the total lease throughput taken to exceed the desired throughput amount. This is a new configuration introduced in KCL 3.x.  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  Determines if KCL should ignore resource metrics from workers (such as CPU utilization) when reassigning leases and load balancing. Set this to TRUE if you want to prevent KCL from load balancing based on CPU utilization. This is a new configuration introduced in KCL 3.x.  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  Amount of the maximum throughput to assign to a worker during the lease assignment. This is a new configuration introduced in KCL 3.x.  | Unlimited | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  Controls the behavior of lease handoff between workers. When set to true, KCL will attempt to gracefully transfer leases by allowing the shard's RecordProcessor sufficient time to complete processing before handing off the lease to another worker. This can help ensure data integrity and smooth transitions but may increase handoff time. When set to false, the lease will be handed off immediately without waiting for the RecordProcessor to shut down gracefully. This can lead to faster handoffs but may risk incomplete processing. Note: Checkpointing must be implemented inside the shutdownRequested() method of the RecordProcessor to benefit from the graceful lease handoff feature. This is a new configuration introduced in KCL 3.x.  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  Specifies the minimum time (in milliseconds) to wait for the current shard's RecordProcessor to gracefully shut down before forcefully transferring the lease to the next owner. If your processRecords method typically runs longer than the default value, consider increasing this setting. This ensures the RecordProcessor has sufficient time to complete its processing before the lease transfer occurs. This is a new configuration introduced in KCL 3.x.  | 30,000 (30 seconds) | 
| maxRecords | PollingConfig |  Allows setting the maximum number of records that Kinesis returns.  | 10,000 | 
| retryGetRecordsInSeconds | PollingConfig |  Configures the delay between GetRecords attempts for failures.  | None | 
| maxGetRecordsThreadPool | PollingConfig |  The thread pool size used for GetRecords.  | None | 
| idleTimeBetweenReadsInMillis | PollingConfig |  Determines how long KCL waits between GetRecords calls to poll the data from data streams. The unit is milliseconds.  | 1,500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  When set, the record processor is called even when no records were provided from Kinesis.  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  How often a record processor should poll to see if the parent shard has been completed. The unit is milliseconds.  | 10,000 (10 seconds) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  Disable synchronizing shard data if the lease table contains existing leases.  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  Which shard prioritization to use.  |  NoOpShardPrioritization  | 
| clientVersionConfig | CoordinatorConfig |  Determines which KCL version compatibility mode the application will run in. This configuration is only for the migration from previous KCL versions. When migrating to 3.x, you need to set this configuration to `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. You can remove this configuration when you complete the migration.  | CLIENT_VERSION_CONFIG_3X | 
| taskBackoffTimeMillis | LifecycleConfig |  The time to wait to retry failed KCL tasks. The unit is milliseconds.  | 500 (0.5 seconds) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  How long to wait before a warning is logged if a task hasn't completed.  | None | 
| listShardsBackoffTimeInMillis | RetrievalConfig | The time to wait between calls to ListShards when failures occur. The unit is milliseconds. | 1,500 (1.5 seconds) | 
| maxListShardsRetryAttempts | RetrievalConfig | The maximum number of times that ListShards retries before giving up. | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  Specifies the maximum duration (in milliseconds) to buffer metrics before publishing them to CloudWatch.  | 10,000 (10 seconds) | 
| metricsMaxQueueSize | MetricsConfig |  Specifies the maximum number of metrics to buffer before publishing to CloudWatch.  | 10,000 | 
| metricsLevel | MetricsConfig |  Specifies the granularity level of CloudWatch metrics to be enabled and published.  Possible values: NONE, SUMMARY, DETAILED.  |  MetricsLevel.DETAILED  | 
| metricsEnabledDimensions | MetricsConfig |  Controls allowed dimensions for CloudWatch Metrics.  | All dimensions | 
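
As a rough illustration of the `maxLeasesForWorker` guidance in the table, the following Python sketch checks whether a fleet can still hold a lease for every shard. It relies on simple arithmetic only: if the number of workers multiplied by `maxLeasesForWorker` is smaller than the number of shards, some shards cannot be assigned. The function names and the `tolerated_worker_failures` parameter are hypothetical helpers for this sketch, not KCL settings.

```python
import math


def min_max_leases_for_worker(shard_count, worker_count, tolerated_worker_failures=0):
    """Smallest maxLeasesForWorker that lets the remaining workers cover every shard."""
    remaining_workers = worker_count - tolerated_worker_failures
    if remaining_workers <= 0:
        raise ValueError("at least one worker must remain")
    return math.ceil(shard_count / remaining_workers)


def can_cover_all_shards(shard_count, worker_count, max_leases_for_worker):
    """True if the fleet can hold a lease for every shard at the same time."""
    return worker_count * max_leases_for_worker >= shard_count


if __name__ == "__main__":
    # 100 shards, 10 workers, and room for 2 workers to fail.
    print(min_max_leases_for_worker(100, 10, tolerated_worker_failures=2))
```

The result is only a lower bound. Because KCL 3.x balances on CPU utilization rather than on lease counts, a value well above this bound leaves more room for load balancing.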

**Discontinued configurations in KCL 3.x**

The following configuration properties are discontinued in KCL 3.x:


**The table shows discontinued configuration properties for KCL 3.x**  

| Configuration property | Configuration class | Description | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  The maximum number of leases an application should attempt to steal at one time. KCL 3.x will ignore this configuration and reassign leases based on the resource utilization of workers.  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  Controls whether workers should prioritize taking very expired leases (leases not renewed for 3x the failover time) and new shard leases, regardless of target lease counts but still respecting max lease limits. KCL 3.x will ignore this configuration and always spread expired leases across workers.  | 

**Important**  
You must still keep the discontinued configuration properties during the migration from previous KCL versions to KCL 3.x. During the migration, the KCL worker first starts in the KCL 2.x compatible mode and switches to the KCL 3.x functionality mode when it detects that all KCL workers of the application are ready to run KCL 3.x. These discontinued configurations are needed while KCL workers are running in the KCL 2.x compatible mode.

# KCL version lifecycle policy
<a name="kcl-version-lifecycle-policy"></a>

This topic outlines the version lifecycle policy for Amazon Kinesis Client Library (KCL). AWS regularly provides new releases for KCL versions to support new features and enhancements, bug fixes, security patches, and dependency updates. We recommend that you stay up-to-date with KCL versions to keep up with the latest features, security updates, and underlying dependencies. We **don't** recommend continued use of an unsupported KCL version.

The lifecycle for major KCL versions consists of the following three phases:
+ **General availability (GA)** – During this phase, the major version is fully supported. AWS provides regular minor and patch version releases that include support for new features or API updates for Kinesis Data Streams, as well as bug and security fixes.
+ **Maintenance mode** – AWS limits patch version releases to address critical bug fixes and security issues only. The major version won't receive updates for new features or APIs of Kinesis Data Streams.
+ **End-of-support** – The major version will no longer receive updates or releases. Previously published releases will continue to be available through public package managers and the code will remain on GitHub. Use of a version which has reached end-of-support is done at the user’s discretion. We recommend that you upgrade to the latest major version.


| Major version | Current phase | Release date | Maintenance mode date | End-of-support date | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | Maintenance mode | 2013-12-19 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | General availability | 2018-08-02 | -- | -- | 
| KCL 3.x | General availability | 2024-11-06 | -- | -- | 

# Migrate from previous KCL versions
<a name="kcl-migration-previous-versions"></a>

This topic explains how to migrate from previous versions of the Kinesis Client Library (KCL). 

## What's new in KCL 3.0?
<a name="kcl-migration-new-3-0"></a>

Kinesis Client Library (KCL) 3.0 introduces several major enhancements compared to previous versions:
+  It lowers compute costs for consumer applications by automatically redistributing the work from over-utilized workers to under-utilized workers in the consumer application fleet. This new load balancing algorithm ensures evenly distributed CPU utilization across workers and removes the need to over-provision workers.
+  It reduces the DynamoDB cost associated with KCL by optimizing read operations on the lease table.
+ It minimizes reprocessing of data when leases are reassigned to another worker by allowing the current worker to complete checkpointing the records that it has processed.
+  It uses AWS SDK for Java 2.x for improved performance and security features, fully removing the dependency on AWS SDK for Java 1.x.

For more information, see [KCL 3.0 release note](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md).

**Topics**
+ [

## What's new in KCL 3.0?
](#kcl-migration-new-3-0)
+ [

# Migrate from KCL 2.x to KCL 3.x
](kcl-migration-from-2-3.md)
+ [

# Roll back to the previous KCL version
](kcl-migration-rollback.md)
+ [

# Roll forward to KCL 3.x after a rollback
](kcl-migration-rollforward.md)
+ [

# Best practices for the lease table with provisioned capacity mode
](kcl-migration-lease-table.md)
+ [

# Migrating from KCL 1.x to KCL 3.x
](kcl-migration-1-3.md)

# Migrate from KCL 2.x to KCL 3.x
<a name="kcl-migration-from-2-3"></a>

This topic provides step-by-step instructions to migrate your consumer from KCL 2.x to KCL 3.x. KCL 3.x supports in-place migration of KCL 2.x consumers. You can continue consuming the data from your Kinesis data stream while migrating your workers in a rolling manner.

**Important**  
KCL 3.x maintains the same interfaces and methods as KCL 2.x. Therefore, you don’t have to update your record processing code during the migration. However, you must set the proper configuration and check the required steps for the migration. We strongly recommend that you follow the migration steps below for a smooth migration experience.

## Step 1: Prerequisites
<a name="kcl-migration-from-2-3-prerequisites"></a>

Before you start using KCL 3.x, make sure that you have the following:
+ Java Development Kit (JDK) 8 or later
+ AWS SDK for Java 2.x
+ Maven or Gradle for dependency management

**Important**  
Do not use AWS SDK for Java version 2.27.19 to 2.27.23 with KCL 3.x. These versions include an issue that causes an exception error related to KCL's DynamoDB usage. We recommend that you use the AWS SDK for Java version 2.28.0 or later to avoid this issue. 
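
If you want to guard against the affected SDK versions in a build or deployment script, a check along the following lines is one option. This Python sketch only compares version strings against the range stated in the note above. The function names are hypothetical, and the parser assumes a plain `major.minor.patch` version string without suffixes.

```python
AFFECTED_FIRST = (2, 27, 19)
AFFECTED_LAST = (2, 27, 23)


def parse_version(version):
    """Parse 'major.minor.patch' into a tuple of integers."""
    parts = version.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected major.minor.patch, got {version!r}")
    return tuple(int(part) for part in parts)


def is_affected_sdk_version(version):
    """True if the AWS SDK for Java version is in the range 2.27.19 to 2.27.23."""
    return AFFECTED_FIRST <= parse_version(version) <= AFFECTED_LAST


if __name__ == "__main__":
    for candidate in ("2.27.18", "2.27.20", "2.28.0"):
        status = "affected" if is_affected_sdk_version(candidate) else "ok"
        print(f"{candidate}: {status}")
```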

## Step 2: Add dependencies
<a name="kcl-migration-from-2-3-dependencies"></a>

If you're using Maven, add the following dependency to your `pom.xml` file. Make sure that you replace 3.x.x with the latest KCL version. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

If you're using Gradle, add the following to your `build.gradle` file. Make sure that you replace 3.x.x with the latest KCL version. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

You can check for the latest version of the KCL on the [Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Step 3: Set up the migration-related configuration
<a name="kcl-migration-from-2-3-configuration"></a>

To migrate from KCL 2.x to KCL 3.x, you must set the following configuration parameter:
+ CoordinatorConfig.clientVersionConfig: This configuration determines which KCL version compatibility mode the application will run in. When migrating from KCL 2.x to 3.x, you need to set this configuration to `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. To set this configuration, add the following line when creating your scheduler object:

```
configsBuilder.coordinatorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X)
```

The following is an example of how to set the `CoordinatorConfig.clientVersionConfig` for migrating from KCL 2.x to 3.x. You can adjust other configurations as needed based on your specific requirements:

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

It's important that all workers in your consumer application use the same load balancing algorithm at a given time because KCL 2.x and 3.x use different load balancing algorithms. Running workers with different load balancing algorithms can cause suboptimal load distribution as the two algorithms operate independently.

This KCL 2.x compatibility setting allows your KCL 3.x application to run in a mode compatible with KCL 2.x and use the load balancing algorithm for KCL 2.x until all workers in your consumer application have been upgraded to KCL 3.x. When the migration is complete, KCL will automatically switch to full KCL 3.x functionality mode and start using a new KCL 3.x load balancing algorithm for all running workers.

**Important**  
If you are not using `ConfigsBuilder` but creating a `LeaseManagementConfig` object to set configurations, you must add one more parameter called `applicationName` in KCL version 3.x or later. For details, see [Compilation error with the LeaseManagementConfig constructor](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig). We recommend using `ConfigsBuilder` to set KCL configurations. `ConfigsBuilder` provides a more flexible and maintainable way to configure your KCL application.

## Step 4: Follow best practices for the shutdownRequested() method implementation
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x introduces a feature called *graceful lease handoff* to minimize the reprocessing of data when a lease is handed over to another worker as part of the lease reassignment process. This is achieved by checkpointing the last processed sequence number in the lease table before the lease handoff. To ensure the graceful lease handoff works properly, you must make sure that you invoke the `checkpointer` object within the `shutdownRequested` method in your `RecordProcessor` class. If you're not invoking the `checkpointer` object within the `shutdownRequested` method, you can implement it as illustrated in the following example. 

**Important**  
The following implementation example is a minimal requirement for the graceful lease handoff. You can extend it to include additional logic related to the checkpointing if needed. If you are performing any asynchronous processing, make sure that all records delivered downstream have been processed before you invoke checkpointing. 
While graceful lease handoff significantly reduces the likelihood of data reprocessing during lease transfers, it does not entirely eliminate this possibility. To preserve data integrity and consistency, design your downstream consumer applications to be idempotent. This means they should be able to handle potential duplicate record processing without adverse effects on the overall system.

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
        // Ensure that all delivered records have been processed and
        // successfully flushed downstream before checkpointing.
        // If you are performing any asynchronous processing or flushing to
        // downstream, you must wait for its completion before invoking
        // the checkpoint method below.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```
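
The idempotency recommendation in the note above can be illustrated with a small, language-neutral sketch. The following Python example applies each record at most once by remembering which records it has already handled. It assumes that the pair of shard ID and sequence number identifies a record. The class name `IdempotentSink` is hypothetical, and the in-memory set is a stand-in: a real downstream system would need durable storage for the keys it has seen.

```python
class IdempotentSink:
    """Applies each record at most once, keyed by (shard_id, sequence_number)."""

    def __init__(self):
        # In-memory only, for illustration. A real system needs durable storage.
        self._seen = set()
        self.applied = []

    def apply(self, shard_id, sequence_number, payload):
        """Apply the payload unless this record was already applied.

        Returns True if the record was applied, False if it was a duplicate.
        """
        key = (shard_id, sequence_number)
        if key in self._seen:
            return False
        self._seen.add(key)
        self.applied.append(payload)
        return True


if __name__ == "__main__":
    sink = IdempotentSink()
    # The second call simulates a record redelivered after a lease transfer.
    print(sink.apply("shardId-000000000000", "1001", "order-created"))
    print(sink.apply("shardId-000000000000", "1001", "order-created"))
```

With a sink like this, a record that is delivered again after a lease transfer is recognized and skipped instead of being applied twice.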

## Step 5: Check the KCL 3.x prerequisites for collecting worker metrics
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x collects resource utilization metrics, such as CPU utilization, from workers to balance the load across workers evenly. Consumer application workers can run on Amazon EC2, Amazon ECS, Amazon EKS, or AWS Fargate. KCL 3.x can collect CPU utilization metrics from workers only when the following prerequisites are met:

 **Amazon Elastic Compute Cloud (Amazon EC2)**
+ Your operating system must be Linux OS.
+ You must enable [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html) in your EC2 instance.

 **Amazon Elastic Container Service (Amazon ECS) on Amazon EC2**
+ Your operating system must be Linux OS.
+ You must enable [ECS task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ Your Amazon ECS container agent version must be 1.39.0 or later.

 **Amazon ECS on AWS Fargate**
+ You must enable [Fargate task metadata endpoint version 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). If you use Fargate platform version 1.4.0 or later, this is enabled by default. 
+ Fargate platform version 1.4.0 or later.

 **Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2** 
+ Your operating system must be Linux OS.

 **Amazon EKS on AWS Fargate**
+ Fargate platform 1.3.0 or later.

**Important**  
If KCL 3.x cannot collect CPU utilization metrics from workers because the prerequisites are not met, it rebalances the load based on the throughput level per lease. This fallback rebalancing mechanism makes sure that all workers get similar total throughput levels from the leases assigned to each worker. For more information, see [How KCL assigns leases to workers and balances the load](kcl-dynamoDB.md#kcl-assign-leases).
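
The prerequisites in this step can be encoded as a simple pre-deployment check. The following Python sketch is one possible way to do that. It is not something KCL provides: the function name, the platform labels, and the keys of the `env` dictionary are made up for this example, and the version comparison assumes plain dotted numeric versions such as `1.39.0`.

```python
def _at_least(version, minimum):
    """Compare dotted numeric versions such as '1.39.0'."""
    def as_tuple(value):
        return tuple(int(part) for part in value.split("."))
    return as_tuple(version) >= as_tuple(minimum)


def unmet_prerequisites(platform, env):
    """Return the prerequisites from this step that `env` does not satisfy.

    `platform` is one of 'ec2', 'ecs-ec2', 'ecs-fargate', 'eks-ec2', 'eks-fargate'.
    `env` is a dict that describes the worker; its keys are illustrative.
    """
    known = ("ec2", "ecs-ec2", "ecs-fargate", "eks-ec2", "eks-fargate")
    if platform not in known:
        raise ValueError(f"unknown platform: {platform!r}")

    unmet = []
    if platform in ("ec2", "ecs-ec2", "eks-ec2") and env.get("os") != "linux":
        unmet.append("operating system must be Linux")
    if platform == "ec2" and not env.get("imdsv2_enabled", False):
        unmet.append("IMDSv2 must be enabled")
    if platform == "ecs-ec2":
        if not env.get("task_metadata_v4_enabled", False):
            unmet.append("ECS task metadata endpoint version 4 must be enabled")
        if not _at_least(env.get("ecs_agent_version", "0.0.0"), "1.39.0"):
            unmet.append("ECS container agent must be 1.39.0 or later")
    if platform == "ecs-fargate" and not _at_least(
            env.get("fargate_platform_version", "0.0.0"), "1.4.0"):
        unmet.append("Fargate platform version must be 1.4.0 or later")
    if platform == "eks-fargate" and not _at_least(
            env.get("fargate_platform_version", "0.0.0"), "1.3.0"):
        unmet.append("Fargate platform version must be 1.3.0 or later")
    return unmet


if __name__ == "__main__":
    print(unmet_prerequisites("ecs-ec2", {"os": "linux",
                                          "task_metadata_v4_enabled": True,
                                          "ecs_agent_version": "1.38.0"}))
```

An empty list means that, according to the values you supplied, none of the listed prerequisites is missing for that platform.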

## Step 6: Update IAM permissions for KCL 3.x
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

You must add the following permissions to the IAM role or policy associated with your KCL 3.x consumer application. This involves updating the existing IAM policy used by the KCL application. For more information, see [IAM permissions required for KCL consumer applications](kcl-iam-permissions.md).

**Important**  
Your existing KCL applications might not have the following IAM actions and resources added in the IAM policy because they were not needed in KCL 2.x. Make sure that you have added them before running your KCL 3.x application:  
Actions: `UpdateTable`  
Resources (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName`  
Actions: `Query`  
Resources (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  
Actions: `CreateTable`, `DescribeTable`, `Scan`, `GetItem`, `PutItem`, `UpdateItem`, `DeleteItem`  
Resources (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  
Replace "region," "account," and "KCLApplicationName" in the ARNs with your own AWS Region, AWS account number, and KCL application name, respectively. If you use configurations to customize the names of the metadata tables created by KCL, use those table names instead of the KCL application name.
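
To see which of these additions an existing policy document still lacks, you could compare it against the list above. The following Python sketch does this with exact string matching only. It is a deliberate simplification: it does not evaluate wildcards, conditions, or `Deny` statements the way IAM does, and it assumes the default metadata table names. The function names are hypothetical.

```python
def required_kcl3_additions(region, account, application_name):
    """Action and resource pairs listed in this step (default table names assumed)."""
    table = f"arn:aws:dynamodb:{region}:{account}:table/{application_name}"
    metadata_actions = ["CreateTable", "DescribeTable", "Scan", "GetItem",
                        "PutItem", "UpdateItem", "DeleteItem"]
    pairs = {
        ("dynamodb:UpdateTable", table),
        ("dynamodb:Query", f"{table}/index/*"),
    }
    for suffix in ("-WorkerMetricStats", "-CoordinatorState"):
        for action in metadata_actions:
            pairs.add((f"dynamodb:{action}", table + suffix))
    return pairs


def _as_list(value):
    """IAM allows a single string or a list for Action and Resource."""
    return [value] if isinstance(value, str) else list(value)


def missing_kcl3_permissions(policy, region, account, application_name):
    """Return required pairs that no Allow statement grants by exact match."""
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]
    granted = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        for action in _as_list(statement.get("Action", [])):
            for resource in _as_list(statement.get("Resource", [])):
                granted.add((action, resource))
    required = required_kcl3_additions(region, account, application_name)
    return sorted(required - granted)
```

Because the comparison is exact, a policy that grants these permissions through wildcards is reported as missing them. Treat the output as a checklist to review, not as an authorization result.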

## Step 7: Deploy KCL 3.x code to your workers
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

After you have set the configuration required for the migration and completed all of the previous migration steps, you can build and deploy your code to your workers.

**Note**  
If you see a compilation error with the `LeaseManagementConfig` constructor, see [Compilation error with the LeaseManagementConfig constructor](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig) for troubleshooting information.

## Step 8: Complete the migration
<a name="kcl-migration-from-2-3-finish"></a>

During the deployment of KCL 3.x code, KCL continues using the lease assignment algorithm from KCL 2.x. When you have successfully deployed KCL 3.x code to all of your workers, KCL automatically detects this and switches to the new lease assignment algorithm based on resource utilization of the workers. For more details about the new lease assignment algorithm, see [How KCL assigns leases to workers and balances the load](kcl-dynamoDB.md#kcl-assign-leases).

During the deployment, you can monitor the migration process with the following metrics emitted to CloudWatch. You can monitor metrics under the `Migration` operation. All metrics are per-KCL-application metrics and set to the `SUMMARY` metric level. If the `Sum` statistic of the `CurrentState:3xWorker` metric matches the total number of workers in your KCL application, it indicates that the migration to KCL 3.x has successfully completed.

**Important**  
 It takes at least 10 minutes for KCL to switch to the new lease assignment algorithm after all workers are ready to run it.


**CloudWatch metrics for the KCL migration process**  

| Metrics | Description | 
| --- | --- | 
| CurrentState:3xWorker |  The number of KCL workers successfully migrated to KCL 3.x and running the new lease assignment algorithm. If the `Sum` count of this metric matches the total number of your workers, it indicates that the migration to KCL 3.x has successfully completed. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  The number of KCL workers running in KCL 2.x compatible mode during the migration process. A non-zero value for this metric indicates that the migration is still in progress. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  The number of exceptions encountered during the migration process. Most of these exceptions are transient errors, and KCL 3.x will automatically retry to complete the migration. If you observe a persistent `Fault` metric value, review your logs from the migration period for further troubleshooting. If the issue continues, contact Support. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  The status of the global secondary index (GSI) creation on the lease table. This metric indicates whether the GSI on the lease table has been created, a prerequisite to run KCL 3.x. The value is 0 or 1, with 1 indicating successful creation. During a rollback state, this metric will not be emitted. After you roll forward again, you can resume monitoring this metric. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  Status of worker metrics emission from all workers. This metric indicates whether all workers are emitting metrics such as CPU utilization. The value is 0 or 1, with 1 indicating that all workers are successfully emitting metrics and are ready for the new lease assignment algorithm. During a rollback state, this metric will not be emitted. After you roll forward again, you can resume monitoring this metric. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html)  | 
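
The completion criteria described for `CurrentState:3xWorker` and `CurrentState:2xCompatibleWorker` can be expressed as a small check. The following Python sketch assumes that you have already read the `Sum` statistic of both metrics for the same period from CloudWatch and that you know the total number of workers. The function name and the returned labels are hypothetical, and retrieving the metric values is outside the scope of this sketch.

```python
def migration_state(sum_3x_workers, sum_2x_compatible_workers, total_workers):
    """Classify migration progress from the Sum statistics of the worker-state metrics.

    Returns 'complete' when every worker reports the KCL 3.x state and no
    worker reports the KCL 2.x compatible state; otherwise 'in progress'.
    """
    if total_workers <= 0:
        raise ValueError("total_workers must be positive")
    if sum_3x_workers == total_workers and sum_2x_compatible_workers == 0:
        return "complete"
    return "in progress"


if __name__ == "__main__":
    # Example values only: 5 workers, 3 of them already on the new algorithm.
    print(migration_state(sum_3x_workers=3, sum_2x_compatible_workers=2, total_workers=5))
```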

KCL provides rollback capability to the 2.x compatible mode during migration. After the migration to KCL 3.x is successful, we recommend that you remove the `CoordinatorConfig.clientVersionConfig` setting of `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` if rollback is no longer needed. Removing this configuration stops the emission of migration-related metrics from the KCL application.

**Note**  
We recommend that you monitor your application's performance and stability for a period during the migration and after completing the migration. If you observe any issues, you can roll back workers to use KCL 2.x compatible functionality using the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

# Roll back to the previous KCL version
<a name="kcl-migration-rollback"></a>

This topic explains the steps to roll back your consumer to the previous version. Rolling back is a two-step process: 

1. Run the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Redeploy previous KCL version code (optional).

## Step 1: Run the KCL Migration Tool
<a name="kcl-migration-rollback-tool"></a>

When you need to roll back to the previous KCL version, you must run the KCL Migration Tool. The KCL Migration Tool does two important tasks:
+ It removes the worker metrics table (a metadata table) and the global secondary index on the lease table in DynamoDB. These two artifacts are created by KCL 3.x but are not needed when you roll back to the previous version.
+ It makes all workers run in a mode compatible with KCL 2.x and start using the load balancing algorithm used in previous KCL versions. If you have issues with the new load balancing algorithm in KCL 3.x, this will mitigate the issue immediately.

**Important**  
The coordinator state table in DynamoDB must exist and must not be deleted during the migration, rollback, and rollforward process. 

**Note**  
It's important that all workers in your consumer application use the same load balancing algorithm at a given time. The KCL Migration Tool makes sure that all workers in your KCL 3.x consumer application switch to the KCL 2.x compatible mode so that all workers run the same load balancing algorithm during the rolling deployment back to your previous KCL version.

You can download the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) from the scripts directory of the [KCL GitHub repository](https://github.com/awslabs/amazon-kinesis-client/tree/master). You can run the script from any of your workers or from any host that has the required permissions to write to the coordinator state table, delete the worker metrics table, and update the lease table. For the IAM permissions required to run the script, see [IAM permissions required for KCL consumer applications](kcl-iam-permissions.md). Run the script only once per KCL application. You can run the KCL Migration Tool with the following command: 

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**Parameters**
+ --region: Replace `<region>` with your AWS Region.
+ --application_name: This parameter is required if you're using default names for your DynamoDB metadata tables (lease table, coordinator state table, and worker metrics table). If you have specified custom names for these tables, you can omit this parameter. Replace `<applicationName>` with your actual KCL application name. The tool uses this name to derive the default table names if custom names are not provided.
+ --lease_table_name (optional): This parameter is needed when you have set a custom name for the lease table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<leaseTableName>` with the custom table name you specified for your lease table.
+ --coordinator_state_table_name (optional): This parameter is needed when you have set a custom name for the coordinator state table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<coordinatorStateTableName>` with the custom table name you specified for your coordinator state table. 
+ --worker_metrics_table_name (optional): This parameter is needed when you have set a custom name for the worker metrics table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<workerMetricsTableName>` with the custom table name you specified for your worker metrics table. 
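
The parameter rules above can be sketched as a small helper that assembles the rollback command line. This is a minimal illustration, not part of the KCL Migration Tool: the function name `build_rollback_command` and the `script_path` default are hypothetical, and the check that the application name is needed whenever any table uses its default name is one reading of the parameter descriptions. Only the flags shown in the command above are used.

```python
from typing import List, Optional


def build_rollback_command(
    region: str,
    application_name: Optional[str] = None,
    lease_table_name: Optional[str] = None,
    coordinator_state_table_name: Optional[str] = None,
    worker_metrics_table_name: Optional[str] = None,
    script_path: str = "./KclMigrationTool.py",  # hypothetical default location
) -> List[str]:
    """Build the argument list for a rollback run of the KCL Migration Tool."""
    custom_names = [
        lease_table_name,
        coordinator_state_table_name,
        worker_metrics_table_name,
    ]
    # Assumption: the application name is needed whenever at least one table
    # uses its default name, because default names are derived from it.
    if application_name is None and any(name is None for name in custom_names):
        raise ValueError(
            "application_name is required unless all three table names are custom"
        )

    command = ["python3", script_path, "--region", region, "--mode", "rollback"]
    optional_flags = [
        ("--application_name", application_name),
        ("--lease_table_name", lease_table_name),
        ("--coordinator_state_table_name", coordinator_state_table_name),
        ("--worker_metrics_table_name", worker_metrics_table_name),
    ]
    # Append only the flags that were actually provided.
    for flag, value in optional_flags:
        if value is not None:
            command.extend([flag, value])
    return command
```

The returned list can be passed to `subprocess.run(command, check=True)` on a host that has the required IAM permissions. Building the arguments as a list avoids shell quoting problems with table names.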

## Step 2: Redeploy the code with the previous KCL version (optional)
<a name="kcl-migration-rollback-redeploy"></a>

 After running the KCL Migration Tool for a rollback, you'll see one of these messages:
+ **Message 1:** “Rollback completed. Your KCL application was running the KCL 2.x compatible mode. If you don't see mitigation of any regression, please rollback to your previous application binaries by deploying the code with your previous KCL version.”
  + **Required action:** This means that your workers were running in the KCL 2.x compatible mode. If the issue persists, redeploy the code with the previous KCL version to your workers.
+ **Message 2:** “Rollback completed. Your KCL application was running the KCL 3.x functionality mode. Rollback to the previous application binaries is not necessary, unless you don’t see any mitigation for the issue within 5 minutes. If you still have an issue, please rollback to your previous application binaries by deploying the code with your previous KCL version.”
  + **Required action:** This means that your workers were running in KCL 3.x mode and the KCL Migration Tool switched all workers to KCL 2.x compatible mode. If the issue is resolved, you don't need to redeploy the code with the previous KCL version. If the issue persists, redeploy the code with the previous KCL version to your workers.

 

# Roll forward to KCL 3.x after a rollback
<a name="kcl-migration-rollforward"></a>

This topic explains how to roll your consumer forward to KCL 3.x after a rollback. Rolling forward is a two-step process: 

1. Run the [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py). 

1. Deploy the code with KCL 3.x.

## Step 1: Run the KCL Migration Tool
<a name="kcl-migration-rollback-tool"></a>

Run the KCL Migration Tool with the following command to roll forward to KCL 3.x:

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**Parameters**
+ --region: Replace `<region>` with your AWS Region.
+ --application_name: This parameter is required if you're using the default name for your coordinator state table. If you have specified a custom name for the coordinator state table, you can omit this parameter. Replace `<applicationName>` with your actual KCL application name. The tool uses this name to derive the default table names if custom names are not provided.
+ --coordinator_state_table_name (optional): This parameter is needed when you have set a custom name for the coordinator state table in your KCL configuration. If you're using the default table name, you can omit this parameter. Replace `<coordinatorStateTableName>` with the custom table name you specified for your coordinator state table. 

After you run the migration tool in roll-forward mode, KCL creates the following DynamoDB resources required for KCL 3.x:
+ A Global Secondary Index on the lease table
+ A worker metrics table

## Step 2: Deploy the code with KCL 3.x
<a name="kcl-migration-rollback-redeploy"></a>

After running the KCL Migration Tool for a roll forward, deploy your code with KCL 3.x to your workers. Follow [Step 8: Complete the migration](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) to complete your migration.

# Best practices for the lease table with provisioned capacity mode
<a name="kcl-migration-lease-table"></a>

If the lease table of your KCL application was switched to provisioned capacity mode, KCL 3.x creates a global secondary index on the lease table with the provisioned billing mode and the same read capacity units (RCU) and write capacity units (WCU) as the base lease table. When the global secondary index is created, we recommend that you monitor the actual usage on the global secondary index in the DynamoDB console and adjust the capacity units if needed. For a more detailed guide about switching the capacity mode of DynamoDB metadata tables created by KCL, see [DynamoDB capacity mode for metadata tables created by KCL](kcl-dynamoDB.md#kcl-capacity-mode). 

**Note**  
By default, KCL creates metadata tables such as the lease table, worker metrics table, and coordinator state table, and the global secondary index on the lease table using the on-demand capacity mode. We recommend that you use the on-demand capacity mode to automatically adjust the capacity based on your usage changes. 

# Migrating from KCL 1.x to KCL 3.x
<a name="kcl-migration-1-3"></a>

This topic explains how to migrate your consumer from KCL 1.x to KCL 3.x. KCL 1.x uses different classes and interfaces compared to KCL 2.x and KCL 3.x. You must first migrate the record processor, record processor factory, and worker classes to the KCL 2.x/3.x compatible format, and then follow the migration steps for KCL 2.x to KCL 3.x. You can upgrade directly from KCL 1.x to KCL 3.x.
+ **Step 1: Migrate the record processor**

  Follow the [Migrate the record processor](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 2: Migrate the record processor factory**

  Follow the [Migrate the record processor factory](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 3: Migrate the worker**

  Follow the [Migrate the worker](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 4: Migrate KCL 1.x configuration**

  Follow the [Configure the Amazon Kinesis client](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration) section in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 5: Check idle time removal and client configuration removals**

  Follow the [Idle time removal](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal) and [Client configuration removals](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals) sections in the [Migrate consumers from KCL 1.x to KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) page.
+ **Step 6: Follow the step-by-step instructions in the KCL 2.x to KCL 3.x migration guide**

  Follow instructions on the [Migrate from KCL 2.x to KCL 3.x](kcl-migration-from-2-3.md) page to complete the migration. If you need to roll back to the previous KCL version or roll forward to KCL 3.x after a rollback, refer to [Roll back to the previous KCL version](kcl-migration-rollback.md) and [Roll forward to KCL 3.x after a rollback](kcl-migration-rollforward.md).

**Important**  
Do not use AWS SDK for Java version 2.27.19 to 2.27.23 with KCL 3.x. These versions include an issue that causes an exception error related to KCL's DynamoDB usage. We recommend that you use the AWS SDK for Java version 2.28.0 or later to avoid this issue. 
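
As an illustration of the version guidance above, the following sketch checks whether an AWS SDK for Java version string falls in the affected range. The function names are hypothetical, and the check assumes plain `major.minor.patch` version strings with no suffix such as `-SNAPSHOT`.

```python
from typing import Tuple

# Version bounds taken from the note above.
AFFECTED_MIN = (2, 27, 19)
AFFECTED_MAX = (2, 27, 23)
RECOMMENDED_MIN = (2, 28, 0)


def parse_version(version: str) -> Tuple[int, ...]:
    """Convert a 'major.minor.patch' string into a tuple of integers."""
    return tuple(int(part) for part in version.split("."))


def is_affected_sdk_version(version: str) -> bool:
    """Return True if the version is in the range to avoid with KCL 3.x."""
    return AFFECTED_MIN <= parse_version(version) <= AFFECTED_MAX


def meets_recommendation(version: str) -> bool:
    """Return True if the version is 2.28.0 or later."""
    return parse_version(version) >= RECOMMENDED_MIN
```

Comparing tuples of integers rather than raw strings avoids ordering mistakes such as treating `2.27.9` as later than `2.27.19`.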

# Previous KCL version documentation
<a name="kcl-archive"></a>

The following topics have been archived. To see current Kinesis Client Library documentation, see [Use Kinesis Client Library](kcl.md).

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

**Topics**
+ [

# KCL 1.x and 2.x information
](shared-throughput-kcl-consumers.md)
+ [

# Develop custom consumers with shared throughput
](shared-throughput-consumers.md)
+ [

# Migrate consumers from KCL 1.x to KCL 2.x
](kcl-migration.md)

# KCL 1.x and 2.x information
<a name="shared-throughput-kcl-consumers"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

One way to develop custom consumer applications that process data from Kinesis data streams is to use the Kinesis Client Library (KCL).

**Topics**
+ [

## About KCL (previous versions)
](#shared-throughput-kcl-consumers-overview)
+ [

## KCL previous versions
](#shared-throughput-kcl-consumers-versions)
+ [

## KCL concepts (previous versions)
](#shared-throughput-kcl-consumers-concepts)
+ [

## Use a lease table to track the shards processed by the KCL consumer application
](#shared-throughput-kcl-consumers-leasetable)
+ [

## Process multiple data streams with the same KCL 2.x for Java consumer application
](#shared-throughput-kcl-multistream)
+ [

## Use the KCL with the AWS Glue Schema Registry
](#shared-throughput-kcl-consumers-glue-schema-registry)

**Note**  
For both KCL 1.x and KCL 2.x, it is recommended that you upgrade to the latest KCL 1.x version or KCL 2.x version, depending on your usage scenario. Both KCL 1.x and KCL 2.x are regularly updated with newer releases that include the latest dependency and security patches, bug fixes, and backward-compatible new features. For more information, see [https://github.com/awslabs/amazon-kinesis-client/releases](https://github.com/awslabs/amazon-kinesis-client/releases).

## About KCL (previous versions)
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing. These include load balancing across multiple consumer application instances, responding to consumer application instance failures, checkpointing processed records, and reacting to resharding. The KCL takes care of all of these subtasks so that you can focus your efforts on writing your custom record-processing logic.

The KCL is different from the Kinesis Data Streams APIs that are available in the AWS SDKs. The Kinesis Data Streams APIs help you manage many aspects of Kinesis Data Streams, including creating streams, resharding, and putting and getting records. The KCL provides a layer of abstraction around all these subtasks, specifically so that you can focus on your consumer application’s custom data processing logic. For information about the Kinesis Data Streams API, see the [Amazon Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html).

**Important**  
The KCL is a Java library. Support for languages other than Java is provided using a multi-language interface called the MultiLangDaemon. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. For example, if you install the KCL for Python and write your consumer application entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings that you might need to customize for your use case, for example, the AWS region that it connects to. For more information about the MultiLangDaemon on GitHub, see [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

The KCL acts as an intermediary between your record processing logic and Kinesis Data Streams. 

## KCL previous versions
<a name="shared-throughput-kcl-consumers-versions"></a>

Currently, you can use either of the following supported versions of KCL to build your custom consumer applications:
+ **KCL 1.x**

  For more information, see [Develop KCL 1.x consumers](developing-consumers-with-kcl.md)
+ **KCL 2.x**

  For more information, see [Develop KCL 2.x Consumers](developing-consumers-with-kcl-v2.md)

You can use either KCL 1.x or KCL 2.x to build consumer applications that use shared throughput. For more information, see [Develop custom consumers with shared throughput using KCL](custom-kcl-consumers.md).

To build consumer applications that use dedicated throughput (enhanced fan-out consumers), you can only use KCL 2.x. For more information, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

For information about the differences between KCL 1.x and KCL 2.x, and instructions on how to migrate from KCL 1.x to KCL 2.x, see [Migrate consumers from KCL 1.x to KCL 2.x](kcl-migration.md).

## KCL concepts (previous versions)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **KCL consumer application** – an application that is custom-built using KCL and designed to read and process records from data streams. 
+ **Consumer application instance** – KCL consumer applications are typically distributed, with one or more application instances running simultaneously in order to coordinate on failures and dynamically load balance data record processing.
+ **Worker** – a high level class that a KCL consumer application instance uses to start processing data. 
**Important**  
Each KCL consumer application instance has one worker. 

  The worker initializes and oversees various tasks, including syncing shard and lease information, tracking shard assignments, and processing data from the shards. A worker provides KCL with the configuration information for the consumer application, such as the name of the data stream whose data records this KCL consumer application is going to process and the AWS credentials that are needed to access this data stream. The worker also kick starts that specific KCL consumer application instance to deliver data records from the data stream to the record processors.
**Important**  
In KCL 1.x, this class is called **Worker**. For more information, see [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java) in the Java KCL repository. In KCL 2.x, this class is called **Scheduler**. Scheduler’s purpose in KCL 2.x is identical to Worker’s purpose in KCL 1.x. For more information about the Scheduler class in KCL 2.x, see [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java). 
+ **Lease** – data that defines the binding between a worker and a shard. Distributed KCL consumer applications use leases to partition data record processing across a fleet of workers. At any given time, each shard of data records is bound to a particular worker by a lease identified by the **leaseKey** variable. 

  By default, a worker can hold one or more leases (subject to the value of the **maxLeasesForWorker** variable) at the same time. 
**Important**  
Every worker will contend to hold all available leases for all available shards in a data stream. But only one worker will successfully hold each lease at any one time. 

  For example, if you have a consumer application instance A with worker A that is processing a data stream with 4 shards, worker A can hold leases to shards 1, 2, 3, and 4 at the same time. But if you have two consumer application instances: A and B with worker A and worker B, and these instances are processing a data stream with 4 shards, worker A and worker B cannot both hold the lease to shard 1 at the same time. One worker holds the lease to a particular shard until it is ready to stop processing this shard’s data records or until it fails. When one worker stops holding the lease, another worker takes up and holds the lease. 

  For more information, see the following classes in the Java KCL repository: [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) for KCL 1.x and [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java) for KCL 2.x.
+ **Lease table** – a unique Amazon DynamoDB table that is used to keep track of the shards in a Kinesis data stream that are being leased and processed by the workers of the KCL consumer application. The lease table must remain in sync (within a worker and across all workers) with the latest shard information from the data stream while the KCL consumer application is running. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](#shared-throughput-kcl-consumers-leasetable).
+ **Record processor** – the logic that defines how your KCL consumer application processes the data that it gets from the data streams. At runtime, a KCL consumer application instance instantiates a worker, and this worker instantiates one record processor for every shard to which it holds a lease. 
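
The lease rules described above (one owner per shard at a time, and a per-worker cap on held leases) can be modeled with a short in-memory sketch. This is a simplified illustration only: the `LeaseTable` class and its methods are hypothetical, and the real KCL stores leases in DynamoDB and also handles lease expiry and rebalancing, which are not shown here.

```python
from typing import Dict, Iterable, List, Optional


class LeaseTable:
    """Toy model: each shard lease has at most one owning worker at a time."""

    def __init__(self, shard_ids: Iterable[str], max_leases_for_worker: int) -> None:
        # None means that no worker currently holds the lease for the shard.
        self.owners: Dict[str, Optional[str]] = {s: None for s in shard_ids}
        self.max_leases_for_worker = max_leases_for_worker

    def leases_held_by(self, worker: str) -> List[str]:
        """Return the shard IDs whose leases the worker currently holds."""
        return sorted(s for s, owner in self.owners.items() if owner == worker)

    def try_acquire(self, worker: str, shard_id: str) -> bool:
        """Give the lease to the worker only if it is free and the cap allows."""
        if self.owners[shard_id] is not None:
            return False  # another worker (or this one) already holds it
        if len(self.leases_held_by(worker)) >= self.max_leases_for_worker:
            return False  # worker already holds its maximum number of leases
        self.owners[shard_id] = worker
        return True

    def release(self, worker: str, shard_id: str) -> None:
        """Drop the lease, for example when the worker stops processing the shard."""
        if self.owners[shard_id] == worker:
            self.owners[shard_id] = None


table = LeaseTable(["shard-1", "shard-2", "shard-3", "shard-4"], max_leases_for_worker=4)
for shard in ["shard-1", "shard-2", "shard-3", "shard-4"]:
    table.try_acquire("worker-A", shard)      # worker A can hold all four leases
blocked = table.try_acquire("worker-B", "shard-1")   # False while A holds it
table.release("worker-A", "shard-1")
taken = table.try_acquire("worker-B", "shard-1")     # True after A releases
```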

## Use a lease table to track the shards processed by the KCL consumer application
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [

### What is a lease table
](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [

### Throughput
](#shared-throughput-kcl-leasetable-throughput)
+ [

### How a lease table is synchronized with the shards in a Kinesis data stream
](#shared-throughput-kcl-consumers-leasetable-sync)

### What is a lease table
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

For each Amazon Kinesis Data Streams application, KCL uses a unique lease table (stored in an Amazon DynamoDB table) to keep track of the shards in a Kinesis data stream that are being leased and processed by the workers of the KCL consumer application.

**Important**  
KCL uses the name of the consumer application to create the name of the lease table that this consumer application uses. Therefore, each consumer application name must be unique.

You can view the lease table using the [Amazon DynamoDB console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) while the consumer application is running.

If the lease table for your KCL consumer application does not exist when the application starts up, one of the workers creates the lease table for this application. 

**Important**  
 Your account is charged for the costs associated with the DynamoDB table, in addition to the costs associated with Kinesis Data Streams itself. 

Each row in the lease table represents a shard that is being processed by the workers of your consumer application. If your KCL consumer application processes only one data stream, then `leaseKey`, which is the hash key for the lease table, is the shard ID. If you [process multiple data streams with the same KCL 2.x for Java consumer application](#shared-throughput-kcl-multistream), then the `leaseKey` has the following structure: `account-id:StreamName:streamCreationTimestamp:ShardId`. For example, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
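
To make the two `leaseKey` shapes concrete, the following sketch splits a key into its parts. The `LeaseKey` type and `parse_lease_key` function are hypothetical helpers written for this example and are not part of KCL. The sketch assumes that none of the four components contains a colon.

```python
from typing import NamedTuple, Optional


class LeaseKey(NamedTuple):
    account_id: Optional[str]
    stream_name: Optional[str]
    stream_creation_timestamp: Optional[str]
    shard_id: str


def parse_lease_key(lease_key: str) -> LeaseKey:
    """Parse a single-stream or multi-stream leaseKey into its components."""
    parts = lease_key.split(":")
    if len(parts) == 1:
        # Single-stream application: the leaseKey is just the shard ID.
        return LeaseKey(None, None, None, parts[0])
    if len(parts) == 4:
        # Multi-stream: account-id:StreamName:streamCreationTimestamp:ShardId
        return LeaseKey(*parts)
    raise ValueError(f"unexpected leaseKey format: {lease_key!r}")


single = parse_lease_key("shardId-000000000336")
multi = parse_lease_key("111111111:multiStreamTest-1:12345:shardId-000000000336")
```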

In addition to the shard ID, each row also includes the following data:
+ **checkpoint:** The most recent checkpoint sequence number for the shard. This value is unique across all shards in the data stream.
+ **checkpointSubSequenceNumber:** When using the Kinesis Producer Library's aggregation feature, this is an extension to **checkpoint** that tracks individual user records within the Kinesis record.
+ **leaseCounter:** Used for lease versioning so that workers can detect that their lease has been taken by another worker.
+ **leaseKey:** A unique identifier for a lease. Each lease is particular to a shard in the data stream and is held by one worker at a time.
+ **leaseOwner:** The worker that is holding this lease.
+ **ownerSwitchesSinceCheckpoint:** How many times this lease has changed workers since the last time a checkpoint was written.
+ **parentShardId:** Used to ensure that the parent shard is fully processed before processing starts on the child shards. This ensures that records are processed in the same order they were put into the stream.
+ **hashrange:** Used by the `PeriodicShardSyncManager` to run periodic syncs to find missing shards in the lease table and create leases for them if required. 
**Note**  
This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3. For more information about `PeriodicShardSyncManager` and periodic synchronization between leases and shards, see [How a lease table is synchronized with the shards in a Kinesis data stream](#shared-throughput-kcl-consumers-leasetable-sync).
+ **childshards:** Used by the `LeaseCleanupManager` to review the child shard's processing status and decide whether the parent shard can be deleted from the lease table.
**Note**  
This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3.
+ **shardID:** The ID of the shard.
**Note**  
This data is present in the lease table only if you [process multiple data streams with the same KCL 2.x for Java consumer application](#shared-throughput-kcl-multistream). This is supported only in KCL 2.x for Java, starting with KCL 2.3 for Java. 
+ **stream name:** The identifier of the data stream in the following format: `account-id:StreamName:streamCreationTimestamp`.
**Note**  
This data is present in the lease table only if you [process multiple data streams with the same KCL 2.x for Java consumer application](#shared-throughput-kcl-multistream). This is supported only in KCL 2.x for Java, starting with KCL 2.3 for Java. 
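
The role of **leaseCounter** as a version number can be illustrated with an in-memory sketch of conditional updates. This is an assumption-laden model, not KCL code: the `LeaseRow` class and the `take_lease` and `renew_lease` functions are hypothetical, and in a real deployment the comparison would be a conditional write against the DynamoDB lease table rather than a Python `if` statement.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LeaseRow:
    lease_key: str
    lease_owner: Optional[str] = None
    lease_counter: int = 0
    owner_switches_since_checkpoint: int = 0


def take_lease(row: LeaseRow, new_owner: str, expected_counter: int) -> bool:
    """Take the lease only if nobody updated the row since it was read."""
    if row.lease_counter != expected_counter:
        return False  # the row changed, so the caller's view is stale
    if row.lease_owner is not None and row.lease_owner != new_owner:
        row.owner_switches_since_checkpoint += 1
    row.lease_owner = new_owner
    row.lease_counter += 1
    return True


def renew_lease(row: LeaseRow, owner: str, expected_counter: int) -> bool:
    """Renew the lease; a failure tells the worker that its lease was taken."""
    if row.lease_owner != owner or row.lease_counter != expected_counter:
        return False
    row.lease_counter += 1
    return True


row = LeaseRow("shardId-000000000001")
a_took = take_lease(row, "worker-A", expected_counter=0)     # counter becomes 1
a_renewed = renew_lease(row, "worker-A", expected_counter=1)  # counter becomes 2
b_stale = take_lease(row, "worker-B", expected_counter=1)     # stale read, rejected
b_took = take_lease(row, "worker-B", expected_counter=2)      # counter becomes 3
a_lost = renew_lease(row, "worker-A", expected_counter=2)     # A detects the loss
```

Because every successful update increments the counter, a worker that acts on an outdated copy of the row is rejected instead of silently overwriting another worker's lease.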

### Throughput
<a name="shared-throughput-kcl-leasetable-throughput"></a>

If your Amazon Kinesis Data Streams application receives provisioned-throughput exceptions, you should increase the provisioned throughput for the DynamoDB table. The KCL creates the table with a provisioned throughput of 10 reads per second and 10 writes per second, but this might not be sufficient for your application. For example, if your Amazon Kinesis Data Streams application does frequent checkpointing or operates on a stream that is composed of many shards, you might need more throughput.

For information about provisioned throughput in DynamoDB, see [Read/Write Capacity Mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) and [Working with Tables and Data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html) in the *Amazon DynamoDB Developer Guide*.
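
As a sketch of how the provisioned throughput of the lease table might be raised, the following helper builds the request parameters for the DynamoDB `UpdateTable` operation. The function name and the table name `MyKclApp` are hypothetical. The sketch assumes that the lease table uses provisioned capacity mode and that you send the request with a client such as boto3.

```python
from typing import Any, Dict


def lease_table_throughput_update(
    table_name: str, read_capacity_units: int, write_capacity_units: int
) -> Dict[str, Any]:
    """Return keyword arguments for the DynamoDB UpdateTable operation."""
    if read_capacity_units < 1 or write_capacity_units < 1:
        raise ValueError("capacity units must be positive integers")
    return {
        "TableName": table_name,
        "ProvisionedThroughput": {
            "ReadCapacityUnits": read_capacity_units,
            "WriteCapacityUnits": write_capacity_units,
        },
    }


# Example: double the initial 10 reads and 10 writes per second.
params = lease_table_throughput_update("MyKclApp", 20, 20)
# With boto3 installed and credentials configured, the request could be sent as:
#   import boto3
#   boto3.client("dynamodb").update_table(**params)
```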

### How a lease table is synchronized with the shards in a Kinesis data stream
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

Workers in KCL consumer applications use leases to process shards from a given data stream. The information on what worker is leasing what shard at any given time is stored in a lease table. The lease table must remain in sync with the latest shard information from the data stream while the KCL consumer application is running. KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service during the consumer application's bootstrapping (either when the consumer application is initialized or restarted) and also whenever a shard that is being processed reaches an end (resharding). In other words, the workers of a KCL consumer application are synchronized with the data stream that they are processing during the initial consumer application bootstrap and whenever the consumer application encounters a data stream reshard event.

**Topics**
+ [

#### Synchronization in KCL 1.0 - 1.13 and KCL 2.0 - 2.2
](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [

#### Synchronization in KCL 2.x, starting with KCL 2.3 and later
](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [

#### Synchronization in KCL 1.x, starting with KCL 1.14 and later
](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### Synchronization in KCL 1.0 - 1.13 and KCL 2.0 - 2.2
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

In KCL 1.0 - 1.13 and KCL 2.0 - 2.2, during the consumer application's bootstrapping and also during each data stream reshard event, KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service by invoking the `ListShards` or the `DescribeStream` discovery APIs. In all the KCL versions listed above, each worker of a KCL consumer application completes the following steps to perform the lease/shard synchronization process during the consumer application's bootstrapping and at each stream reshard event:
+ Fetches all the shards for the data stream that is being processed
+ Fetches all the shard leases from the lease table
+ Filters out each open shard that does not have a lease in the lease table
+ Iterates over all found open shards and for each open shard with no open parent:
  + Traverses the hierarchy tree through its ancestors path to determine if the shard is a descendant. A shard is considered a descendant, if an ancestor shard is being processed (lease entry for ancestor shard exists in the lease table) or if an ancestor shard should be processed (for example, if the initial position is `TRIM_HORIZON` or `AT_TIMESTAMP`)
  + If the open shard in context is a descendant, KCL checkpoints the shard based on initial position and creates leases for its parents, if required
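
The candidate selection and ancestor traversal in the steps above can be sketched as follows. This is a deliberately simplified illustration and not the KCL implementation: the `Shard` class and both functions are hypothetical, and the sketch leaves out the checkpointing and the creation of parent leases from the last step.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Shard:
    shard_id: str
    parent_ids: List[str] = field(default_factory=list)
    is_open: bool = True


def candidate_shards(shards: Dict[str, Shard], leased: Set[str]) -> List[str]:
    """Open shards that have no lease yet and no open parent."""
    result = []
    for shard in shards.values():
        has_open_parent = any(
            p in shards and shards[p].is_open for p in shard.parent_ids
        )
        if shard.is_open and shard.shard_id not in leased and not has_open_parent:
            result.append(shard.shard_id)
    return sorted(result)


def is_descendant(
    shard: Shard, shards: Dict[str, Shard], leased: Set[str], initial_position: str
) -> bool:
    """Walk the ancestors; True if one is leased or should be processed."""
    stack = list(shard.parent_ids)
    while stack:
        ancestor_id = stack.pop()
        ancestor = shards.get(ancestor_id)
        if ancestor is None:
            continue  # ancestor is no longer listed for the stream
        if ancestor_id in leased or initial_position in ("TRIM_HORIZON", "AT_TIMESTAMP"):
            return True
        stack.extend(ancestor.parent_ids)
    return False


# A closed parent shard that was split into two open child shards.
shards = {
    "shard-0": Shard("shard-0", [], is_open=False),
    "shard-1": Shard("shard-1", ["shard-0"]),
    "shard-2": Shard("shard-2", ["shard-0"]),
}
```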

#### Synchronization in KCL 2.x, starting with KCL 2.3 and later
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

Starting with KCL 2.3, the library supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application. 
+ During the application's bootstrapping, if the lease table is empty, KCL uses the `ListShards` API's filtering option (the optional `ShardFilter` request parameter) to retrieve and create leases only for a snapshot of shards open at the time specified by the `ShardFilter` parameter. The `ShardFilter` parameter enables you to filter the response of the `ListShards` API. The only required property of the `ShardFilter` parameter is `Type`. KCL uses the `Type` filter property and the following valid values to identify and return a snapshot of open shards that might require new leases:
  + `AT_TRIM_HORIZON` - the response includes all the shards that were open at `TRIM_HORIZON`. 
  + `AT_LATEST` - the response includes only the currently open shards of the data stream. 
  + `AT_TIMESTAMP` - the response includes all shards whose start timestamp is less than or equal to the given timestamp and end timestamp is greater than or equal to the given timestamp or still open.

  `ShardFilter` is used when creating leases for an empty lease table to initialize leases for a snapshot of shards specified at `RetrievalConfig#initialPositionInStreamExtended`.

  For more information about `ShardFilter`, see [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Instead of all workers performing the lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard synchronization.
+ KCL 2.3 uses the `ChildShards` return parameter of the `GetRecords` and `SubscribeToShard` APIs to perform the lease/shard synchronization that happens at `SHARD_END` for closed shards, so that a KCL worker creates leases only for the child shards of the shard it finished processing. For shared throughput consumer applications, this optimization uses the `ChildShards` parameter of the `GetRecords` API. For dedicated throughput (enhanced fan-out) consumer applications, it uses the `ChildShards` parameter of the `SubscribeToShard` API. For more information, see [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html), and [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ With the above changes, the behavior of KCL moves from a model in which all workers learn about all existing shards to a model in which each worker learns only about the child shards of the shards it owns. Therefore, in addition to the synchronization that happens during consumer application bootstrapping and reshard events, KCL now also performs periodic shard/lease scans to identify potential holes in the lease table (in other words, to learn about all new shards), create leases for them if required, and ensure that the complete hash range of the data stream is being processed. `PeriodicShardSyncManager` is the component responsible for running the periodic lease/shard scans. 

  For more information about `PeriodicShardSyncManager` in KCL 2.3, see [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213).

  In KCL 2.3, new configuration options are available to configure `PeriodicShardSyncManager` in `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)

  New CloudWatch metrics are also now emitted to monitor the health of the `PeriodicShardSyncManager`. For more information, see [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 2.3 also includes an optimization to `HierarchicalShardSyncer` so that it creates leases for only one layer of shards.
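
The three `ShardFilter` types listed above can be modeled in a few lines of plain Java. The following is a minimal sketch, not KCL or AWS SDK code: `ShardSnapshotSketch`, `ShardInfo`, and the millisecond timestamps are hypothetical names introduced for illustration, and `AT_TRIM_HORIZON` is treated here as "open at the trim horizon timestamp", which is an assumption about how the snapshot is taken.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the ShardFilter types; it does not call Kinesis.
final class ShardSnapshotSketch {

    enum FilterType { AT_TRIM_HORIZON, AT_LATEST, AT_TIMESTAMP }

    // Hypothetical shard description: endMillis == null means the shard is still open.
    static final class ShardInfo {
        final String shardId;
        final long startMillis;
        final Long endMillis;

        ShardInfo(String shardId, long startMillis, Long endMillis) {
            this.shardId = shardId;
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }

        // A shard is open at time t if it started at or before t and has not ended before t.
        boolean openAt(long t) {
            return startMillis <= t && (endMillis == null || endMillis >= t);
        }
    }

    // Returns the IDs of the shards that fall into the snapshot for the given filter type.
    static List<String> snapshot(List<ShardInfo> shards, FilterType type,
                                 long trimHorizonMillis, long timestampMillis) {
        List<String> ids = new ArrayList<>();
        for (ShardInfo shard : shards) {
            boolean include;
            switch (type) {
                case AT_LATEST:
                    include = shard.endMillis == null;
                    break;
                case AT_TRIM_HORIZON:
                    include = shard.openAt(trimHorizonMillis);
                    break;
                default: // AT_TIMESTAMP
                    include = shard.openAt(timestampMillis);
                    break;
            }
            if (include) {
                ids.add(shard.shardId);
            }
        }
        return ids;
    }
}
```

For a parent shard that was split into two children, `AT_LATEST` selects only the two children, while `AT_TIMESTAMP` with a timestamp before the split selects only the parent.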

#### Synchronization in KCL 1.x, starting with KCL 1.14 and later
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

Starting with KCL 1.14, the library supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application. 
+ During application bootstrapping, if the lease table is empty, KCL uses the filtering option of the `ListShards` API (the optional `ShardFilter` request parameter) to retrieve and create leases only for a snapshot of shards open at the time specified by the `ShardFilter` parameter. The `ShardFilter` parameter lets you filter the response of the `ListShards` API. The only required property of the `ShardFilter` parameter is `Type`. KCL uses the `Type` filter property with the following valid values to identify and return a snapshot of open shards that might require new leases:
  + `AT_TRIM_HORIZON` - the response includes all the shards that were open at `TRIM_HORIZON`. 
  + `AT_LATEST` - the response includes only the currently open shards of the data stream. 
  + `AT_TIMESTAMP` - the response includes all shards whose start timestamp is less than or equal to the given timestamp and whose end timestamp is greater than or equal to the given timestamp, or that are still open.

  `ShardFilter` is used when creating leases for an empty lease table to initialize leases for a snapshot of shards specified at `KinesisClientLibConfiguration#initialPositionInStreamExtended`.

  For more information about `ShardFilter`, see [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Instead of all workers performing the lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard synchronization.
+ KCL 1.14 uses the `ChildShards` return parameter of the `GetRecords` and the `SubscribeToShard` APIs to perform lease/shard synchronization that happens at `SHARD_END` for closed shards, allowing a KCL worker to only create leases for the child shards of the shard it finished processing. For more information, see [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) and [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ With the above changes, the behavior of KCL moves from a model in which all workers learn about all existing shards to a model in which each worker learns only about the child shards of the shards it owns. Therefore, in addition to the synchronization that happens during consumer application bootstrapping and reshard events, KCL now also performs periodic shard/lease scans to identify potential holes in the lease table (in other words, to learn about all new shards), create leases for them if required, and ensure that the complete hash range of the data stream is being processed. `PeriodicShardSyncManager` is the component responsible for running the periodic lease/shard scans. 

  When `KinesisClientLibConfiguration#shardSyncStrategyType` is set to `ShardSyncStrategyType.SHARD_END`, the `PeriodicShardSyncManager` setting `leasesRecoveryAuditorInconsistencyConfidenceThreshold` determines the number of consecutive scans containing holes in the lease table after which a shard synchronization is enforced. When `KinesisClientLibConfiguration#shardSyncStrategyType` is set to `ShardSyncStrategyType.PERIODIC`, `leasesRecoveryAuditorInconsistencyConfidenceThreshold` is ignored.

  For more information about `PeriodicShardSyncManager` in KCL 1.14, see [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999).

  In KCL 1.14, a new configuration option is available to configure `PeriodicShardSyncManager` in `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)

  New CloudWatch metrics are also now emitted to monitor the health of the `PeriodicShardSyncManager`. For more information, see [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 1.14 now also supports deferred lease cleanup. Leases are deleted asynchronously by `LeaseCleanupManager` upon reaching `SHARD_END`, when a shard has either expired past the data stream’s retention period or been closed as the result of a resharding operation.

  New configuration options are available to configure `LeaseCleanupManager`.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ KCL 1.14 also includes an optimization to `KinesisShardSyncer` so that it creates leases for only one layer of shards.
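
The periodic scan for holes in the lease table, combined with a threshold on consecutive scans, can be sketched in plain Java as follows. This is an illustrative model only and not the `PeriodicShardSyncManager` implementation: `LeaseHoleAuditSketch`, `HashRange`, and `recordScan` are hypothetical names, and the sketch assumes that the stream's hash key range runs from 0 to 2^128 - 1.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative model of a lease-table audit; not the KCL implementation.
final class LeaseHoleAuditSketch {

    static final BigInteger MIN_HASH = BigInteger.ZERO;
    static final BigInteger MAX_HASH = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);

    // Hypothetical inclusive hash key range covered by one lease.
    static final class HashRange {
        final BigInteger start;
        final BigInteger end;

        HashRange(BigInteger start, BigInteger end) {
            this.start = start;
            this.end = end;
        }
    }

    // Returns true when the leased ranges leave part of [MIN_HASH, MAX_HASH] uncovered.
    static boolean hasHole(List<HashRange> leasedRanges) {
        List<HashRange> sorted = new ArrayList<>(leasedRanges);
        sorted.sort(Comparator.comparing((HashRange r) -> r.start));
        BigInteger nextExpected = MIN_HASH;
        for (HashRange range : sorted) {
            if (range.start.compareTo(nextExpected) > 0) {
                return true; // gap before this range
            }
            BigInteger afterEnd = range.end.add(BigInteger.ONE);
            if (afterEnd.compareTo(nextExpected) > 0) {
                nextExpected = afterEnd;
            }
        }
        return nextExpected.compareTo(MAX_HASH) <= 0; // gap at the top of the range
    }

    private final int confidenceThreshold;
    private int consecutiveScansWithHoles;

    LeaseHoleAuditSketch(int confidenceThreshold) {
        this.confidenceThreshold = confidenceThreshold;
    }

    // Records one scan and returns true when a shard synchronization should be enforced.
    boolean recordScan(List<HashRange> leasedRanges) {
        if (!hasHole(leasedRanges)) {
            consecutiveScansWithHoles = 0;
            return false;
        }
        consecutiveScansWithHoles++;
        if (consecutiveScansWithHoles >= confidenceThreshold) {
            consecutiveScansWithHoles = 0;
            return true;
        }
        return false;
    }
}
```

Requiring several consecutive scans with holes before acting avoids reacting to a transient gap, for example one observed while child-shard leases are still being created after a reshard.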

## Process multiple data streams with the same KCL 2.x for Java consumer application
<a name="shared-throughput-kcl-multistream"></a>

This section describes the following changes in KCL 2.x for Java that enable you to create KCL consumer applications that can process more than one data stream at the same time. 

**Important**  
Multistream processing is only supported in KCL 2.x for Java, starting with KCL 2.3 for Java.  
Multistream processing is NOT supported for any other languages in which KCL 2.x can be implemented.  
Multistream processing is NOT supported in any versions of KCL 1.x.
+ **MultiStreamTracker interface**

  To build a consumer application that can process multiple streams at the same time, you must implement a new interface called [MultiStreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). This interface includes the `streamConfigList` method, which returns the list of data streams and their configurations to be processed by the KCL consumer application. The data streams that are being processed can change during the consumer application runtime. KCL calls `streamConfigList` periodically to learn about changes in the data streams to process.

  The `streamConfigList` method populates the [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) list. 

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  Note that the `StreamIdentifier` and `InitialPositionInStreamExtended` are required fields, while `consumerArn` is optional. You must provide the `consumerArn` only if you are using KCL 2.x to implement an enhanced fan-out consumer application.

  For more information about `StreamIdentifier`, see [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). To create a `StreamIdentifier`, we recommend that you create a multistream instance from the `streamArn` and the `streamCreationEpoch`, which is available in v2.5.0 and later. In KCL v2.3 and v2.4, which don't support `streamArn`, create a multistream instance by using the format `account-id:StreamName:streamCreationTimestamp`. This format will be deprecated and no longer supported starting with the next major release.

  `MultiStreamTracker` also includes a strategy for deleting leases of old streams in the lease table (`formerStreamsLeasesDeletionStrategy`). Notice that the strategy CANNOT be changed during the consumer application runtime. For more information, see [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java).
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) is an application-wide class that you can use to specify all of the KCL 2.x configuration settings to be used when building your KCL consumer application. The `ConfigsBuilder` class now supports the `MultiStreamTracker` interface. You can initialize `ConfigsBuilder` either with the name of the one data stream to consume records from:

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  Or you can initialize ConfigsBuilder with `MultiStreamTracker` if you want to implement a KCL consumer application that processes multiple streams at the same time.

  ```
   /**
       * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ With multistream support implemented for your KCL consumer application, each row of the application's lease table now contains the shard ID and the stream name of the multiple data streams that this application processes. 
+ When multistream support for your KCL consumer application is implemented, the leaseKey takes the following structure: `account-id:StreamName:streamCreationTimestamp:ShardId`. For example, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
**Important**  
When your existing KCL consumer application is configured to process only one data stream, the leaseKey (which is the hash key for the lease table) is the shard ID. If you reconfigure this existing KCL consumer application to process multiple data streams, it breaks your lease table, because with multistream support, the leaseKey structure must be as follows: `account-id:StreamName:StreamCreationTimestamp:ShardId`.
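
To make the difference between the two lease key shapes concrete, the following plain-Java sketch parses a multistream `leaseKey` of the form `account-id:StreamName:streamCreationTimestamp:ShardId` and rejects a single-stream key, which is just the shard ID. `MultiStreamLeaseKeySketch` and its methods are hypothetical helpers written for this illustration and are not part of KCL.

```java
// Illustrative parser for the multistream leaseKey format; not part of KCL.
final class MultiStreamLeaseKeySketch {

    final String accountId;
    final String streamName;
    final long streamCreationTimestamp;
    final String shardId;

    private MultiStreamLeaseKeySketch(String accountId, String streamName,
                                      long streamCreationTimestamp, String shardId) {
        this.accountId = accountId;
        this.streamName = streamName;
        this.streamCreationTimestamp = streamCreationTimestamp;
        this.shardId = shardId;
    }

    // A multistream key has exactly four colon-separated parts.
    static boolean isMultiStreamKey(String leaseKey) {
        return leaseKey.split(":").length == 4;
    }

    // Splits account-id:StreamName:streamCreationTimestamp:ShardId into its fields.
    static MultiStreamLeaseKeySketch parse(String leaseKey) {
        String[] parts = leaseKey.split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not a multistream lease key: " + leaseKey);
        }
        return new MultiStreamLeaseKeySketch(parts[0], parts[1], Long.parseLong(parts[2]), parts[3]);
    }

    // Rebuilds the key in the same four-part format.
    String toLeaseKey() {
        return accountId + ":" + streamName + ":" + streamCreationTimestamp + ":" + shardId;
    }
}
```

With the example key from this section, `parse("111111111:multiStreamTest-1:12345:shardId-000000000336")` yields the stream name `multiStreamTest-1` and the shard ID `shardId-000000000336`, while `isMultiStreamKey("shardId-000000000336")` returns `false`. Because the two shapes never match, leases written in single-stream mode can't be reused after switching to multistream mode.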

## Use the KCL with the AWS Glue Schema Registry
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry lets you improve end-to-end data quality and data governance within your streaming applications. For more information, see [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). One of the ways to set up this integration is through the KCL in Java. 

**Important**  
Currently, Kinesis Data Streams and AWS Glue Schema Registry integration is only supported for the Kinesis data streams that use KCL 2.3 consumers implemented in Java. Multi-language support is not provided. KCL 1.0 consumers are not supported. KCL 2.x consumers prior to KCL 2.3 are not supported.

For detailed instructions on how to set up integration of Kinesis Data Streams with Schema Registry using the KCL, see the "Interacting with Data Using the KPL/KCL Libraries" section in [Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds).

# Develop custom consumers with shared throughput
<a name="shared-throughput-consumers"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

If you don't need dedicated throughput when receiving data from Kinesis Data Streams, and if you don't need read propagation delays under 200 ms, you can build consumer applications as described in the following topics. You can use the Kinesis Client Library (KCL) or the AWS SDK for Java.

**Topics**
+ [Develop custom consumers with shared throughput using KCL](custom-kcl-consumers.md)

For information about building consumers that can receive records from Kinesis data streams with dedicated throughput, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

# Develop custom consumers with shared throughput using KCL
<a name="custom-kcl-consumers"></a>

One of the methods of developing a custom consumer application with shared throughput is to use the Kinesis Client Library (KCL). 

Choose from the following topics for the KCL version you are using.

**Topics**
+ [Develop KCL 1.x consumers](developing-consumers-with-kcl.md)
+ [Develop KCL 2.x Consumers](developing-consumers-with-kcl-v2.md)

# Develop KCL 1.x consumers
<a name="developing-consumers-with-kcl"></a>

You can develop a consumer application for Amazon Kinesis Data Streams using the Kinesis Client Library (KCL). 

For more information about KCL, see [About KCL (previous versions)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Choose from the following topics depending on the option you want to use.

**Topics**
+ [Develop a Kinesis Client Library consumer in Java](kinesis-record-processor-implementation-app-java.md)
+ [Develop a Kinesis Client Library consumer in Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Develop a Kinesis Client Library consumer in .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Develop a Kinesis Client Library consumer in Python](kinesis-record-processor-implementation-app-py.md)
+ [Develop a Kinesis Client Library consumer in Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Develop a Kinesis Client Library consumer in Java
<a name="kinesis-record-processor-implementation-app-java"></a>

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Java. To view the Javadoc reference, see the [AWS Javadoc topic for Class AmazonKinesisClient](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html).

To download the Java KCL from GitHub, go to [Kinesis Client Library (Java)](https://github.com/awslabs/amazon-kinesis-client). To locate the Java KCL on Apache Maven, go to the [KCL search results](https://search.maven.org/#search|ga|1|amazon-kinesis-client) page. To download sample code for a Java KCL consumer application from GitHub, go to the [KCL for Java sample project](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) page on GitHub. 

The sample application uses [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). You can change the logging configuration in the static `configure` method defined in the `AmazonKinesisApplicationSample.java` file. For more information about how to use Apache Commons Logging with Log4j and AWS Java applications, see [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) in the *AWS SDK for Java Developer Guide*.

You must complete the following tasks when implementing a KCL consumer application in Java:

**Topics**
+ [Implement the IRecordProcessor methods](#kinesis-record-processor-implementation-interface-java)
+ [Implement a class factory for the IRecordProcessor interface](#kinesis-record-processor-implementation-factory-java)
+ [Create a worker](#kcl-java-worker)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-java)
+ [Migrate to Version 2 of the record processor interface](#kcl-java-v2-migration)

## Implement the IRecordProcessor methods
<a name="kinesis-record-processor-implementation-interface-java"></a>

The KCL currently supports two versions of the `IRecordProcessor` interface: the original interface is available with the first version of the KCL, and version 2 is available starting with KCL version 1.5.0. Both interfaces are fully supported. Your choice depends on your specific scenario requirements. Refer to your locally built Javadocs or the source code to see all the differences. The following sections outline the minimal implementation for getting started.

**Topics**
+ [Original Interface (Version 1)](#kcl-java-interface-original)
+ [Updated interface (Version 2)](#kcl-java-interface-v2)

### Original Interface (Version 1)
<a name="kcl-java-interface-original"></a>

The original `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) exposes the following record processor methods that your consumer must implement. The sample provides implementations that you can use as a starting point (see `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialize**  
The KCL calls the `initialize` method when the record processor is instantiated, passing a specific shard ID as a parameter. This record processor processes only this shard and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard may be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```
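
Because a record might be delivered more than one time, downstream writes are easier to reason about when they are idempotent. The following plain-Java sketch shows one possible approach under stated assumptions: it remembers which shard ID and sequence number pairs have already been applied and ignores repeats. `IdempotentSinkSketch` is a hypothetical class, and its in-memory set stands in for whatever durable store a real consumer would use.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative idempotent sink; the in-memory set stands in for a durable store.
final class IdempotentSinkSketch {

    private final Set<String> appliedKeys = new HashSet<>();
    private int appliedCount;

    // Applies the record once. Returns false when the same record was already applied.
    boolean apply(String shardId, String sequenceNumber, String payload) {
        String key = shardId + ":" + sequenceNumber;
        if (!appliedKeys.add(key)) {
            return false; // duplicate delivery, nothing to do
        }
        // A real sink would write the payload here; this sketch only counts it.
        appliedCount++;
        return true;
    }

    int appliedCount() {
        return appliedCount;
    }
}
```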

**processRecords**  
The KCL calls the `processRecords` method, passing a list of data records from the shard specified by the `initialize(shardId)` method. The record processor processes the data in these records according to the semantics of the consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket.

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

In addition to the data itself, the record also contains a sequence number and partition key. The worker can use these values when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `Record` class exposes the following methods that provide access to the record's data, sequence number, and partition key. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

In the sample, the private method `processRecordsWithRetries` has code that shows how a worker can access the record's data, sequence number, and partition key.
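
As a small illustration of routing by partition key, the following plain-Java sketch maps the value that `record.getPartitionKey()` would return to one of several bucket names. `BucketRouterSketch` and the bucket names are hypothetical, and hashing the key is only one possible routing rule.

```java
// Illustrative routing helper; bucket names are placeholders.
final class BucketRouterSketch {

    // Maps a partition key to a bucket. The same key always maps to the same bucket.
    static String bucketFor(String partitionKey, String[] buckets) {
        int index = Math.floorMod(partitionKey.hashCode(), buckets.length);
        return buckets[index];
    }
}
```

`Math.floorMod` is used instead of `%` so that a negative hash code still produces a valid array index.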

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you by passing a checkpointer (`IRecordProcessorCheckpointer`) to `processRecords`. The record processor calls the `checkpoint` method on this interface to inform the KCL of how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

For a split or merge operation, the KCL won't start processing the new shards until the processors for the original shards have called `checkpoint` to signal that all processing on the original shards is complete.

If you don't pass a parameter, the KCL assumes that the call to `checkpoint` means that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `checkpoint` only after it has processed all the records in the list that was passed to it. Record processors do not need to call `checkpoint` on each call to `processRecords`. A processor could, for example, call `checkpoint` on every third call to `processRecords`. You can optionally specify the exact sequence number of a record as a parameter to `checkpoint`. In this case, the KCL assumes that all records have been processed up to that record only.

In the sample, the private method `checkpoint` shows how to call `IRecordProcessorCheckpointer.checkpoint` using the appropriate exception handling and retry logic.
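
The checkpoint cadence described above (for example, on every third call to `processRecords`) can be sketched as follows. This is not the sample's code: `CheckpointCadenceSketch` is hypothetical, its nested `Checkpointer` interface is a stand-in for `IRecordProcessorCheckpointer`, and the retry loop treats every exception as retryable with no backoff, which is a simplification.

```java
// Illustrative checkpoint cadence with bounded retries; not the sample's implementation.
final class CheckpointCadenceSketch {

    // Hypothetical stand-in for IRecordProcessorCheckpointer.
    interface Checkpointer {
        void checkpoint() throws Exception;
    }

    private final int batchesPerCheckpoint;
    private final int maxAttempts;
    private int batchesSinceCheckpoint;

    CheckpointCadenceSketch(int batchesPerCheckpoint, int maxAttempts) {
        this.batchesPerCheckpoint = batchesPerCheckpoint;
        this.maxAttempts = maxAttempts;
    }

    // Call after a batch has been fully processed. Returns true if a checkpoint was written.
    boolean onBatchProcessed(Checkpointer checkpointer) {
        batchesSinceCheckpoint++;
        if (batchesSinceCheckpoint < batchesPerCheckpoint) {
            return false;
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                checkpointer.checkpoint();
                batchesSinceCheckpoint = 0;
                return true;
            } catch (Exception e) {
                // Treated as transient in this sketch; try again until attempts run out.
            }
        }
        return false; // counter is kept, so the next batch triggers another attempt
    }
}
```

Checkpointing less often reduces writes to the lease table, at the cost of reprocessing more records after a worker failure.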

The KCL relies on `processRecords` to handle any exceptions that arise from processing the data records. If an exception is thrown from `processRecords`, the KCL skips over the data records that were passed before the exception. That is, these records are not re-sent to the record processor that threw the exception or to any other record processor in the consumer.
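
One way to keep exceptions from escaping `processRecords` is to wrap the per-record work in a bounded retry loop. The following plain-Java sketch shows the idea under stated assumptions: `RetryingProcessorSketch` and `processWithRetries` are hypothetical names, the handler is any callback you supply, and records that still fail after the last attempt are counted and skipped.

```java
import java.util.List;
import java.util.function.Consumer;

// Illustrative retry wrapper; no exception from the handler escapes this method.
final class RetryingProcessorSketch {

    // Runs the handler for each record with up to maxAttempts tries.
    // Returns the number of records that were skipped after all attempts failed.
    static <T> int processWithRetries(List<T> records, Consumer<T> handler, int maxAttempts) {
        int skipped = 0;
        for (T record : records) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    handler.accept(record);
                    done = true;
                } catch (RuntimeException e) {
                    // Swallow and retry; a real consumer would log the failure here.
                }
            }
            if (!done) {
                skipped++;
            }
        }
        return skipped;
    }
}
```

Whether to skip a record that keeps failing, or to route it elsewhere, is an application decision that this sketch does not make for you.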

**shutdown**  
The KCL calls the `shutdown` method either when processing ends (the shutdown reason is `TERMINATE`) or the worker is no longer responding (the shutdown reason is `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Processing ends when the record processor does not receive any further records from the shard, because either the shard was split or merged, or the stream was deleted.

The KCL also passes an `IRecordProcessorCheckpointer` interface to `shutdown`. If the shutdown reason is `TERMINATE`, the record processor should finish processing any data records, and then call the `checkpoint` method on this interface.
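
The shutdown handling described above can be sketched in plain Java as follows. The `Reason` enum and `Checkpointer` interface are hypothetical stand-ins for `ShutdownReason` and `IRecordProcessorCheckpointer`. The text asks for a checkpoint only when the reason is `TERMINATE`, so this sketch skips the checkpoint for `ZOMBIE`.

```java
// Illustrative shutdown handling with stand-in types; not KCL code.
final class ShutdownSketch {

    // Stand-in for ShutdownReason.
    enum Reason { TERMINATE, ZOMBIE }

    // Stand-in for IRecordProcessorCheckpointer.
    interface Checkpointer {
        void checkpoint();
    }

    // Returns true when a final checkpoint was written.
    static boolean onShutdown(Reason reason, Checkpointer checkpointer) {
        if (reason == Reason.TERMINATE) {
            // Finish any in-flight records before this point, then mark the shard as done.
            checkpointer.checkpoint();
            return true;
        }
        // ZOMBIE: the text does not ask for a checkpoint, so none is written.
        return false;
    }
}
```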

### Updated interface (Version 2)
<a name="kcl-java-interface-v2"></a>

The updated `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) exposes the following record processor methods that your consumer must implement: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

All of the arguments from the original version of the interface are accessible through get methods on the container objects. For example, to retrieve the list of records in `processRecords()`, you can use `processRecordsInput.getRecords()`.

As of version 2 of this interface (KCL 1.5.0 and later), the following new inputs are available in addition to the inputs provided by the original interface:

starting sequence number  
In the `InitializationInput` object passed to the `initialize()` operation, the starting sequence number from which records would be provided to the record processor instance. This is the sequence number that was last checkpointed by the record processor instance previously processing the same shard. This is provided in case your application needs this information. 

pending checkpoint sequence number  
In the `InitializationInput` object passed to the `initialize()` operation, the pending checkpoint sequence number (if any) that could not be committed before the previous record processor instance stopped.

## Implement a class factory for the IRecordProcessor interface
<a name="kinesis-record-processor-implementation-factory-java"></a>

You also need to implement a factory for the class that implements the record processor methods. When your consumer instantiates the worker, it passes a reference to this factory.

The sample implements the factory class in the file `AmazonKinesisApplicationSampleRecordProcessorFactory.java` using the original record processor interface. If you want the class factory to create version 2 record processors, use the package name `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Create a worker
<a name="kcl-java-worker"></a>

As discussed in [Implement the IRecordProcessor methods](#kinesis-record-processor-implementation-interface-java), there are two versions of the KCL record processor interface to choose from, which affects how you would create a worker. The original record processor interface uses the following code structure to create a worker:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

With version 2 of the record processor interface, you can use `Worker.Builder` to create a worker without needing to worry about which constructor to use and the order of the arguments. The updated record processor interface uses the following code structure to create a worker:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-java"></a>

The sample provides default values for configuration properties. This configuration data for the worker is then consolidated in a `KinesisClientLibConfiguration` object. This object and a reference to the class factory for `IRecordProcessor` are passed in the call that instantiates the worker. You can override any of these properties with your own values using a Java properties file (see `AmazonKinesisApplicationSample.java`).

### Application name
<a name="configuration-property-application-name"></a>

The KCL requires an application name that is unique across your applications, and across Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers associated with this application name are assumed to be working together on the same stream. These workers may be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-cred-java"></a>

You must make your AWS credentials available to one of the credential providers in the default credential providers chain. For example, if you are running your consumer on an EC2 instance, we recommend that you launch the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer running on an EC2 instance.

The sample application first attempts to retrieve IAM credentials from instance metadata: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

If the sample application cannot obtain credentials from the instance metadata, it attempts to retrieve credentials from a properties file:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

For more information about instance metadata, see [Instance Metadata](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) in the *Amazon EC2 User Guide*.

### Use the worker ID for multiple instances
<a name="kinesis-record-processor-workerid-java"></a>

The sample initialization code creates an ID for the worker, `workerId`, using the name of the local computer and appending a globally unique identifier as shown in the following code snippet. This approach supports the scenario of multiple instances of the consumer application running on a single computer.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrate to Version 2 of the record processor interface
<a name="kcl-java-v2-migration"></a>

If you want to migrate code that uses the original interface, in addition to the steps described previously, the following steps are required:

1. Change your record processor class to import the version 2 record processor interface:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Change the references to inputs to use `get` methods on the container objects. For example, in the `shutdown()` operation, change "`checkpointer`" to "`shutdownInput.getCheckpointer()`".

1. Change your record processor factory class to import the version 2 record processor factory interface:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Change the construction of the worker to use `Worker.Builder`. For example:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Develop a Kinesis Client Library consumer in Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Node.js.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Node.js and write your consumer app entirely in Node.js, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the Node.js KCL from GitHub, go to [Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs).

**Sample Code Downloads**

There are two code samples available for KCL in Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Used in the following sections to illustrate the fundamentals of building a KCL consumer application in Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Slightly more advanced and based on a real-world scenario. Use it after you have familiarized yourself with the basic sample code. This sample is not discussed here but has a README file with more information.

You must complete the following tasks when implementing a KCL consumer application in Node.js:

**Topics**
+ [Implement the record processor](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-nodejs)

## Implement the record processor
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

The simplest possible consumer using the KCL for Node.js must implement a `recordProcessor` function, which in turn contains the functions `initialize`, `processRecords`, and `shutdown`. The sample provides an implementation that you can use as a starting point (see `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.
}
```

**initialize**  
The KCL calls the `initialize` function when the record processor starts. This record processor processes only the shard ID passed as `initializeInput.shardId`, and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. This is because Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard might be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 The KCL calls this function with input that contains a list of data records from the shard specified to the `initialize` function. The record processor that you implement processes the data in these records according to the semantics of your consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket. 

```
processRecords: function(processRecordsInput, completeCallback)
```

In addition to the data itself, the record also contains a sequence number and partition key, which the worker can use when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `record` dictionary exposes the following key-value pairs to access the record's data, sequence number, and partition key:

```
record.data
record.sequenceNumber
record.partitionKey
```

Note that the data is Base64-encoded.

In the basic sample, the function `processRecords` has code that shows how a worker can access the record's data, sequence number, and partition key.

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you with a `checkpointer` object passed as `processRecordsInput.checkpointer`. Your record processor calls the `checkpointer.checkpoint` function to inform the KCL how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

For a split or merge operation, the KCL doesn't start processing the new shards until the processors for the original shards have called `checkpoint` to signal that all processing on the original shards is complete.

If you don't pass the sequence number to the `checkpoint` function, the KCL assumes that the call to `checkpoint` means that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `checkpoint` **only** after it has processed all the records in the list that was passed to it. Record processors do not need to call `checkpoint` on each call to `processRecords`. A processor could, for example, call `checkpoint` on every third call, or in response to some event external to your record processor, such as a custom verification/validation service you've implemented.

You can optionally specify the exact sequence number of a record as a parameter to `checkpoint`. In this case, the KCL assumes that all records have been processed up to that record only.

The basic sample application shows the simplest possible call to the `checkpointer.checkpoint` function. You can add other checkpointing logic you need for your consumer at this point in the function.

**shutdown**  
The KCL calls the `shutdown` function either when processing ends (`shutdownInput.reason` is `TERMINATE`) or the worker is no longer responding (`shutdownInput.reason` is `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

Processing ends when the record processor does not receive any further records from the shard, because either the shard was split or merged, or the stream was deleted.

The KCL also passes a `shutdownInput.checkpointer` object to `shutdown`. If the shutdown reason is `TERMINATE`, you should make sure that the record processor has finished processing any data records, and then call the `checkpoint` function on this interface.

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-nodejs"></a>

The sample provides default values for the configuration properties. You can override any of these properties with your own values (see `sample.properties` in the basic sample).

### Application name
<a name="kinesis-record-processor-application-name-nodejs"></a>

The KCL requires an application name that is unique among your applications, and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers associated with this application name are assumed to be working together on the same stream. These workers may be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-credentials-nodejs"></a>

You must make your AWS credentials available to one of the credential providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property in the `sample.properties` file to set a credentials provider. If you are running your consumer on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer application running on an EC2 instance.

The following example configures KCL to process a Kinesis data stream named `kclnodejssample` using the record processor supplied in `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Develop a Kinesis Client Library consumer in .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses .NET.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for .NET and write your consumer app entirely in .NET, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the .NET KCL from GitHub, go to [Kinesis Client Library (.NET)](https://github.com/awslabs/amazon-kinesis-client-net). To download sample code for a .NET KCL consumer application, go to the [KCL for .NET sample consumer project](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) page on GitHub.

You must complete the following tasks when implementing a KCL consumer application in .NET:

**Topics**
+ [Implement the IRecordProcessor class methods](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-dotnet)

## Implement the IRecordProcessor class methods
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

The consumer must implement the following methods for `IRecordProcessor`. The sample consumer provides implementations that you can use as a starting point (see the `SampleRecordProcessor` class in `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialize**  
The KCL calls this method when the record processor is instantiated, passing a specific shard ID in the `input` parameter (`input.ShardId`). This record processor processes only this shard, and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. This is because Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard might be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
The KCL calls this method, passing a list of data records in the `input` parameter (`input.Records`) from the shard specified by the `Initialize` method. The record processor that you implement processes the data in these records according to the semantics of your consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket.

```
public void ProcessRecords(ProcessRecordsInput input)
```

In addition to the data itself, the record also contains a sequence number and partition key. The worker can use these values when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `Record` class exposes the following to access the record's data, sequence number, and partition key:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

In the sample, the method `ProcessRecordsWithRetries` has code that shows how a worker can access the record's data, sequence number, and partition key.

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you by passing a `Checkpointer` object to `ProcessRecords` (`input.Checkpointer`). The record processor calls the `Checkpointer.Checkpoint` method to inform the KCL of how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

For a split or merge operation, the KCL doesn't start processing the new shards until the processors for the original shards have called `Checkpointer.Checkpoint` to signal that all processing on the original shards is complete.

If you don't pass a parameter, the KCL assumes that the call to `Checkpointer.Checkpoint` signifies that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `Checkpointer.Checkpoint` only after it has processed all the records in the list that was passed to it. Record processors do not need to call `Checkpointer.Checkpoint` on each call to `ProcessRecords`. A processor could, for example, call `Checkpointer.Checkpoint` on every third or fourth call. You can optionally specify the exact sequence number of a record as a parameter to `Checkpointer.Checkpoint`. In this case, the KCL assumes that records have been processed only up to that record.

In the sample, the private method `Checkpoint(Checkpointer checkpointer)` shows how to call the `Checkpointer.Checkpoint` method using appropriate exception handling and retry logic.

The KCL for .NET handles exceptions differently from other KCL language libraries in that it does not handle any exceptions that arise from processing the data records. Any uncaught exception from user code crashes the program.

**Shutdown**  
The KCL calls the `Shutdown` method either when processing ends (the shutdown reason is `TERMINATE`) or the worker is no longer responding (the shutdown `input.Reason` value is `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

Processing ends when the record processor does not receive any further records from the shard, because the shard was split or merged, or the stream was deleted.

The KCL also passes a `Checkpointer` object to `Shutdown`. If the shutdown reason is `TERMINATE`, the record processor should finish processing any data records, and then call the `Checkpointer.Checkpoint` method.

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-dotnet"></a>

The sample consumer provides default values for the configuration properties. You can override any of these properties with your own values (see `SampleConsumer/kcl.properties`).

### Application name
<a name="modify-kinesis-record-processor-application-name"></a>

The KCL requires an application name that is unique among your applications, and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers associated with this application name are assumed to be working together on the same stream. These workers may be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-creds-dotnet"></a>

You must make your AWS credentials available to one of the credential providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property in the properties file (`SampleConsumer/kcl.properties` in the sample) to set a credentials provider. If you are running your consumer application on an EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer running on an EC2 instance.

The sample's properties file configures KCL to process a Kinesis data stream called "words" using the record processor supplied in `AmazonKinesisSampleConsumer.cs`. 

# Develop a Kinesis Client Library consumer in Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Python.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Python and write your consumer app entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the Python KCL from GitHub, go to [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python). To download sample code for a Python KCL consumer application, go to the [KCL for Python sample project](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) page on GitHub.

You must complete the following tasks when implementing a KCL consumer application in Python:

**Topics**
+ [Implement the RecordProcessor class methods](#kinesis-record-processor-implementation-interface-py)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-py)

## Implement the RecordProcessor class methods
<a name="kinesis-record-processor-implementation-interface-py"></a>

Your `RecordProcessor` class must extend `RecordProcessorBase` and implement the following methods. The sample provides implementations that you can use as a starting point (see `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
The KCL calls the `initialize` method when the record processor is instantiated, passing a specific shard ID as a parameter. This record processor processes only this shard, and typically, the reverse is also true (this shard is processed only by this record processor). However, your consumer should account for the possibility that a data record might be processed more than one time. This is because Kinesis Data Streams has *at least once* semantics, meaning that every data record from a shard is processed at least one time by a worker in your consumer. For more information about cases in which a particular shard may be processed by more than one worker, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```
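Because a record can be delivered more than one time, one way to tolerate duplicates is to make processing idempotent. The following is a minimal sketch and is not part of the KCL sample. The `DedupingHandler` class and its in-memory set are hypothetical, and the record is assumed to be a dictionary with `sequenceNumber` and `partitionKey` keys. A production consumer would more likely keep this state in a durable store, because an in-memory set is lost when the worker restarts.

```python
class DedupingHandler:
    """Hypothetical helper that skips records whose sequence number was already handled."""

    def __init__(self, shard_id):
        self.shard_id = shard_id
        self._seen = set()   # in-memory only; lost when the worker restarts
        self.handled = []    # partition keys of records that were processed

    def handle(self, record):
        """Process a record dictionary once; return True if it was processed."""
        seq = record.get('sequenceNumber')
        if seq in self._seen:
            # Duplicate delivery: skip it so the side effect is not repeated.
            return False
        self._seen.add(seq)
        self.handled.append(record.get('partitionKey'))
        return True
```

An instance of such a helper could be created in `initialize` and called for each record in `process_records`.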

**process\_records**  
 The KCL calls this method, passing a list of data records from the shard specified by the `initialize` method. The record processor that you implement processes the data in these records according to the semantics of your consumer. For example, the worker might perform a transformation on the data and then store the result in an Amazon Simple Storage Service (Amazon S3) bucket.

```
def process_records(self, records, checkpointer) 
```

In addition to the data itself, the record also contains a sequence number and partition key. The worker can use these values when processing the data. For example, the worker could choose the S3 bucket in which to store the data based on the value of the partition key. The `record` dictionary exposes the following key-value pairs to access the record's data, sequence number, and partition key:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Note that the data is Base64-encoded.
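As an illustration, the following sketch decodes the Base64 payload and picks a destination from the partition key. It assumes the record dictionary shape described above. The `decode_record` and `bucket_for` functions and the routing rule are hypothetical and are not part of the KCL.

```python
import base64


def decode_record(record):
    """Return (payload_bytes, sequence_number, partition_key) for a record dictionary."""
    payload = base64.b64decode(record.get('data'))
    return payload, record.get('sequenceNumber'), record.get('partitionKey')


def bucket_for(partition_key, buckets):
    """Hypothetical routing rule: pick a bucket name deterministically from the partition key."""
    index = sum(partition_key.encode('utf-8')) % len(buckets)
    return buckets[index]
```

The same partition key always maps to the same bucket name, so records that share a key are stored together.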

In the sample, the method `process_records` has code that shows how a worker can access the record's data, sequence number, and partition key.

Kinesis Data Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you by passing a `Checkpointer` object to `process_records`. The record processor calls the `checkpoint` method on this object to inform the KCL of how far it has progressed in processing the records in the shard. If the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

For a split or merge operation, the KCL doesn't start processing the new shards until the processors for the original shards have called `checkpoint` to signal that all processing on the original shards is complete.

If you don't pass a parameter, the KCL assumes that the call to `checkpoint` means that all records have been processed, up to the last record that was passed to the record processor. Therefore, the record processor should call `checkpoint` only after it has processed all the records in the list that was passed to it. Record processors do not need to call `checkpoint` on each call to `process_records`. A processor could, for example, call `checkpoint` on every third call. You can optionally specify the exact sequence number of a record as a parameter to `checkpoint`. In this case, the KCL assumes that all records have been processed up to that record only.
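The "every third call" pattern could look like the following sketch. It is illustrative only: `EveryNthCallProcessor` and `FakeCheckpointer` are hypothetical names, and the fake object stands in for the `Checkpointer` that the KCL passes to `process_records`.

```python
class FakeCheckpointer:
    """Stand-in for the KCL Checkpointer; it only records how it was called."""

    def __init__(self):
        self.calls = []

    def checkpoint(self, sequence_number=None):
        self.calls.append(sequence_number)


class EveryNthCallProcessor:
    """Checkpoints once every `every` calls to process_records."""

    def __init__(self, every=3):
        self._every = every
        self._calls = 0

    def process_records(self, records, checkpointer):
        for record in records:
            pass  # processing logic for each record would go here
        self._calls += 1
        # Checkpoint only after the whole list has been processed.
        if self._calls % self._every == 0:
            checkpointer.checkpoint()


fake = FakeCheckpointer()
processor = EveryNthCallProcessor(every=3)
for _ in range(7):
    processor.process_records([{'sequenceNumber': '1'}], fake)
```

With seven calls, this sketch checkpoints after the third and sixth calls. Checkpointing less often reduces writes to the lease table, at the cost of reprocessing more records after a failure.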

In the sample, the private method `checkpoint` shows how to call the `Checkpointer.checkpoint` method using appropriate exception handling and retry logic.

The KCL relies on `process_records` to handle any exceptions that arise from processing the data records. If an exception is thrown from `process_records`, the KCL skips over the data records that were passed to `process_records` before the exception. That is, these records are not re-sent to the record processor that threw the exception or to any other record processor in the consumer.
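One way to keep a single bad record from discarding the rest of a batch is to catch exceptions per record, as in the following sketch. The `process_batch` and `fragile_handler` functions are hypothetical; what to do with the failed records (retry, log, or send to a dead-letter location) is left to your consumer.

```python
import sys


def process_batch(records, handle_record, log=sys.stderr.write):
    """Apply handle_record to each record and return the records that failed."""
    failed = []
    for record in records:
        try:
            handle_record(record)
        except Exception as error:
            # Catch here so that the exception does not escape process_records.
            log('Failed on sequence number {seq}: {err}\n'.format(
                seq=record.get('sequenceNumber'), err=error))
            failed.append(record)
    return failed


def fragile_handler(record):
    # Hypothetical processing step that rejects one kind of record.
    if record.get('partitionKey') == 'bad':
        raise ValueError('cannot parse record')


batch = [
    {'sequenceNumber': '1', 'partitionKey': 'ok'},
    {'sequenceNumber': '2', 'partitionKey': 'bad'},
    {'sequenceNumber': '3', 'partitionKey': 'ok'},
]
failed = process_batch(batch, fragile_handler, log=lambda message: None)
```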

**shutdown**  
 The KCL calls the `shutdown` method either when processing ends (the shutdown `reason` is `TERMINATE`) or when the worker is no longer responding (the shutdown `reason` is `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

Processing ends when the record processor does not receive any further records from the shard, because either the shard was split or merged, or the stream was deleted.

 The KCL also passes a `Checkpointer` object to `shutdown`. If the shutdown `reason` is `TERMINATE`, the record processor should finish processing any data records, and then call the `checkpoint` method on this interface.
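A `shutdown` implementation that follows this guidance might resemble the sketch below. The class names and the `finish_pending` helper are hypothetical, and `FakeCheckpointer` stands in for the object that the KCL supplies. This sketch checkpoints only for `TERMINATE`; skipping the checkpoint for other reasons is a choice made here for illustration.

```python
class FakeCheckpointer:
    """Stand-in for the KCL Checkpointer; it counts checkpoint calls."""

    def __init__(self):
        self.checkpoint_calls = 0

    def checkpoint(self, sequence_number=None):
        self.checkpoint_calls += 1


class ShutdownAwareProcessor:
    def __init__(self):
        self.pending = []  # hypothetical buffer of unfinished work

    def finish_pending(self):
        # Placeholder for flushing or completing buffered work.
        self.pending.clear()

    def shutdown(self, checkpointer, reason):
        if reason == 'TERMINATE':
            # Finish processing first, then record that the shard is complete.
            self.finish_pending()
            checkpointer.checkpoint()
        # For any other reason (such as ZOMBIE), this sketch does not checkpoint.


terminate_checkpointer = FakeCheckpointer()
zombie_checkpointer = FakeCheckpointer()
processor = ShutdownAwareProcessor()
processor.pending.append('unfinished work')
processor.shutdown(terminate_checkpointer, 'TERMINATE')
processor.shutdown(zombie_checkpointer, 'ZOMBIE')
```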

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-py"></a>

The sample provides default values for the configuration properties. You can override any of these properties with your own values (see `sample.properties`).

### Application name
<a name="kinesis-record-processor-application-name-py"></a>

The KCL requires an application name that is unique among your applications, and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers that are associated with this application name are assumed to be working together on the same stream. These workers can be distributed on multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Set up credentials
<a name="kinesis-record-processor-creds-py"></a>

You must make your AWS credentials available to one of the credentials providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property in [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) to set a credentials provider. If you are running your consumer application on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer application running on an EC2 instance.

The sample's properties file configures KCL to process a Kinesis data stream called "words" using the record processor supplied in `sample_kclpy_app.py`. 
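To check which values a properties file sets before you start the consumer, you could read it with a few lines of Python. The following is a rough sketch: `parse_properties` is a hypothetical helper that handles only simple `key = value` lines and `#` comments, and the excerpt is illustrative rather than a copy of the sample file.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, ignoring blank lines and # comments."""
    properties = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        key, _, value = line.partition('=')
        properties[key.strip()] = value.strip()
    return properties


# Hypothetical excerpt in the style of sample.properties.
SAMPLE = """\
# The script that the MultiLangDaemon runs.
executableName = sample_kclpy_app.py
streamName = words
applicationName = PythonKCLSample
"""

properties = parse_properties(SAMPLE)
```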

# Develop a Kinesis Client Library consumer in Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Ruby.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Ruby and write your consumer app entirely in Ruby, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the Ruby KCL from GitHub, go to [Kinesis Client Library (Ruby)](https://github.com/awslabs/amazon-kinesis-client-ruby). To download sample code for a Ruby KCL consumer application, go to the [KCL for Ruby sample project](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) page on GitHub.

For more information about the KCL Ruby support library, see [KCL Ruby Gems Documentation](http://www.rubydoc.info/gems/aws-kclrb).

# Develop KCL 2.x Consumers
<a name="developing-consumers-with-kcl-v2"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

This topic shows you how to use version 2.0 of the Kinesis Client Library (KCL). 

For more information about the KCL, see the overview provided in [Developing Consumers Using the Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Choose from the following topics depending on the option you want to use.

**Topics**
+ [Develop a Kinesis Client Library consumer in Java](kcl2-standard-consumer-java-example.md)
+ [Develop a Kinesis Client Library consumer in Python](kcl2-standard-consumer-python-example.md)
+ [Develop enhanced fan-out consumers with KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Develop a Kinesis Client Library consumer in Java
<a name="kcl2-standard-consumer-java-example"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

The following code shows an example implementation in Java of `ProcessorFactory` and `RecordProcessor`. If you want to take advantage of the enhanced fan-out feature, see [Using Consumers with Enhanced Fan-Out](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.", t);
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Develop a Kinesis Client Library consumer in Python
<a name="kcl2-standard-consumer-python-example"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Python.

The KCL is a Java library; support for languages other than Java is provided using a multi-language interface called the *MultiLangDaemon*. This daemon is Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Python and write your consumer app entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon on GitHub, go to the [KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) page.

To download the Python KCL from GitHub, go to [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python). To download sample code for a Python KCL consumer application, go to the [KCL for Python sample project](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) page on GitHub.

You must complete the following tasks when implementing a KCL consumer application in Python:

**Topics**
+ [Implement the RecordProcessor class methods](#kinesis-record-processor-implementation-interface-py)
+ [Modify the configuration properties](#kinesis-record-processor-initialization-py)

## Implement the RecordProcessor class methods
<a name="kinesis-record-processor-implementation-interface-py"></a>

The `RecordProcessor` class must extend the `RecordProcessorBase` class to implement the following methods:

```
initialize
process_records
shutdown_requested
```

This sample provides implementations that you can use as a starting point.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        to a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is overburdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n + 1))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds})\n"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended, checkpointing.")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```
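
The bookkeeping in `process_records` above reduces to two small rules: track the largest `(sequence_number, sub_sequence_number)` pair seen so far, and checkpoint only when a fixed interval has elapsed. The following standalone sketch isolates that logic so it can be exercised without the KCL. The `CheckpointTracker` class, its method names, and the injectable `clock` parameter are illustrative assumptions and are not part of `amazon_kclpy`. The sketch also assumes that both numbers are integers.

```python
import time


class CheckpointTracker:
    """Illustrative helper (not part of amazon_kclpy) mirroring the sample's bookkeeping."""

    def __init__(self, interval_seconds, clock=time.time):
        self._interval = interval_seconds
        self._clock = clock  # injectable so the logic can be tested without sleeping
        self.largest = (None, None)
        self._last_checkpoint = clock()

    def observe(self, sequence_number, sub_sequence_number):
        """Remember the pair if it is larger than anything seen so far."""
        candidate = (sequence_number, sub_sequence_number)
        # Tuples compare element by element, which matches should_update_sequence:
        # a larger sequence number wins, and ties are broken by the subsequence number.
        if self.largest == (None, None) or candidate > self.largest:
            self.largest = candidate

    def checkpoint_due(self):
        """True once more than interval_seconds have passed since the last checkpoint."""
        return self._clock() - self._last_checkpoint > self._interval

    def mark_checkpointed(self):
        """Restart the interval after a successful checkpoint."""
        self._last_checkpoint = self._clock()
```

In a record processor, you would call `observe` for each record, then checkpoint at `largest` and call `mark_checkpointed` whenever `checkpoint_due()` returns true.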

## Modify the configuration properties
<a name="kinesis-record-processor-initialization-py"></a>

The sample provides default values for the configuration properties, as shown in the following properties file. You can override any of these properties with your own values.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options are TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Failover time in milliseconds. A worker that does not renew its lease within this time interval
# will be regarded as having problems, and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases.
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications to flush/checkpoint (if they have some data "in progress" but don't get new data for a while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```
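
Because most of the optional properties above are commented out, it can help to check which values a file actually sets before you start the MultiLangDaemon. The following is a minimal sketch of such a check. The `parse_properties` helper is hypothetical and handles only simple `key = value` lines and `#` comments. Real `.properties` files support more syntax (for example, escapes and line continuations), so treat this as an approximation.

```python
def parse_properties(text):
    """Return the key/value pairs that are explicitly set (hypothetical helper)."""
    properties = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and comments, including commented-out defaults.
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        if separator:
            properties[key.strip()] = value.strip()
    return properties


sample = """
streamName = words
applicationName = PythonKCLSample
initialPositionInStream = TRIM_HORIZON
#regionName = us-east-1
"""

overrides = parse_properties(sample)
print(overrides)
```

Properties that remain commented out, such as `regionName` here, do not appear in the result, which makes it easy to see which defaults are still in effect.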

### Application name
<a name="kinesis-record-processor-application-name-py"></a>

The KCL requires an application name that is unique among your applications and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
+ All workers that are associated with this application name are assumed to be working together on the same stream. These workers can be distributed across multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
+ The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).
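
The effect of the application name can be pictured as a grouping rule: workers that share a name cooperate through one lease table, and workers with a different name form an independent application with its own table. The following sketch models only that rule. The `lease_tables` function and the worker IDs are hypothetical and do not call the KCL or DynamoDB.

```python
from collections import defaultdict


def lease_tables(workers):
    """Group worker IDs by application name, which doubles as the lease table name."""
    tables = defaultdict(list)
    for worker_id, application_name in workers:
        tables[application_name].append(worker_id)
    return dict(tables)


workers = [
    ("worker-1", "PythonKCLSample"),
    ("worker-2", "PythonKCLSample"),    # same name: shares leases with worker-1
    ("worker-3", "PythonKCLSampleV2"),  # different name: separate application
]
print(lease_tables(workers))
```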

### Credentials
<a name="kinesis-record-processor-creds-py"></a>

You must make your AWS credentials available to one of the credential providers in the [default credential providers chain](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). You can use the `AWSCredentialsProvider` property to set a credentials provider. If you run your consumer application on an Amazon EC2 instance, we recommend that you configure the instance with an IAM role. AWS credentials that reflect the permissions associated with this IAM role are made available to applications on the instance through its instance metadata. This is the most secure way to manage credentials for a consumer application running on an EC2 instance.

# Develop enhanced fan-out consumers with KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Important**  
Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We **strongly recommend** that you migrate your KCL applications using version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see [Amazon Kinesis Client Library page on GitHub](https://github.com/awslabs/amazon-kinesis-client). For information about the latest KCL versions, see [Use Kinesis Client Library](kcl.md). For information about migrating from KCL 1.x to KCL 3.x, see [Migrating from KCL 1.x to KCL 3.x](kcl-migration-1-3.md).

Consumers that use *enhanced fan-out* in Amazon Kinesis Data Streams can receive records from a data stream with dedicated throughput of up to 2 MB of data per second per shard. This type of consumer doesn't have to contend with other consumers that are receiving data from the stream. For more information, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

You can use version 2.0 or later of the Kinesis Client Library (KCL) to develop applications that use enhanced fan-out to receive data from streams. The KCL automatically subscribes your application to all the shards of a stream, and ensures that your consumer application can read with a throughput value of 2 MB/sec per shard. If you want to use the KCL without turning on enhanced fan-out, see [Developing Consumers Using the Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Develop enhanced fan-out consumers using KCL 2.x in Java](building-enhanced-consumers-kcl-java.md)

# Develop enhanced fan-out consumers using KCL 2.x in Java
<a name="building-enhanced-consumers-kcl-java"></a>

You can use version 2.0 or later of the Kinesis Client Library (KCL) to develop applications that receive data from Amazon Kinesis Data Streams using enhanced fan-out. The following code shows an example Java implementation of `ShardRecordProcessorFactory` and `ShardRecordProcessor`.

We recommend that you use `KinesisClientUtil` to create the `KinesisAsyncClient` and to configure its `maxConcurrency`.

**Important**  
The Amazon Kinesis Client might see significantly increased latency, unless you configure `KinesisAsyncClient` to have a `maxConcurrency` high enough to allow all leases plus additional usages of `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.", t);
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end, checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Migrate consumers from KCL 1.x to KCL 2.x
<a name="kcl-migration"></a>

This topic explains the differences between versions 1.x and 2.x of the Kinesis Client Library (KCL). It also shows you how to migrate your consumer from version 1.x to version 2.x of the KCL. After you migrate your client, it will start processing records from the last checkpointed location.

Version 2.0 of the KCL introduces the following interface changes:


**KCL Interface Changes**  

| KCL 1.x Interface | KCL 2.0 Interface | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Folded into software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrate the record processor](#recrod-processor-migration)
+ [Migrate the record processor factory](#recrod-processor-factory-migration)
+ [Migrate the worker](#worker-migration)
+ [Configure the Amazon Kinesis client](#client-configuration)
+ [Idle time removal](#idle-time-removal)
+ [Client configuration removals](#client-configuration-removals)

## Migrate the record processor
<a name="recrod-processor-migration"></a>

The following example shows a record processor implemented for KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**To migrate the record processor class**

1. Change the interfaces from `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` and `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` to `software.amazon.kinesis.processor.ShardRecordProcessor`, as follows:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Update the `import` statements for the `initialize` and `processRecords` methods.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

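1. Replace the `shutdown` method with the new `leaseLost` and `shardEnded` methods, and replace `shutdownRequested(IRecordProcessorCheckpointer)` with `shutdownRequested(ShutdownRequestedInput)`. The new input classes are imported from `software.amazon.kinesis.lifecycle.events`, and the exception classes from `software.amazon.kinesis.exceptions`.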

   ```
   //    @Override
   //    public void shutdown(ShutdownInput shutdownInput) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
   //            try {
   //                shutdownInput.getCheckpointer().checkpoint();
   //            } catch (ShutdownException | InvalidStateException e) {
   //                throw new RuntimeException(e);
   //            }
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownRequestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```
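
The step above splits one KCL 1.x callback into several KCL 2.x callbacks. The following sketch summarizes that mapping as data. The `TERMINATE` entry comes from the example above. The `ZOMBIE` entry reflects the KCL 1.x `ShutdownReason` value for a lost lease, which the example does not show, so treat it as an assumption to verify against your own code. The `kcl2_callback` function is a hypothetical helper for illustration, not a KCL API.

```python
# (KCL 1.x callback, shutdown reason or None) -> (KCL 2.x callback, may checkpoint?)
CALLBACK_MIGRATION = {
    ("shutdown", "TERMINATE"): ("shardEnded", True),
    ("shutdown", "ZOMBIE"): ("leaseLost", False),
    ("shutdownRequested", None): ("shutdownRequested", True),
}


def kcl2_callback(kcl1_callback, reason=None):
    """Look up where the logic of a KCL 1.x callback belongs in KCL 2.x."""
    return CALLBACK_MIGRATION[(kcl1_callback, reason)]


for (old, reason), (new, can_checkpoint) in CALLBACK_MIGRATION.items():
    note = "(checkpoint allowed)" if can_checkpoint else "(no checkpoint)"
    print(old, reason, "->", new, note)
```

The `leaseLost` entry is marked as not checkpointing because another worker owns the shard by the time that callback runs, which is consistent with the empty `leaseLost` method in the migrated class.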

The following is the updated version of the record processor class.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```
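
The import changes in this procedure are mechanical, so they can be scripted for a larger code base. The following sketch rewrites only `import` lines, using the class mappings that appear in the examples above. The `rewrite_imports` function is a hypothetical helper. It does not rename types in class bodies or change method signatures, and it leaves KCL 1.x imports that have no mapping here (such as `ShutdownInput`) untouched, so those still need manual edits.

```python
OLD = "com.amazonaws.services.kinesis.clientlibrary."
NEW = "software.amazon.kinesis."

# Mappings taken from the KCL 1.x and KCL 2.x record processor examples above.
IMPORT_MIGRATION = {
    OLD + "exceptions.InvalidStateException": NEW + "exceptions.InvalidStateException",
    OLD + "exceptions.ShutdownException": NEW + "exceptions.ShutdownException",
    OLD + "interfaces.v2.IRecordProcessor": NEW + "processor.ShardRecordProcessor",
    OLD + "interfaces.v2.IShutdownNotificationAware": NEW + "processor.ShardRecordProcessor",
    OLD + "types.InitializationInput": NEW + "lifecycle.events.InitializationInput",
    OLD + "types.ProcessRecordsInput": NEW + "lifecycle.events.ProcessRecordsInput",
}


def rewrite_imports(java_source):
    """Rewrite known KCL 1.x imports to KCL 2.x, dropping duplicates that result."""
    result = []
    emitted = set()
    for line in java_source.splitlines():
        stripped = line.strip()
        if stripped.startswith("import ") and stripped.endswith(";"):
            class_name = stripped[len("import "):-1].strip()
            class_name = IMPORT_MIGRATION.get(class_name, class_name)
            if class_name in emitted:
                continue  # two old interfaces can map to the same new one
            emitted.add(class_name)
            line = "import " + class_name + ";"
        result.append(line)
    return "\n".join(result)
```

For example, passing a source file that imports both `IRecordProcessor` and `IShutdownNotificationAware` yields a single `ShardRecordProcessor` import, because both old interfaces map to the same new one.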

## Migrate the record processor factory
<a name="recrod-processor-factory-migration"></a>

The record processor factory is responsible for creating record processors when a lease is acquired. The following is an example of a KCL 1.x factory.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**To migrate the record processor factory**

1. Change the implemented interface from `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` to `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, as follows.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

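1. Change the method signature: rename `createProcessor` to `shardRecordProcessor`, and change the return type from `IRecordProcessor` to `ShardRecordProcessor`.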

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

The following is an example of the record processor factory in 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrate the worker
<a name="worker-migration"></a>

In version 2.0 of the KCL, a new class, called `Scheduler`, replaces the `Worker` class. The following is an example of a KCL 1.x worker.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new TestRecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**To migrate the worker**

1. Change the `import` statement for the `Worker` class to the import statements for the `Scheduler` and `ConfigsBuilder` classes.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Create the `ConfigsBuilder` and a `Scheduler` as shown in the following example.

   It is recommended that you use `KinesisClientUtil` to create `KinesisAsyncClient` and to configure `maxConcurrency` in `KinesisAsyncClient`.

   **Important**  
   The Amazon Kinesis Client might see significantly increased latency, unless you configure `KinesisAsyncClient` to have a `maxConcurrency` high enough to allow all leases plus additional usages of `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configure the Amazon Kinesis client
<a name="client-configuration"></a>

With the 2.0 release of the Kinesis Client Library, the configuration of the client moved from a single configuration class (`KinesisClientLibConfiguration`) to six configuration classes. The following table describes the migration.


**Configuration Fields and Their New Classes**  

| Original Field | New Configuration Class | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | The name for this KCL application. Used as the default for the tableName and consumerName. | 
| tableName | ConfigsBuilder | Allows overriding the table name used for the Amazon DynamoDB lease table. | 
| streamName | ConfigsBuilder | The name of the stream that this application processes records from. | 
| kinesisEndpoint | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| dynamoDBEndpoint | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| initialPositionInStreamExtended | RetrievalConfig | The location in the shard from which the KCL begins fetching records, starting with the application's initial run. | 
| kinesisCredentialsProvider | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| failoverTimeMillis | LeaseManagementConfig | The number of milliseconds that must pass before you can consider a lease owner to have failed. | 
| workerIdentifier | ConfigsBuilder | A unique identifier that represents this instantiation of the application processor. This must be unique. | 
| shardSyncIntervalMillis | LeaseManagementConfig | The time between shard sync calls. | 
| maxRecords | PollingConfig | Allows setting the maximum number of records that Kinesis returns. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | This option has been removed. See Idle Time Removal. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | When set, the record processor is called even when no records were provided from Kinesis. | 
| parentShardPollIntervalMillis | CoordinatorConfig | How often a record processor should poll to see if the parent shard has been completed. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | When set, leases are removed as soon as the child leases have started processing. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | When set, child shards that have an open shard are ignored. This is primarily for DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| dynamoDBClientConfig | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| cloudWatchClientConfig | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| taskBackoffTimeMillis | LifecycleConfig | The time to wait to retry failed tasks. | 
| metricsBufferTimeMillis | MetricsConfig | Controls CloudWatch metric publishing. | 
| metricsMaxQueueSize | MetricsConfig | Controls CloudWatch metric publishing. | 
| metricsLevel | MetricsConfig | Controls CloudWatch metric publishing. | 
| metricsEnabledDimensions | MetricsConfig | Controls CloudWatch metric publishing. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | This option has been removed. See Checkpoint Sequence Number Validation. | 
| regionName | ConfigsBuilder | This option has been removed. See Client Configuration Removals. | 
| maxLeasesForWorker | LeaseManagementConfig | The maximum number of leases a single instance of the application should accept. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | The maximum number of leases an application should attempt to steal at one time. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | The DynamoDB read IOPs that is used if the Kinesis Client Library needs to create a new DynamoDB lease table. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | The DynamoDB write IOPs that is used if the Kinesis Client Library needs to create a new DynamoDB lease table. | 
| initialPositionInStreamExtended | LeaseManagementConfig | The initial position in the stream that the application should start at. This is only used during initial lease creation. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Disable synchronizing shard data if the lease table contains existing leases. | 
| shardPrioritization | CoordinatorConfig | Which shard prioritization to use. | 
| shutdownGraceMillis | N/A | This option has been removed. See MultiLang Removals. | 
| timeoutInSeconds | N/A | This option has been removed. See MultiLang Removals. | 
| retryGetRecordsInSeconds | PollingConfig | Configures the delay between GetRecords attempts for failures. | 
| maxGetRecordsThreadPool | PollingConfig | The thread pool size used for GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Controls the size of the lease renewer thread pool. The more leases that your application could take, the larger this pool should be. | 
| recordsFetcherFactory | PollingConfig | Allows replacing the factory used to create fetchers that retrieve from streams. | 
| logWarningForTaskAfterMillis | LifecycleConfig | How long to wait before a warning is logged if a task hasn't completed. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | The number of milliseconds to wait between calls to ListShards when failures occur. | 
| maxListShardsRetryAttempts | RetrievalConfig | The maximum number of times that ListShards retries before giving up. | 

## Idle time removal
<a name="idle-time-removal"></a>

In the 1.x version of the KCL, the `idleTimeBetweenReadsInMillis` corresponded to two quantities: 
+ The amount of time between task dispatching checks. You can now configure this time between tasks by setting `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ The amount of time to sleep when no records were returned from Kinesis Data Streams. In version 2.0, with enhanced fan-out, records are pushed from their respective retriever. Activity on the shard consumer only occurs when a pushed request arrives. 

## Client configuration removals
<a name="client-configuration-removals"></a>

In version 2.0, the KCL no longer creates clients. It depends on the user to supply a valid client. With this change, all configuration parameters that controlled client creation have been removed. If you need these parameters, you can set them on the clients before providing the clients to `ConfigsBuilder`.



| Removed Field | Equivalent Configuration | 
| --- | --- | 
| kinesisEndpoint | Configure the SDK KinesisAsyncClient with preferred endpoint: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build(). | 
| dynamoDBEndpoint | Configure the SDK DynamoDbAsyncClient with preferred endpoint: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build(). | 
| kinesisClientConfig | Configure the SDK KinesisAsyncClient with the needed configuration: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Configure the SDK DynamoDbAsyncClient with the needed configuration: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Configure the SDK CloudWatchAsyncClient with the needed configuration: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Configure the SDK with the preferred Region. This is the same for all SDK clients. For example, KinesisAsyncClient.builder().region(Region.US_WEST_2).build(). | 

# Develop consumers with the AWS SDK for Java
<a name="develop-consumers-sdk"></a>

 You can develop custom consumers using the Amazon Kinesis Data Streams APIs. This section describes using the Kinesis Data Streams APIs with the AWS SDK for Java.

**Important**  
The recommended method for developing custom Kinesis Data Streams consumers with shared throughput is to use the Kinesis Client Library (KCL). KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing. For more information, see [Develop consumers with KCL in Java](develop-kcl-consumers-java.md).

**Topics**
+ [

# Develop shared-throughput consumers with the AWS SDK for Java
](developing-consumers-with-sdk.md)
+ [

# Develop enhanced fan-out consumers with the AWS SDK for Java
](building-enhanced-consumers-api.md)
+ [

# Interact with data using the AWS Glue Schema Registry
](building-enhanced-consumers-glue-schema-registry.md)

# Develop shared-throughput consumers with the AWS SDK for Java
<a name="developing-consumers-with-sdk"></a>

One of the methods for developing custom Kinesis Data Streams consumers with shared throughput is to use the Amazon Kinesis Data Streams APIs with the AWS SDK for Java. This section describes using the Kinesis Data Streams APIs with the AWS SDK for Java. You can also call the Kinesis Data Streams APIs using other programming languages. For more information about all available AWS SDKs, see [Start Developing with Amazon Web Services](https://aws.amazon.com/developers/getting-started/). 

The Java sample code in this section demonstrates how to perform basic Kinesis Data Streams API operations, and is divided up logically by operation type. These examples don't represent production-ready code. They don't check for all possible exceptions or account for all possible security or performance considerations. 

**Topics**
+ [

## Get data from a stream
](#kinesis-using-sdk-java-get-data)
+ [

## Use shard iterators
](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [

## Use GetRecords
](#kinesis-using-sdk-java-get-data-getrecords)
+ [

## Adapt to a reshard
](#kinesis-using-sdk-java-get-data-reshard)

## Get data from a stream
<a name="kinesis-using-sdk-java-get-data"></a>

The Kinesis Data Streams APIs include the `getShardIterator` and `getRecords` methods that you can invoke to retrieve records from a data stream. This is the pull model, where your code draws data records directly from the shards of the data stream.

**Important**  
We recommend that you use the record processor support provided by KCL to retrieve records from your data streams. This is the push model, where you implement the code that processes the data. The KCL retrieves data records from the data stream and delivers them to your application code. In addition, the KCL provides failover, recovery, and load balancing functionality. For more information, see [Developing Custom Consumers with Shared Throughput Using KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).

However, in some cases you might prefer to use the Kinesis Data Streams APIs, for example, to implement custom tools for monitoring or debugging your data streams.

**Important**  
Kinesis Data Streams supports changes to the data record retention period of your data stream. For more information, see [Change the data retention period](kinesis-extended-retention.md).

## Use shard iterators
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

You retrieve records from the stream on a per-shard basis. For each shard, and for each batch of records that you retrieve from that shard, you must obtain a *shard iterator*. The shard iterator is used in the `getRecordsRequest` object to specify the shard from which records are to be retrieved. The type associated with the shard iterator determines the point in the shard from which the records should be retrieved (see later in this section for more details). Before you can work with the shard iterator, you must retrieve the shard. For more information, see [List shards](kinesis-using-sdk-java-list-shards.md).

Obtain the initial shard iterator using the `getShardIterator` method. Obtain shard iterators for additional batches of records using the `getNextShardIterator` method of the `getRecordsResult` object returned by the `getRecords` method. A shard iterator is valid for 5 minutes. If you use a shard iterator while it is valid, you get a new one. Each shard iterator remains valid for 5 minutes, even after it is used.
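Because an iterator stops being valid 5 minutes after it is issued, a consumer that can stall between calls may want to record when it received its current iterator. The following is a minimal sketch of that bookkeeping using only the Java standard library. The `IteratorLease` class and its method names are hypothetical and are not part of the AWS SDK for Java.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not an AWS SDK type): remembers when a shard iterator
// was issued so that the caller can tell whether it is still usable.
final class IteratorLease {
    // Validity window described in this section: 5 minutes from issuance.
    static final Duration VALIDITY = Duration.ofMinutes(5);

    private final String shardIterator;
    private final Instant issuedAt;

    IteratorLease(String shardIterator, Instant issuedAt) {
        this.shardIterator = shardIterator;
        this.issuedAt = issuedAt;
    }

    String shardIterator() {
        return shardIterator;
    }

    // Returns true when the iterator has reached the end of its validity window.
    boolean isExpired(Instant now) {
        return !now.isBefore(issuedAt.plus(VALIDITY));
    }
}
```

If `isExpired` returns `true`, one option is to request a new iterator with `getShardIterator`, using a sequence-number-based iterator type and the sequence number of the last record that you processed.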

To obtain the initial shard iterator, instantiate `GetShardIteratorRequest` and pass it to the `getShardIterator` method. To configure the request, specify the stream and the shard ID. For information about how to obtain the streams in your AWS account, see [List streams](kinesis-using-sdk-java-list-streams.md). For information about how to obtain the shards in a stream, see [List shards](kinesis-using-sdk-java-list-shards.md).

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

This sample code specifies `TRIM_HORIZON` as the iterator type when obtaining the initial shard iterator. This iterator type means that records should be returned beginning with the first record added to the shard—rather than beginning with the most recently added record, also known as the *tip*. The following are possible iterator types:
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

For more information, see [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).

Some iterator types require that you specify a sequence number in addition to the type; for example:

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

After you obtain a record using `getRecords`, you can get the sequence number for the record by calling the record's `getSequenceNumber` method. 

```
record.getSequenceNumber()
```

In addition, the code that adds records to the data stream can get the sequence number for an added record by calling `getSequenceNumber` on the result of `putRecord`. 

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

You can use sequence numbers to guarantee strictly increasing ordering of records. For more information, see the code example in [PutRecord example](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example).

## Use GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

After you obtain the shard iterator, instantiate a `GetRecordsRequest` object. Specify the iterator for the request using the `setShardIterator` method. 

Optionally, you can also set the number of records to retrieve using the `setLimit` method. The number of records returned by `getRecords` is always equal to or less than this limit. If you do not specify this limit, `getRecords` returns up to 10 MB of retrieved records. The sample code below sets this limit to 25 records.

If no records are returned, that means no data records are currently available from this shard at the sequence number referenced by the shard iterator. In this situation, your application should wait for an amount of time that's appropriate for the data sources for the stream. Then try to get data from the shard again using the shard iterator returned by the preceding call to `getRecords`. 

Pass the `getRecordsRequest` to the `getRecords` method, and capture the returned value as a `getRecordsResult` object. To get the data records, call the `getRecords` method on the `getRecordsResult` object. 

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

To prepare for another call to `getRecords`, obtain the next shard iterator from `getRecordsResult`. 

```
shardIterator = getRecordsResult.getNextShardIterator();
```

For best results, sleep for at least 1 second (1,000 milliseconds) between calls to `getRecords` to avoid exceeding the limit on `getRecords` frequency. 

```
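try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {
  // Restore the interrupt flag so that callers can observe the interruption.
  Thread.currentThread().interrupt();
}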
```

Typically, you should call `getRecords` in a loop, even when you're retrieving a single record in a test scenario. A single call to `getRecords` might return an empty record list, even when the shard contains more records at later sequence numbers. When this occurs, the `NextShardIterator` returned along with the empty record list references a later sequence number in the shard, and successive `getRecords` calls eventually return the records. The following sample demonstrates the use of a loop.

**Example: getRecords**  
The following code example reflects the `getRecords` tips in this section, including making calls in a loop.

```
// Continuously read data records from a shard
List<Record> records;
    
// Stop when the next shard iterator is null (the shard is closed and fully read)
while (shardIterator != null) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

If you are using the Kinesis Client Library, it might make multiple calls before returning data. This behavior is by design and does not indicate a problem with the KCL or your data.

## Adapt to a reshard
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 If `getRecordsResult.getNextShardIterator` returns `null`, it indicates that a shard split or merge has occurred that involved this shard. This shard is now in a `CLOSED` state and you have read all available data records from this shard. 

 In this scenario, you can use `getRecordsResult.childShards` to learn about the new child shards of the shard that is being processed that were created by the split or merge. For more information, see [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
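One way to act on a `null` next shard iterator is to make it the exit condition of the read loop. The following sketch shows only that control flow, using stand-in types so that it runs without the SDK. `Batch`, `ShardReader`, and `ShardDrainer` are hypothetical names, and a real consumer would call `getRecords` where this sketch calls `read`.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a getRecords result; not an AWS SDK class.
final class Batch {
    final List<String> records;
    final String nextIterator; // null once the closed shard is fully read

    Batch(List<String> records, String nextIterator) {
        this.records = records;
        this.nextIterator = nextIterator;
    }
}

// Stand-in for the call that fetches one batch for a given iterator.
interface ShardReader {
    Batch read(String iterator);
}

final class ShardDrainer {
    // Reads batches until the reader reports a null next iterator,
    // pausing between calls. Empty batches do not end the loop.
    static List<String> drain(ShardReader reader, String firstIterator, long pauseMillis) {
        List<String> all = new ArrayList<String>();
        String iterator = firstIterator;
        while (iterator != null) {
            Batch batch = reader.read(iterator);
            all.addAll(batch.records);
            iterator = batch.nextIterator;
            if (iterator != null) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt and stop reading.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return all;
    }
}
```

When `drain` returns normally, the shard has no more data, and the consumer can move on to the child shards.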

 In the case of a split, the two new shards both have `parentShardId` equal to the shard ID of the shard that you were processing previously. The value of `adjacentParentShardId` for both of these shards is `null`. 

 In the case of a merge, the single new shard created by the merge has `parentShardId` equal to the shard ID of one of the parent shards and `adjacentParentShardId` equal to the shard ID of the other parent shard. Your application has already read all the data from one of these shards. This is the shard for which `getRecordsResult.getNextShardIterator` returned `null`. If the order of the data is important to your application, ensure that it also reads all the data from the other parent shard before reading any new data from the child shard created by the merge. 
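The ordering rule for splits and merges can be expressed as a small check: a child shard is ready to read once every parent that it names has been read to the end. The following sketch uses a stand-in class rather than the SDK's child shard type. `ChildShardInfo` and `ReshardOrdering` are hypothetical names, and the set of finished shard IDs is assumed to be maintained by your application.

```java
import java.util.Set;

// Stand-in for the child shard information described above; not an SDK class.
final class ChildShardInfo {
    final String shardId;
    final String parentShardId;
    final String adjacentParentShardId; // null when the child came from a split

    ChildShardInfo(String shardId, String parentShardId, String adjacentParentShardId) {
        this.shardId = shardId;
        this.parentShardId = parentShardId;
        this.adjacentParentShardId = adjacentParentShardId;
    }
}

final class ReshardOrdering {
    // Returns true when all parents of the child shard have been fully read.
    static boolean readyToRead(ChildShardInfo child, Set<String> finishedShardIds) {
        boolean parentDone = finishedShardIds.contains(child.parentShardId);
        boolean adjacentDone = child.adjacentParentShardId == null
                || finishedShardIds.contains(child.adjacentParentShardId);
        return parentDone && adjacentDone;
    }
}
```

For a split, the check passes as soon as the single parent is finished. For a merge, it passes only after both parents are finished, which preserves record order across the reshard.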

 If you are using multiple processors to retrieve data from the stream (say, one processor per shard), and a shard split or merge occurs, adjust the number of processors up or down to adapt to the change in the number of shards. 

 For more information about resharding, including a discussion of shard states such as `CLOSED`, see [Reshard a stream](kinesis-using-sdk-java-resharding.md). 

# Develop enhanced fan-out consumers with the AWS SDK for Java
<a name="building-enhanced-consumers-api"></a>

*Enhanced fan-out* is an Amazon Kinesis Data Streams feature that enables consumers to receive records from a data stream with dedicated throughput of up to 2 MB of data per second per shard. A consumer that uses enhanced fan-out doesn't have to contend with other consumers that are receiving data from the stream. For more information, see [Develop enhanced fan-out consumers with dedicated throughput](enhanced-consumers.md).

You can use API operations to build a consumer that uses enhanced fan-out in Kinesis Data Streams.

**To register a consumer with enhanced fan-out using the Kinesis Data Streams API**

1. Call [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) to register your application as a consumer that uses enhanced fan-out. Kinesis Data Streams generates an Amazon Resource Name (ARN) for the consumer and returns it in the response.

1. To start listening to a specific shard, pass the consumer ARN in a call to [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html). Kinesis Data Streams then starts pushing the records from that shard to you, in the form of events of type [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html) over an HTTP/2 connection. The connection remains open for up to 5 minutes. Call [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) again if you want to continue receiving records from the shard after the `future` that is returned by the call to [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) completes normally or exceptionally.
**Note**  
The `SubscribeToShard` API also returns the list of the child shards of the current shard when the end of the current shard is reached. 

1. To deregister a consumer that is using enhanced fan-out, call [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html).

The following code is an example of how you can subscribe your consumer to a shard, renew the subscription periodically, and handle the events.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 If `event.ContinuationSequenceNumber` returns `null`, it indicates that a shard split or merge has occurred that involved this shard. This shard is now in a `CLOSED` state, and you have read all available data records from this shard. In this scenario, building on the preceding example, you can use `event.childShards` to learn about the new child shards of the shard that is being processed that were created by the split or merge. For more information, see [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
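The preceding example reuses the same request, which starts at `LATEST`, each time it renews the subscription. One possible refinement is to remember the most recent continuation sequence number and resume from it on the next call. The following sketch models only that decision, using stand-in types that run without the SDK. `StartingPoint` and `SubscriptionTracker` are hypothetical names, and the choice of `AFTER_SEQUENCE_NUMBER` for renewals is an assumption of this sketch.

```java
// Stand-in for the starting position of a subscription; not an SDK class.
final class StartingPoint {
    final String type;           // for example "LATEST" or "AFTER_SEQUENCE_NUMBER"
    final String sequenceNumber; // null when the type does not need one

    StartingPoint(String type, String sequenceNumber) {
        this.type = type;
        this.sequenceNumber = sequenceNumber;
    }
}

// Hypothetical helper that tracks progress across subscription renewals.
final class SubscriptionTracker {
    private String lastContinuation; // null until the first event arrives
    private boolean shardEnded;

    // Call once per received event with its continuation sequence number.
    void onEvent(String continuationSequenceNumber) {
        if (continuationSequenceNumber == null) {
            shardEnded = true; // the shard is closed and fully read
        } else {
            lastContinuation = continuationSequenceNumber;
        }
    }

    boolean shardEnded() {
        return shardEnded;
    }

    // Chooses where the next subscription should start.
    StartingPoint nextStartingPoint() {
        if (lastContinuation == null) {
            return new StartingPoint("LATEST", null);
        }
        return new StartingPoint("AFTER_SEQUENCE_NUMBER", lastContinuation);
    }
}
```

In a real consumer, `onEvent` would be called from the `visit` method, and the renewal loop would stop subscribing to this shard after `shardEnded` returns `true`.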

# Interact with data using the AWS Glue Schema Registry
<a name="building-enhanced-consumers-glue-schema-registry"></a>

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry enables you to improve end-to-end data quality and data governance within your streaming applications. For more information, see [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). One of the ways to set up this integration is through the `GetRecords` Kinesis Data Streams API available in the AWS SDK for Java. 

For detailed instructions on how to set up integration of Kinesis Data Streams with Schema Registry using the `GetRecords` Kinesis Data Streams APIs, see the "Interacting with Data Using the Kinesis Data Streams APIs" section in [Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds).

# Develop consumers using AWS Lambda
<a name="lambda-consumer"></a>

You can use an AWS Lambda function to process records in a data stream. AWS Lambda is a compute service that lets you run code without provisioning or managing servers. It executes your code only when needed and scales automatically, from a few requests per day to thousands per second. You pay only for the compute time you consume. There is no charge when your code is not running. With AWS Lambda, you can run code for virtually any type of application or backend service, all with zero administration. It runs your code on a high-availability compute infrastructure and performs all of the administration of the compute resources, including server and operating system maintenance, capacity provisioning and automatic scaling, code monitoring and logging. For more information, see [Using AWS Lambda with Amazon Kinesis](/lambda/latest/dg/with-kinesis.html).

For troubleshooting information, see [Why is Kinesis Data Streams trigger unable to invoke my Lambda function?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-lambda-invocation/)

# Develop consumers using Amazon Managed Service for Apache Flink
<a name="kda-consumer"></a>

You can use an Amazon Managed Service for Apache Flink application to process and analyze data in a Kinesis stream using SQL, Java, or Scala. Managed Service for Apache Flink applications can enrich data using reference sources, aggregate data over time, or use machine learning to find data anomalies. Then you can write the analysis results to another Kinesis stream, a Firehose delivery stream, or a Lambda function. For more information, see the [Managed Service for Apache Flink Developer Guide for SQL Applications](/kinesisanalytics/latest/dev/what-is.html) or the [Managed Service for Apache Flink Developer Guide for Flink Applications](/kinesisanalytics/latest/java/what-is.html).

# Develop consumers using Amazon Data Firehose
<a name="kdf-consumer"></a>

You can use a Firehose to read and process records from a Kinesis stream. Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk. Firehose also supports any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers, including Datadog, MongoDB, and New Relic. You can also configure Firehose to transform your data records and to convert the record format before delivering your data to its destination. For more information, see [Writing to Firehose Using Kinesis Data Streams](/firehose/latest/dev/writing-with-kinesis-streams.html).

# Read data from Kinesis Data Streams using other AWS services
<a name="using-other-services-read"></a>

The following AWS services can directly integrate with Amazon Kinesis Data Streams to read data from Kinesis data streams. Review the information for each service that you are interested in and refer to the provided references. 

**Topics**
+ [

# Read data from Kinesis Data Streams using Amazon EMR
](using-other-services-emr.md)
+ [

# Read data from Kinesis Data Streams using Amazon EventBridge Pipes
](using-other-services-ev-pipes.md)
+ [

# Read data from Kinesis Data Streams using AWS Glue
](using-other-services-glue.md)
+ [

# Read data from Kinesis Data Streams using Amazon Redshift
](using-other-services-redshift.md)

# Read data from Kinesis Data Streams using Amazon EMR
<a name="using-other-services-emr"></a>

Amazon EMR clusters can read and process Kinesis streams directly, using familiar tools in the Hadoop ecosystem such as Hive, Pig, MapReduce, the Hadoop Streaming API, and Cascading. You can also join real-time data from Kinesis Data Streams with existing data on Amazon S3, Amazon DynamoDB, and HDFS in a running cluster. You can directly load the data from Amazon EMR to Amazon S3 or DynamoDB for post-processing activities.

For more information, see [Amazon Kinesis](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-kinesis.html) in the *Amazon EMR Release Guide*. 

# Read data from Kinesis Data Streams using Amazon EventBridge Pipes
<a name="using-other-services-ev-pipes"></a>

Amazon EventBridge Pipes supports Kinesis data streams as a source. Amazon EventBridge Pipes helps you create point-to-point integrations between event producers and consumers with optional transform, filter, and enrich steps. You can use EventBridge Pipes to receive records in a Kinesis data stream and optionally filter or enrich these records before sending them to one of the available destinations for processing, including Kinesis Data Streams.

For more information, see [Amazon Kinesis stream as a source](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html) in the *Amazon EventBridge User Guide*.

# Read data from Kinesis Data Streams using AWS Glue
<a name="using-other-services-glue"></a>

Using AWS Glue streaming ETL, you can create streaming extract, transform, and load (ETL) jobs that run continuously and consume data from Amazon Kinesis Data Streams. The jobs cleanse and transform the data, and then load the results into Amazon S3 data lakes or JDBC data stores.

For more information, see [Streaming ETL jobs in AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html) in the *AWS Glue Developer Guide*.

# Read data from Kinesis Data Streams using Amazon Redshift
<a name="using-other-services-redshift"></a>

Amazon Redshift supports streaming ingestion from Amazon Kinesis Data Streams. The Amazon Redshift streaming ingestion feature provides low-latency, high-speed ingestion of streaming data from Amazon Kinesis Data Streams into an Amazon Redshift materialized view. Amazon Redshift streaming ingestion removes the need to stage data in Amazon S3 before ingesting it into Amazon Redshift.

For more information, see [Streaming ingestion](https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion.html) in the *Amazon Redshift Database Developer Guide*.

 

# Read from Kinesis Data Streams using third-party integrations
<a name="using-services-third-party-read"></a>

You can read data from Kinesis data streams using one of the following third-party options that integrate with Kinesis Data Streams. Select the option you want to learn more about to find resources and links to relevant documentation.

**Topics**
+ [

# Apache Flink
](using-other-services-read-flink.md)
+ [

# Adobe Experience Platform
](using-other-services-read-adobe.md)
+ [

# Apache Druid
](using-other-services-read-druid.md)
+ [

# Apache Spark
](using-other-services-read-spark.md)
+ [

# Databricks
](using-other-services-read-databricks.md)
+ [

# Kafka Confluent Platform
](using-other-services-read-kafka.md)
+ [

# Kinesumer
](using-other-services-read-kinesumer.md)
+ [

# Talend
](using-other-services-read-talend.md)

# Apache Flink
<a name="using-other-services-read-flink"></a>

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. For more information on consuming Kinesis Data Streams using Apache Flink, see [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kinesis/). 

# Adobe Experience Platform
<a name="using-other-services-read-adobe"></a>

Adobe Experience Platform enables organizations to centralize and standardize customer data from any system. It then applies data science and machine learning to dramatically improve the design and delivery of rich, personalized experiences. For more information on consuming Kinesis data streams using the Adobe Experience Platform, see [Amazon Kinesis connector](https://experienceleague.adobe.com/docs/experience-platform/sources/connectors/cloud-storage/kinesis.html). 

# Apache Druid
<a name="using-other-services-read-druid"></a>

Druid is a high performance, real-time analytics database that delivers sub-second queries on streaming and batch data at scale and under load. For more information on ingesting Kinesis data streams using Apache Druid, see [Amazon Kinesis ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kinesis-ingestion.html). 

# Apache Spark
<a name="using-other-services-read-spark"></a>

Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. You can use Apache Spark to build stream processing applications that consume the data in your Kinesis data streams. 

To consume Kinesis data streams using Apache Spark Structured Streaming, use the Amazon Kinesis Data Streams [connector](https://github.com/awslabs/spark-sql-kinesis-connector). This connector supports consumption with Enhanced Fan-Out, which provides your application with dedicated read throughput of up to 2 MB of data per second per shard. For more information, see [Developing Custom Consumers with Dedicated Throughput (Enhanced Fan-Out)](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html). 

To consume Kinesis data streams using Spark Streaming, see [Spark Streaming + Kinesis Integration](https://spark.apache.org/docs/latest/streaming-kinesis-integration.html).

# Databricks
<a name="using-other-services-read-databricks"></a>

Databricks is a cloud-based platform that provides a collaborative environment for data engineering, data science, and machine learning. For more information on consuming Kinesis data streams using Databricks, see [Connect to Amazon Kinesis](https://docs.databricks.com/structured-streaming/kinesis.html). 

# Kafka Confluent Platform
<a name="using-other-services-read-kafka"></a>

Confluent Platform is built on top of Kafka and provides additional features and functionality that help enterprises build and manage real-time data pipelines and streaming applications. For more information on consuming Kinesis data streams using the Confluent Platform, see [Amazon Kinesis Source Connector for Confluent Platform](https://docs.confluent.io/kafka-connectors/kinesis/current/overview.html#features). 

# Kinesumer
<a name="using-other-services-read-kinesumer"></a>

Kinesumer is a Go client that implements a client-side distributed consumer group for Kinesis data streams. For more information, see the [Kinesumer GitHub repository](https://github.com/daangn/kinesumer).

# Talend
<a name="using-other-services-read-talend"></a>

Talend is data integration and management software that allows users to collect, transform, and connect data from various sources in a scalable and efficient manner. For more information on consuming Kinesis data streams using Talend, see [Connect talend to an Amazon Kinesis stream](https://help.talend.com/r/en-US/Cloud/connectors-guide/connector-kinesis).

# Troubleshoot Kinesis Data Streams consumers
<a name="troubleshooting-consumers"></a>

**Topics**
+ [

## Compilation error with the LeaseManagementConfig constructor
](#compilation-error-leasemanagementconfig)
+ [

## Some Kinesis Data Streams records are skipped when using the Kinesis Client Library
](#records-skipped)
+ [

## Records belonging to the same shard are processed by different record processors at the same time
](#records-belonging-to-the-same-shard)
+ [

## The consumer application is reading at a slower rate than expected
](#consumer-app-reading-slower)
+ [

## GetRecords returns an empty records array even when there is data in the stream
](#getrecords-returns-empty)
+ [

## The shard iterator expires unexpectedly
](#shard-iterator-expires-unexpectedly)
+ [

## Consumer record processing is falling behind
](#record-processing-falls-behind)
+ [

## Unauthorized KMS key permission error
](#unauthorized-kms-consumer)
+ [

## DynamoDbException: The document path provided in the update expression is invalid for update
](#dynamo-db-exception)
+ [

## Troubleshoot other common issues for consumers
](#misc-troubleshooting-consumer)

## Compilation error with the LeaseManagementConfig constructor
<a name="compilation-error-leasemanagementconfig"></a>

When upgrading to Kinesis Client Library (KCL) version 3.x or later, you may encounter a compilation error related to the `LeaseManagementConfig` constructor. If you are directly creating a `LeaseManagementConfig` object to set configurations instead of using `ConfigsBuilder` in KCL versions 3.x or later, you might see the following error message while compiling your KCL application code.

```
Cannot resolve constructor 'LeaseManagementConfig(String, DynamoDbAsyncClient, KinesisAsyncClient, String)'
```

KCL versions 3.x and later require you to add one more parameter, `applicationName` (type: `String`), after the `tableName` parameter.
+ *Before*: `leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, workerIdentifier)`
+ *After*: `leaseManagementConfig = new LeaseManagementConfig(tableName, applicationName, dynamoDBClient, kinesisClient, streamName, workerIdentifier)`

Instead of directly creating a `LeaseManagementConfig` object, we recommend using `ConfigsBuilder` to set configurations in KCL 3.x and later versions. `ConfigsBuilder` provides a more flexible and maintainable way to configure your KCL application.

The following is an example of using `ConfigsBuilder` to set KCL configurations.

```
ConfigsBuilder configsBuilder = new ConfigsBuilder(
    streamName,
    applicationName,
    kinesisClient,
    dynamoClient,
    cloudWatchClient,
    UUID.randomUUID().toString(),
    new SampleRecordProcessorFactory()
);

Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig(),
    configsBuilder.leaseManagementConfig()
        .failoverTimeMillis(60000), // example override: 60-second failover time
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

## Some Kinesis Data Streams records are skipped when using the Kinesis Client Library
<a name="records-skipped"></a>

The most common cause of skipped records is an unhandled exception thrown from `processRecords`. The Kinesis Client Library (KCL) relies on your `processRecords` code to handle any exceptions that arise from processing the data records. Any exception thrown from `processRecords` is absorbed by the KCL. To avoid infinite retries on a recurring failure, the KCL does not resend the batch of records processed at the time of the exception. The KCL then calls `processRecords` for the next batch of data records without restarting the record processor. This effectively results in consumer applications observing skipped records. To prevent skipped records, handle all exceptions within `processRecords` appropriately.
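One way to follow this advice is to wrap the handling of each record in a try/catch block with a bounded number of retries, and to set aside records that still fail instead of letting the exception escape from `processRecords`. The following is a minimal sketch in plain Java that does not use the KCL. The names `SafeBatchProcessor`, `processBatch`, and `handler` are hypothetical and are used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the KCL: shows the try/catch pattern that
// keeps exceptions from escaping a processRecords-style method.
final class SafeBatchProcessor {
    private SafeBatchProcessor() {}

    /**
     * Applies handler to every record, retrying a failing record up to
     * maxAttempts times. Returns the records that still failed so the caller
     * can log them or store them elsewhere instead of silently losing them.
     */
    static <T> List<T> processBatch(List<T> records, Consumer<T> handler, int maxAttempts) {
        List<T> failed = new ArrayList<>();
        for (T record : records) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    handler.accept(record);
                    done = true;
                } catch (RuntimeException e) {
                    // Caught on purpose: an exception that escapes would cause
                    // the rest of the batch to be skipped.
                    System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
                }
            }
            if (!done) {
                failed.add(record);
            }
        }
        return failed;
    }
}
```

In a real record processor, you might call a helper like this from inside `processRecords` and decide what to do with the returned failures (for example, write them to a separate store) before checkpointing.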

## Records belonging to the same shard are processed by different record processors at the same time
<a name="records-belonging-to-the-same-shard"></a>

For any running Kinesis Client Library (KCL) application, a shard only has one owner. However, multiple record processors may temporarily process the same shard. If a worker instance loses network connectivity, the KCL assumes that the unreachable worker is no longer processing records after the failover time expires, and directs other worker instances to take over. For a brief period, new record processors and record processors from the unreachable worker may process data from the same shard. 

Set a failover time that is appropriate for your application. For low-latency applications, the 10-second default may represent the maximum time you want to wait. However, in cases where you expect connectivity issues such as making calls across geographical areas where connectivity could be lost more frequently, this number may be too low.

Your application should anticipate and handle this scenario, especially because network connectivity is usually restored to the previously unreachable worker. If a record processor has its shards taken by another record processor, it must handle the following two cases to perform graceful shutdown:

1. After the current call to `processRecords` is completed, the KCL invokes the shutdown method on the record processor with the shutdown reason `ZOMBIE`. Your record processors are expected to clean up any resources as appropriate and then exit.

1. When you attempt to checkpoint from a zombie worker, the KCL throws `ShutdownException`. After receiving this exception, your code is expected to exit the current method cleanly.

For more information, see [Handle duplicate records](kinesis-record-processor-duplicates.md).

## The consumer application is reading at a slower rate than expected
<a name="consumer-app-reading-slower"></a>

The most common reasons for read throughput being slower than expected are as follows:

1. Multiple consumer applications have total reads exceeding the per-shard limits. For more information, see [Quotas and limits](service-sizes-and-limits.md). In this case, increase the number of shards in the Kinesis data stream.

1. The [limit](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#API_GetRecords_RequestSyntax) that specifies the maximum number of records returned per **GetRecords** call may have been configured with a low value. If you are using the KCL, you may have configured the worker with a low value for the `maxRecords` property. In general, we recommend using the system defaults for this property.

1. The logic inside your `processRecords` call may be taking longer than expected for a number of possible reasons: the logic may be CPU intensive, I/O blocking, or bottlenecked on synchronization. To check whether this is the cause, run empty record processors and compare the read throughput. For information about how to keep up with the incoming data, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

If you have only one consumer application, it is always possible to read at least two times faster than the put rate. That’s because each shard supports writes of up to 1,000 records per second, up to a maximum total data write rate of 1 MB per second (including partition keys), while each open shard supports up to 5 read transactions per second, up to a maximum total data read rate of 2 MB per second. Note that each read (**GetRecords** call) gets a batch of records. The size of the data returned by **GetRecords** varies depending on the utilization of the shard. The maximum size of data that **GetRecords** can return is 10 MB. If a call returns that limit, subsequent calls made within the next 5 seconds throw a `ProvisionedThroughputExceededException`.

## GetRecords returns an empty records array even when there is data in the stream
<a name="getrecords-returns-empty"></a>

Consuming, or getting, records uses a pull model. Developers are expected to call [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) in a continuous loop with no back-offs. Every call to **GetRecords** also returns a `NextShardIterator` value, which must be used as the `ShardIterator` in the next iteration of the loop.

The **GetRecords** operation does not block. Instead, it returns immediately, with either relevant data records or an empty `Records` element. An empty `Records` element is returned under two conditions:

1. There is no more data currently in the shard. 

1. There is no data near the part of the shard pointed to by the `ShardIterator`.

The latter condition is subtle, but is a necessary design tradeoff to avoid unbounded seek time (latency) when retrieving records. Thus, the stream-consuming application should loop and call **GetRecords**, handling empty records as a matter of course. 

In a production scenario, the only time the continuous loop should be exited is when the `NextShardIterator` value is `NULL`. When `NextShardIterator` is `NULL`, it means that the current shard has been closed and the `ShardIterator` value would otherwise point past the last record. If the consuming application never calls **SplitShard** or **MergeShards**, the shard remains open and the calls to **GetRecords** never return a `NextShardIterator` value that is `NULL`.
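The shape of this loop can be sketched without the AWS SDK. In the following example, `ShardReader`, `Batch`, and `ShardPoller` are hypothetical stand-ins (they are not SDK or KCL types) that model only the two values the loop depends on: the returned records and the next shard iterator. Treat it as an illustration of the control flow under those assumptions, not as a working Kinesis client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a GetRecords call, so the loop can be shown
// without the AWS SDK.
interface ShardReader {
    Batch getRecords(String shardIterator);
}

// One GetRecords-style response: a possibly empty list of records plus the
// next iterator, which is null once the shard is closed and fully read.
final class Batch {
    final List<String> records;
    final String nextShardIterator;

    Batch(List<String> records, String nextShardIterator) {
        this.records = records;
        this.nextShardIterator = nextShardIterator;
    }
}

final class ShardPoller {
    private ShardPoller() {}

    /** Reads a shard until the next iterator is null, tolerating empty batches. */
    static List<String> drain(ShardReader reader, String firstIterator) {
        List<String> collected = new ArrayList<>();
        String iterator = firstIterator;
        while (iterator != null) {
            Batch batch = reader.getRecords(iterator);
            // An empty batch is normal: add nothing and continue with the
            // iterator returned by this call.
            collected.addAll(batch.records);
            iterator = batch.nextShardIterator;
        }
        return collected;
    }
}
```

For an open shard the next iterator is never null, so a loop like this runs until the application stops it. A real consumer would also process each batch as it arrives rather than collecting everything in memory.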

If you use the Kinesis Client Library (KCL), the preceding consumption pattern is abstracted for you. This includes automatic handling of a set of shards that dynamically change. With the KCL, the developer only supplies the logic to process incoming records. This is possible because the library makes continuous calls to **GetRecords** for you. 

## The shard iterator expires unexpectedly
<a name="shard-iterator-expires-unexpectedly"></a>

A new shard iterator is returned by every **GetRecords** request (as `NextShardIterator`), which you then use in the next **GetRecords** request (as `ShardIterator`). Typically, this shard iterator does not expire before you use it. However, you may find that shard iterators expire because you have not called **GetRecords** for more than 5 minutes, or because you've performed a restart of your consumer application.

If the shard iterator expires immediately before you can use it, this might indicate that the DynamoDB table used by the KCL does not have enough capacity to store the lease data. This situation is more likely to happen if you have a large number of shards. To solve this problem, increase the write capacity assigned to the lease table. For more information, see [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

## Consumer record processing is falling behind
<a name="record-processing-falls-behind"></a>

For most use cases, consumer applications are reading the latest data from the stream. In certain circumstances, consumer reads may fall behind, which may not be desired. After you identify how far behind your consumers are reading, look at the most common reasons why consumers fall behind. 

Start with the `GetRecords.IteratorAgeMilliseconds` metric, which tracks the read position across all shards and consumers in the stream. Note that if an iterator's age passes 50% of the retention period (by default, 24 hours, configurable up to 365 days), there is risk for data loss due to record expiration. A quick stopgap solution is to increase the retention period. This stops the loss of important data while you troubleshoot the issue further. For more information, see [Monitor the Amazon Kinesis Data Streams service with Amazon CloudWatch](monitoring-with-cloudwatch.md). Next, identify how far behind your consumer application is reading from each shard using a custom CloudWatch metric emitted by the Kinesis Client Library (KCL), `MillisBehindLatest`. For more information, see [Monitor the Kinesis Client Library with Amazon CloudWatch](monitoring-with-kcl.md).
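As a rough illustration of the 50% guideline, the following sketch compares an iterator age (for example, a value taken from the `GetRecords.IteratorAgeMilliseconds` metric) with half of the stream's retention period. `IteratorAgeCheck` and `atRiskOfDataLoss` are hypothetical names, and retrieving the metric from CloudWatch is intentionally left out.

```java
import java.time.Duration;

// Hypothetical helper: applies the "50% of the retention period" rule of thumb.
final class IteratorAgeCheck {
    private IteratorAgeCheck() {}

    /** Returns true when the iterator age is past half of the retention period. */
    static boolean atRiskOfDataLoss(long iteratorAgeMillis, Duration retention) {
        return iteratorAgeMillis > retention.toMillis() / 2;
    }
}
```

With the default 24-hour retention period, a check like this flags any iterator age above 12 hours.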

Here are the most common reasons consumers can fall behind:
+ Sudden large increases to `GetRecords.IteratorAgeMilliseconds` or `MillisBehindLatest` usually indicate a transient problem, such as API operation failures to a downstream application. Investigate these sudden increases if either of the metrics consistently displays this behavior.
+ A gradual increase to these metrics indicates that a consumer is not keeping up with the stream because it is not processing records fast enough. The most common root causes for this behavior are insufficient physical resources or record processing logic that has not scaled with an increase in stream throughput. You can verify this behavior by looking at the other custom CloudWatch metrics that the KCL emits associated with the `processTask` operation, including `RecordProcessor.processRecords.Time`, `Success`, and `RecordsProcessed`.
  + If you see an increase in the `processRecords.Time` metric that correlates with increased throughput, you should analyze your record processing logic to identify why it is not scaling with the increased throughput.
  + If you see an increase in the `processRecords.Time` values that is not correlated with increased throughput, check to see if you are making any blocking calls in the critical path, which are often the cause of slowdowns in record processing. An alternative approach is to increase your parallelism by increasing the number of shards. Finally, confirm that you have an adequate amount of physical resources (memory, CPU utilization, among others) on the underlying processing nodes during peak demand.

## Unauthorized KMS key permission error
<a name="unauthorized-kms-consumer"></a>

This error occurs when a consumer application reads from an encrypted stream without permissions on the AWS KMS key. To assign permissions to an application to access a KMS key, see [Using Key Policies in AWS KMS](https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html) and [Using IAM Policies with AWS KMS](https://docs.aws.amazon.com/kms/latest/developerguide/iam-policies.html).

## DynamoDbException: The document path provided in the update expression is invalid for update
<a name="dynamo-db-exception"></a>

When using KCL 3.x with AWS SDK for Java versions 2.27.19 through 2.27.23, you may encounter the following DynamoDB exception:

 "software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The document path provided in the update expression is invalid for update (Service: DynamoDb, Status Code: 400, Request ID: xxx)"

This error occurs due to a known issue in the AWS SDK for Java that affects the DynamoDB metadata table managed by KCL 3.x. The issue was introduced in version 2.27.19 and impacts all versions up to 2.27.23. The issue has been resolved in the AWS SDK for Java version 2.27.24. For optimal performance and stability, we recommend upgrading to version 2.28.0 or later.

## Troubleshoot other common issues for consumers
<a name="misc-troubleshooting-consumer"></a>
+ [Why is Kinesis Data Streams trigger unable to invoke my Lambda function?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-lambda-invocation/)
+ [How do I detect and troubleshoot ReadProvisionedThroughputExceeded exceptions in Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-readprovisionedthroughputexceeded/) 
+ [Why am I experiencing high latency issues with Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-latency-issues/)
+ [Why is my Kinesis data stream returning a 500 Internal Server Error?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-500-error/)
+ [How do I troubleshoot a blocked or stuck KCL application for Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kcl-kinesis-data-streams/)
+ [Can I use different Amazon Kinesis Client Library applications with the same Amazon DynamoDB table?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-kcl-apps-dynamodb-table/)

# Optimize Amazon Kinesis Data Streams consumers
<a name="advanced-consumers"></a>

You can further optimize your Amazon Kinesis Data Streams consumer based on specific behavior you see. 

Review the following topics to identify solutions.

**Topics**
+ [

# Improve low-latency processing
](kinesis-low-latency.md)
+ [

# Process serialized data using AWS Lambda with the Amazon Kinesis Producer Library
](kinesis-record-deaggregation.md)
+ [

# Use resharding, scaling, and parallel processing to change the number of shards
](kinesis-record-processor-scaling.md)
+ [

# Handle duplicate records
](kinesis-record-processor-duplicates.md)
+ [

# Handle startup, shutdown, and throttling
](kinesis-record-processor-additional-considerations.md)

# Improve low-latency processing
<a name="kinesis-low-latency"></a>

*Propagation delay* is defined as the end-to-end latency from the moment a record is written to the stream until it is read by a consumer application. This delay varies depending upon a number of factors, but it is primarily affected by the polling interval of consumer applications.

For most applications, we recommend polling each shard one time per second per application. This enables you to have multiple consumer applications processing a stream concurrently without hitting the Amazon Kinesis Data Streams limit of 5 `GetRecords` calls per second per shard. Additionally, processing larger batches of data tends to be more efficient at reducing network and other downstream latencies in your application.

The KCL defaults are set to follow the best practice of polling every 1 second. This default results in average propagation delays that are typically below 1 second.

Kinesis Data Streams records are available to be read immediately after they are written. There are some use cases that need to take advantage of this and require consuming data from the stream as soon as it is available. You can significantly reduce the propagation delay by overriding the KCL default settings to poll more frequently, as shown in the following examples.

Java KCL configuration code:

```
kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
        applicationName,
        streamName,
        credentialsProvider,
        workerId)
    .withInitialPositionInStream(initialPositionInStream)
    .withIdleTimeBetweenReadsInMillis(250); // poll every 250 ms instead of every 1 second
```

Property file setting for Python and Ruby KCL:

```
idleTimeBetweenReadsInMillis = 250
```

**Note**  
Because Kinesis Data Streams has a limit of 5 `GetRecords` calls per second, per shard, setting the `idleTimeBetweenReadsInMillis` property lower than 200 ms may result in your application observing the `ProvisionedThroughputExceededException` exception. Too many of these exceptions can result in exponential back-offs and thereby cause significant unexpected latencies in processing. If you set this property to be at or just above 200 ms and have more than one processing application, you will experience similar throttling.
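The arithmetic behind this note can be sketched as follows. The example assumes that every consumer application polls every shard at the same rate and shares the per-shard limit of 5 `GetRecords` calls per second, and it ignores the time each call takes. `PollingInterval` and `minIdleTimeMillis` are hypothetical names, not KCL settings.

```java
// Hypothetical helper: derives a lower bound for the polling interval from the
// per-shard limit of 5 GetRecords calls per second.
final class PollingInterval {
    static final int MAX_CALLS_PER_SECOND_PER_SHARD = 5;

    private PollingInterval() {}

    /**
     * Smallest idle time in milliseconds at which consumerApps applications,
     * each polling a shard at the same rate, stay within the shared limit.
     */
    static long minIdleTimeMillis(int consumerApps) {
        if (consumerApps < 1) {
            throw new IllegalArgumentException("consumerApps must be at least 1");
        }
        long totalMillis = 1000L * consumerApps;
        // Ceiling division so the result never allows more than 5 calls per second.
        return (totalMillis + MAX_CALLS_PER_SECOND_PER_SHARD - 1) / MAX_CALLS_PER_SECOND_PER_SHARD;
    }
}
```

Under these assumptions, one application can poll as often as every 200 ms, two applications need at least 400 ms each, and five applications arrive at the 1-second default.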

# Process serialized data using AWS Lambda with the Amazon Kinesis Producer Library
<a name="kinesis-record-deaggregation"></a>

The [Amazon Kinesis Producer Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) (KPL) aggregates small user-formatted records into larger records up to 1 MB to make better use of Amazon Kinesis Data Streams throughput. While the KCL for Java supports deaggregating these records, you need to use a special module to deaggregate records when using AWS Lambda as the consumer of your streams. You can obtain the necessary project code and instructions from GitHub at [Amazon Kinesis Producer Library Deaggregation Modules for AWS Lambda](https://github.com/awslabs/kinesis-deaggregation). The components in this project give you the ability to process KPL serialized data within AWS Lambda, in Java, Node.js and Python. These components can also be used as part of a [multi-lang KCL application](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java).

# Use resharding, scaling, and parallel processing to change the number of shards
<a name="kinesis-record-processor-scaling"></a>

*Resharding* enables you to increase or decrease the number of shards in a stream in order to adapt to changes in the rate of data flowing through the stream. Resharding is typically performed by an administrative application that monitors shard data-handling metrics. Although the KCL itself doesn't initiate resharding operations, it is designed to adapt to changes in the number of shards that result from resharding. 

As noted in [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable), the KCL tracks the shards in the stream using an Amazon DynamoDB table. When new shards are created as a result of resharding, the KCL discovers the new shards and populates new rows in the table. The workers automatically discover the new shards and create processors to handle the data from them. The KCL also distributes the shards in the stream across all the available workers and record processors. 

The KCL ensures that any data that existed in shards prior to the resharding is processed first. After that data has been processed, data from the new shards is sent to record processors. In this way, the KCL preserves the order in which data records were added to the stream for a particular partition key.

## Example: Resharding, scaling, and parallel processing
<a name="kinesis-record-processor-scaling-example"></a>

The following example illustrates how the KCL helps you handle scaling and resharding:
+ Suppose your application runs on one EC2 instance and processes one Kinesis data stream that has four shards. This one instance has one KCL worker and four record processors (one record processor for every shard). These four record processors run in parallel within the same process.
+ Next, if you scale the application to use another instance, you have two instances processing one stream that has four shards. When the KCL worker starts up on the second instance, it load-balances with the first instance, so that each instance now processes two shards.
+ If you then decide to split the four shards into five shards, the KCL again coordinates the processing across instances: one instance processes three shards, and the other processes two shards. A similar coordination occurs when you merge shards.
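The shard counts in this example follow from an even split of shards across instances. The following sketch reproduces only that arithmetic; it does not model how the KCL actually assigns leases, and `ShardBalance` and `shardsPerInstance` are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical helper: even split of shards across instances, matching the
// numbers in the example above. The KCL's lease assignment is not modeled.
final class ShardBalance {
    private ShardBalance() {}

    /** Returns how many shards each instance handles under an even split. */
    static int[] shardsPerInstance(int shards, int instances) {
        if (shards < 0 || instances < 1) {
            throw new IllegalArgumentException("shards must be >= 0 and instances >= 1");
        }
        int[] counts = new int[instances];
        Arrays.fill(counts, shards / instances);
        // Hand out the remainder one shard at a time.
        for (int i = 0; i < shards % instances; i++) {
            counts[i]++;
        }
        return counts;
    }
}
```

Calling `shardsPerInstance(4, 1)` gives one instance with four shards, `shardsPerInstance(4, 2)` gives two shards per instance, and `shardsPerInstance(5, 2)` gives three and two.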

Typically, when you use the KCL, you should ensure that the number of instances does not exceed the number of shards (except for failure standby purposes). Each shard is processed by exactly one KCL worker and has exactly one corresponding record processor, so you never need multiple instances to process one shard. However, one worker can process any number of shards, so it's fine if the number of shards exceeds the number of instances. 

To scale up processing in your application, you should test a combination of these approaches:
+ Increasing the instance size (because all record processors run in parallel within a process)
+ Increasing the number of instances up to the maximum number of open shards (because shards can be processed independently)
+ Increasing the number of shards (which increases the level of parallelism)

Note that you can use Auto Scaling to automatically scale your instances based on appropriate metrics. For more information, see the [Amazon EC2 Auto Scaling User Guide](https://docs.aws.amazon.com/autoscaling/ec2/userguide/).

When resharding increases the number of shards in the stream, the corresponding increase in the number of record processors increases the load on the EC2 instances that are hosting them. If the instances are part of an Auto Scaling group, and the load increases sufficiently, the Auto Scaling group adds more instances to handle the increased load. You should configure your instances to launch your Amazon Kinesis Data Streams application at startup, so that additional workers and record processors become active on the new instance right away.

For more information about resharding, see [Reshard a stream](kinesis-using-sdk-java-resharding.md). 

# Handle duplicate records
<a name="kinesis-record-processor-duplicates"></a>

There are two primary reasons why records may be delivered more than one time to your Amazon Kinesis Data Streams application: producer retries and consumer retries. Your application must anticipate and appropriately handle processing individual records multiple times.

## Producer retries
<a name="kinesis-record-processor-duplicates-producer"></a>

Consider a producer that experiences a network-related timeout after it makes a call to `PutRecord`, but before it can receive an acknowledgement from Amazon Kinesis Data Streams. The producer cannot be sure whether the record was delivered to Kinesis Data Streams. Assuming that every record is important to the application, the producer would be written to retry the call with the same data. If both `PutRecord` calls on that same data are successfully committed to Kinesis Data Streams, there are two Kinesis Data Streams records. Although the two records have identical data, they have different sequence numbers. Applications that need strict guarantees should embed a primary key within the record so that duplicates can be removed later during processing. Note that the number of duplicates due to producer retries is usually low compared to the number of duplicates due to consumer retries.

**Note**  
If you use the AWS SDK `PutRecord`, learn about SDK [Retry behavior](https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html) in the *AWS SDKs and Tools user guide*.
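
One way to picture the embedded primary key approach is the sketch below. It assumes a JSON payload with a hypothetical `record_id` field and keeps the set of seen keys in memory, which is an illustration only; a real consumer would need a durable, bounded store. No Kinesis API is called here: the stream is a plain list of `(sequence_number, payload)` pairs.

```python
import json
import uuid


def make_payload(data):
    """Producer side: embed a unique key once, before the first send attempt."""
    return json.dumps({"record_id": str(uuid.uuid4()), "data": data})


def deduplicate(records):
    """Consumer side: keep the first record seen for each embedded key."""
    seen = set()
    unique = []
    for sequence_number, payload in records:
        record_id = json.loads(payload)["record_id"]
        if record_id in seen:
            continue  # same payload committed twice by a producer retry
        seen.add(record_id)
        unique.append((sequence_number, payload))
    return unique


payload = make_payload("order placed")
# A retried put commits the same payload twice under different sequence numbers.
stream = [
    ("seq-1", payload),
    ("seq-2", payload),
    ("seq-3", make_payload("order shipped")),
]
print(len(deduplicate(stream)))  # 2
```

The key is generated before the first attempt and reused on every retry, so both committed copies carry the same `record_id` even though their sequence numbers differ.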

## Consumer retries
<a name="kinesis-record-processor-duplicates-consumer"></a>

Consumer (data processing application) retries happen when record processors restart. Record processors for the same shard restart in the following cases:

1. A worker terminates unexpectedly 

1. Worker instances are added or removed 

1. Shards are merged or split 

1. The application is deployed 

In all these cases, the shards-to-worker-to-record-processor mapping is continuously updated to load balance processing. Shard processors that were migrated to other instances restart processing records from the last checkpoint. This results in duplicated record processing as shown in the example below. For more information about load-balancing, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

### Example: Consumer retries resulting in redelivered records
<a name="kinesis-record-processor-duplicates-consumer-example"></a>

In this example, you have an application that continuously reads records from a stream, aggregates records into a local file, and uploads the file to Amazon S3. For simplicity, assume there is only 1 shard and 1 worker processing the shard. Consider the following example sequence of events, assuming that the last checkpoint was at record number 10000:

1.  A worker reads the next batch of records from the shard, records 10001 to 20000.

1.  The worker then passes the batch of records to the associated record processor.

1.  The record processor aggregates the data, creates an Amazon S3 file, and uploads the file to Amazon S3 successfully.

1.  The worker terminates unexpectedly before a new checkpoint can occur. 

1.  The application, worker, and record processor restart.

1.  The worker resumes reading after the last successful checkpoint (record 10000), in this case at record 10001.

Thus, records 10001-20000 are consumed more than one time.
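
The sequence of events can be replayed in a small simulation. This is a toy model under stated assumptions: `process_batch` is a hypothetical function, record numbers are treated as consecutive integers, and the upload is represented by appending a range to a list.

```python
def process_batch(checkpoint, batch_size, uploads, crash_before_checkpoint=False):
    """Read the batch after `checkpoint`, upload it, then try to checkpoint.

    Returns the checkpoint that survives in durable storage.
    """
    first, last = checkpoint + 1, checkpoint + batch_size
    uploads.append((first, last))  # step 3: the file reaches Amazon S3
    if crash_before_checkpoint:
        return checkpoint          # step 4: the new checkpoint is never written
    return last


uploads = []
checkpoint = 10000
checkpoint = process_batch(checkpoint, 10000, uploads, crash_before_checkpoint=True)
# Steps 5 and 6: after the restart, reading resumes from the old checkpoint.
checkpoint = process_batch(checkpoint, 10000, uploads)
print(uploads)     # [(10001, 20000), (10001, 20000)]
print(checkpoint)  # 20000
```

The same range appears twice in `uploads` because the upload succeeded but the checkpoint did not.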

### Being resilient to consumer retries
<a name="kinesis-record-processor-duplicates-consumer-resilience"></a>

Even though records may be processed more than one time, your application may want to present the side effects as if records were processed only one time (idempotent processing). Solutions to this problem vary in complexity and accuracy. If the destination of the final data can handle duplicates well, we recommend relying on the final destination to achieve idempotent processing. For example, with [OpenSearch](https://www.opensearch.org/) you can use a combination of versioning and unique IDs to prevent duplicated processing. 

The example application in the previous section continuously reads records from a stream, aggregates records into a local file, and uploads the file to Amazon S3. As illustrated, records 10001-20000 are consumed more than one time, resulting in multiple Amazon S3 files with the same data. One way to mitigate duplicates in this example is to ensure that step 3 uses the following scheme: 

1.  The record processor uses a fixed number of records per Amazon S3 file, such as 5000.

1.  The file name uses this schema: Amazon S3 prefix, shard-id, and `First-Sequence-Num`. In this case, it could be something like `sample-shard000001-10001`.

1.  After you upload the Amazon S3 file, checkpoint by specifying `Last-Sequence-Num`. In this case, you would checkpoint at record number 15000. 

With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.
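
A minimal sketch of this scheme follows. It makes several assumptions: a Python dict stands in for the Amazon S3 bucket, record numbers are consecutive integers as in the example, and `upload_files` is a hypothetical helper rather than part of the KCL.

```python
RECORDS_PER_FILE = 5000  # fixed number of records per file (step 1)


def upload_files(bucket, prefix, shard_id, batch):
    """Upload full files from `batch`, a list of (sequence_number, data) pairs.

    Returns the sequence number to checkpoint at, or None if no file was full.
    """
    last_checkpoint = None
    for start in range(0, len(batch) - RECORDS_PER_FILE + 1, RECORDS_PER_FILE):
        chunk = batch[start:start + RECORDS_PER_FILE]
        first_sequence_num = chunk[0][0]
        key = f"{prefix}-{shard_id}-{first_sequence_num}"  # step 2
        bucket[key] = [data for _, data in chunk]  # same key, same data on replay
        last_checkpoint = chunk[-1][0]             # step 3: Last-Sequence-Num
    return last_checkpoint


bucket = {}
batch = [(n, f"record-{n}") for n in range(10001, 20001)]
upload_files(bucket, "sample", "shard000001", batch)
upload_files(bucket, "sample", "shard000001", batch)  # replay after a restart
print(sorted(bucket))  # ['sample-shard000001-10001', 'sample-shard000001-15001']
```

Processing the batch twice leaves two objects, not four, because the replay overwrites each file with identical contents.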

In the case of a reshard operation, the number of records left in the shard may be less than the fixed number of records per file. In this case, your `shutdown()` method has to flush the file to Amazon S3 and checkpoint on the last sequence number. The scheme above is therefore compatible with reshard operations as well.

# Handle startup, shutdown, and throttling
<a name="kinesis-record-processor-additional-considerations"></a>

Here are some additional considerations to incorporate into the design of your Amazon Kinesis Data Streams application.

**Topics**
+ [Start up data producers and data consumers](#kinesis-record-processor-producer-consumer-coordination)
+ [Shut down an Amazon Kinesis Data Streams application](#developing-consumers-with-kcl-shutdown)
+ [Read throttling](#kinesis-record-processor-read-throttling)

## Start up data producers and data consumers
<a name="kinesis-record-processor-producer-consumer-coordination"></a>

By default, the KCL begins reading records from the tip of the stream, which is the most recently added record. In this configuration, if a data-producing application adds records to the stream before any receiving record processors are running, the records are not read by the record processors after they start up. 

To change the behavior of the record processors so that they always read data from the beginning of the stream, set the following value in the properties file for your Amazon Kinesis Data Streams application: 

```
initialPositionInStream = TRIM_HORIZON
```

By default, Amazon Kinesis Data Streams stores all data for 24 hours. It also supports extended retention of up to 7 days and long-term retention of up to 365 days. This time frame is called the *retention period*. Setting the starting position to `TRIM_HORIZON` starts the record processor with the oldest data in the stream, as defined by the retention period. Even with the `TRIM_HORIZON` setting, if a record processor starts after more time has passed than the retention period, some of the records in the stream are no longer available. For this reason, you should always have consumer applications reading from the stream and use the CloudWatch metric `GetRecords.IteratorAgeMilliseconds` to monitor that applications are keeping up with incoming data.
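
To make the relationship between iterator age and the retention period concrete, the following sketch compares a `GetRecords.IteratorAgeMilliseconds` value against the retention period. The function names and the 0.5 alert threshold are assumptions chosen for illustration, not recommended values, and the metric value is passed in directly rather than fetched from CloudWatch.

```python
RETENTION_HOURS_DEFAULT = 24  # default retention period


def retention_used(iterator_age_ms, retention_hours=RETENTION_HOURS_DEFAULT):
    """Fraction of the retention period that the oldest unread record has aged."""
    retention_ms = retention_hours * 60 * 60 * 1000
    return iterator_age_ms / retention_ms


def is_at_risk(iterator_age_ms, retention_hours=RETENTION_HOURS_DEFAULT, threshold=0.5):
    """True when the consumer lags by more than `threshold` of the retention period."""
    return retention_used(iterator_age_ms, retention_hours) > threshold


six_hours_ms = 6 * 60 * 60 * 1000
print(retention_used(six_hours_ms))     # 0.25
print(is_at_risk(six_hours_ms))         # False
print(is_at_risk(18 * 60 * 60 * 1000))  # True
```

A consumer whose iterator age approaches the retention period is close to losing records that expire before they are read.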

In some scenarios, it may be fine for record processors to miss the first few records in the stream. For example, you might run some initial records through the stream to test that the stream is working end-to-end as expected. After doing this initial verification, you would then start your workers and begin to put production data into the stream. 

For more information about the `TRIM_HORIZON` setting, see [Use shard iterators](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data-shard-iterators).

## Shut down an Amazon Kinesis Data Streams application
<a name="developing-consumers-with-kcl-shutdown"></a>

When your Amazon Kinesis Data Streams application has completed its intended task, you should shut it down by terminating the EC2 instances on which it is running. You can terminate the instances using the [AWS Management Console](https://console.aws.amazon.com//ec2/home) or the [AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/ec2/index.html). 

 After shutting down your Amazon Kinesis Data Streams application, you should delete the Amazon DynamoDB table that the KCL used to track the application's state. 

## Read throttling
<a name="kinesis-record-processor-read-throttling"></a>

The throughput of a stream is provisioned at the shard level. Each shard supports up to 5 read transactions per second, up to a maximum total data read rate of 2 MB per second. If an application (or a group of applications operating on the same stream) attempts to get data from a shard at a faster rate, Kinesis Data Streams throttles the corresponding Get operations. 
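
The per-shard limits can be turned into a quick back-of-the-envelope check. The sketch below assumes consumers that poll a single shard at a steady rate and share its read throughput; the function names are hypothetical, and actual throttling is enforced by the service rather than by client-side arithmetic.

```python
MAX_READS_PER_SECOND = 5  # read transactions per second, per shard
MAX_MB_PER_SECOND = 2     # total data read rate in MB per second, per shard


def shard_read_load(consumers, polls_per_second, mb_per_poll):
    """Aggregate read load that polling consumers place on a single shard."""
    reads = consumers * polls_per_second
    megabytes = reads * mb_per_poll
    return reads, megabytes


def would_throttle(consumers, polls_per_second, mb_per_poll):
    """True when the combined load exceeds either per-shard limit."""
    reads, megabytes = shard_read_load(consumers, polls_per_second, mb_per_poll)
    return reads > MAX_READS_PER_SECOND or megabytes > MAX_MB_PER_SECOND


print(would_throttle(consumers=2, polls_per_second=1, mb_per_poll=0.5))  # False
print(would_throttle(consumers=3, polls_per_second=2, mb_per_poll=0.1))  # True: 6 reads per second
print(would_throttle(consumers=3, polls_per_second=1, mb_per_poll=1.0))  # True: 3 MB per second
```

Either limit can be the binding one: many small reads exhaust the transaction limit, while a few large reads exhaust the data rate.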

In an Amazon Kinesis Data Streams application, if a record processor reads data faster than the limit (such as in the case of a failover), throttling occurs. Because the KCL manages the interactions between the application and Kinesis Data Streams, throttling exceptions occur in the KCL code rather than in the application code. However, because the KCL logs these exceptions, you see them in the logs.

If you find that your application is throttled consistently, you should consider increasing the number of shards for the stream.