

# Optimize Amazon Kinesis Data Streams consumers
<a name="advanced-consumers"></a>

You can further optimize your Amazon Kinesis Data Streams consumers based on the specific behavior that you observe. 

Review the following topics to identify solutions.

**Topics**
+ [Improve low-latency processing](kinesis-low-latency.md)
+ [Process serialized data using AWS Lambda with the Amazon Kinesis Producer Library](kinesis-record-deaggregation.md)
+ [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md)
+ [Handle duplicate records](kinesis-record-processor-duplicates.md)
+ [Handle startup, shutdown, and throttling](kinesis-record-processor-additional-considerations.md)

# Improve low-latency processing
<a name="kinesis-low-latency"></a>

*Propagation delay* is defined as the end-to-end latency from the moment a record is written to the stream until it is read by a consumer application. This delay varies depending upon a number of factors, but it is primarily affected by the polling interval of consumer applications.

For most applications, we recommend polling each shard one time per second per application. This enables multiple consumer applications to process a stream concurrently without exceeding the Kinesis Data Streams limit of 5 `GetRecords` calls per second, per shard. Additionally, processing larger batches of data tends to be more efficient, reducing network and other downstream latencies in your application.

The KCL defaults are set to follow the best practice of polling every 1 second. This default results in average propagation delays that are typically below 1 second.

Kinesis Data Streams records are available to be read immediately after they are written. There are some use cases that need to take advantage of this and require consuming data from the stream as soon as it is available. You can significantly reduce the propagation delay by overriding the KCL default settings to poll more frequently, as shown in the following examples.

Java KCL configuration code:

```
kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
        applicationName,
        streamName,
        credentialsProvider,
        workerId)
    .withInitialPositionInStream(initialPositionInStream)
    .withIdleTimeBetweenReadsInMillis(250);
```

Property file setting for Python and Ruby KCL:

```
idleTimeBetweenReadsInMillis = 250
```

**Note**  
Because Kinesis Data Streams has a limit of 5 `GetRecords` calls per second, per shard, setting the `idleTimeBetweenReadsInMillis` property lower than 200 ms may cause your application to observe the `ProvisionedThroughputExceededException` exception. Too many of these exceptions can result in exponential back-offs and thereby cause significant unexpected latencies in processing. If you set this property at or just above 200 ms and have more than one processing application, you will experience similar throttling.
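The arithmetic behind this note can be sketched as follows. This is an illustration only, not KCL code: with a shared budget of 5 `GetRecords` calls per second per shard, each of N applications polling the same shard needs an idle time of at least N × 200 ms.

```python
# Illustrative arithmetic only (not KCL code): estimate the minimum safe
# idleTimeBetweenReadsInMillis for a shard shared by several applications.

SHARD_READ_CALLS_PER_SECOND = 5  # Kinesis Data Streams limit per shard

def min_idle_time_ms(num_applications: int) -> int:
    """Each application gets an equal share of the 5 calls/sec budget,
    so its polling interval must be at least num_applications / 5 seconds."""
    return num_applications * (1000 // SHARD_READ_CALLS_PER_SECOND)

# One application can poll every 200 ms without throttling; three
# applications sharing a shard should each wait at least 600 ms.
print(min_idle_time_ms(1))  # 200
print(min_idle_time_ms(3))  # 600
```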

# Process serialized data using AWS Lambda with the Amazon Kinesis Producer Library
<a name="kinesis-record-deaggregation"></a>

The [Amazon Kinesis Producer Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) (KPL) aggregates small user-formatted records into larger records up to 1 MB to make better use of Amazon Kinesis Data Streams throughput. While the KCL for Java supports deaggregating these records, you need to use a special module to deaggregate records when using AWS Lambda as the consumer of your streams. You can obtain the necessary project code and instructions from GitHub at [Amazon Kinesis Producer Library Deaggregation Modules for AWS Lambda](https://github.com/awslabs/kinesis-deaggregation). The components in this project give you the ability to process KPL serialized data within AWS Lambda, in Java, Node.js, and Python. These components can also be used as part of a [multi-lang KCL application](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java).
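To make the aggregation concept concrete, here is a minimal sketch of packing several small user records into one larger record and recovering them. Note this length-prefixed encoding is a simplification for illustration only; the real KPL format is based on Protocol Buffers with magic bytes and a checksum, so use the deaggregation modules linked above in practice.

```python
# Conceptual sketch of record aggregation/deaggregation. The real KPL uses
# a Protocol Buffers-based format; this length-prefixed encoding is a
# simplification for illustration only.
import struct

def aggregate(user_records: list) -> bytes:
    """Pack small records into one blob: 4-byte big-endian length + payload."""
    return b"".join(struct.pack(">I", len(r)) + r for r in user_records)

def deaggregate(blob: bytes) -> list:
    """Recover the original user records from an aggregated blob."""
    records, offset = [], 0
    while offset < len(blob):
        (length,) = struct.unpack_from(">I", blob, offset)
        offset += 4
        records.append(blob[offset:offset + length])
        offset += length
    return records

original = [b"click:1001", b"click:1002", b"click:1003"]
assert deaggregate(aggregate(original)) == original
```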

# Use resharding, scaling, and parallel processing to change the number of shards
<a name="kinesis-record-processor-scaling"></a>

*Resharding* enables you to increase or decrease the number of shards in a stream in order to adapt to changes in the rate of data flowing through the stream. Resharding is typically performed by an administrative application that monitors shard data-handling metrics. Although the KCL itself doesn't initiate resharding operations, it is designed to adapt to changes in the number of shards that result from resharding. 

As noted in [Use a lease table to track the shards processed by the KCL consumer application](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable), the KCL tracks the shards in the stream using an Amazon DynamoDB table. When new shards are created as a result of resharding, the KCL discovers the new shards and populates new rows in the table. The workers automatically discover the new shards and create processors to handle the data from them. The KCL also distributes the shards in the stream across all the available workers and record processors. 

The KCL ensures that any data that existed in shards prior to the resharding is processed first. After that data has been processed, data from the new shards is sent to record processors. In this way, the KCL preserves the order in which data records were added to the stream for a particular partition key.
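This parent-before-child ordering can be sketched as a small dependency walk. The `parent_ids` mapping and shard names here are hypothetical; the real KCL tracks shard lineage in its DynamoDB lease table and drains a parent shard fully before handing its children to record processors.

```python
# Sketch of the parent-first ordering the KCL enforces after a reshard:
# a child shard is not processed until every parent shard has been
# fully processed. Shard names and the parent_ids mapping are hypothetical.
def processing_order(parent_ids: dict) -> list:
    order, done = [], set()

    def visit(shard: str) -> None:
        if shard in done:
            return
        for parent in parent_ids.get(shard, []):
            visit(parent)       # drain parents before their children
        done.add(shard)
        order.append(shard)

    for shard in parent_ids:
        visit(shard)
    return order

# After splitting shard-0 into shard-1 and shard-2, the parent comes first.
lineage = {"shard-0": [], "shard-1": ["shard-0"], "shard-2": ["shard-0"]}
print(processing_order(lineage))  # ['shard-0', 'shard-1', 'shard-2']
```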

## Example: Resharding, scaling, and parallel processing
<a name="kinesis-record-processor-scaling-example"></a>

The following example illustrates how the KCL helps you handle scaling and resharding:
+ Suppose your application is running on one EC2 instance and is processing one Kinesis data stream that has four shards. This one instance has one KCL worker and four record processors (one record processor for every shard). These four record processors run in parallel within the same process. 
+ Next, if you scale the application to use another instance, you have two instances processing one stream that has four shards. When the KCL worker starts up on the second instance, it load-balances with the first instance, so that each instance now processes two shards. 
+ If you then split one of the four shards so that the stream has five shards, the KCL again coordinates the processing across instances: one instance processes three shards, and the other processes two shards. A similar coordination occurs when you merge shards.
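The steps above can be sketched as a simple even distribution of shards across workers. This round-robin assignment is an illustration of the balanced state the KCL converges to, not the KCL's actual lease-stealing protocol; worker and shard names are hypothetical.

```python
# Sketch of the balanced shard assignment the KCL converges to: shards are
# spread across workers so that per-worker counts differ by at most one.
# This is not the KCL's actual lease-negotiation algorithm.
def balance(shards: list, workers: list) -> dict:
    assignment = {w: [] for w in workers}
    for i, shard in enumerate(shards):
        assignment[workers[i % len(workers)]].append(shard)
    return assignment

# Two instances, four shards: two shards each.
print(balance(["s1", "s2", "s3", "s4"], ["worker-a", "worker-b"]))

# After a split produces a fifth shard: one worker takes three, the other two.
print(balance(["s1", "s2", "s3", "s4", "s5"], ["worker-a", "worker-b"]))
```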

Typically, when you use the KCL, you should ensure that the number of instances does not exceed the number of shards (except for failure standby purposes). Each shard is processed by exactly one KCL worker and has exactly one corresponding record processor, so you never need multiple instances to process one shard. However, one worker can process any number of shards, so it's fine if the number of shards exceeds the number of instances. 

To scale up processing in your application, you should test a combination of these approaches:
+ Increasing the instance size (because all record processors run in parallel within a process)
+ Increasing the number of instances up to the maximum number of open shards (because shards can be processed independently)
+ Increasing the number of shards (which increases the level of parallelism)

Note that you can use Auto Scaling to automatically scale your instances based on appropriate metrics. For more information, see the [Amazon EC2 Auto Scaling User Guide](https://docs.aws.amazon.com/autoscaling/ec2/userguide/).

When resharding increases the number of shards in the stream, the corresponding increase in the number of record processors increases the load on the EC2 instances that are hosting them. If the instances are part of an Auto Scaling group, and the load increases sufficiently, the Auto Scaling group adds more instances to handle the increased load. You should configure your instances to launch your Amazon Kinesis Data Streams application at startup, so that additional workers and record processors become active on the new instance right away.

For more information about resharding, see [Reshard a stream](kinesis-using-sdk-java-resharding.md). 

# Handle duplicate records
<a name="kinesis-record-processor-duplicates"></a>

There are two primary reasons why records may be delivered more than one time to your Amazon Kinesis Data Streams application: producer retries and consumer retries. Your application must anticipate and appropriately handle processing individual records multiple times.

## Producer retries
<a name="kinesis-record-processor-duplicates-producer"></a>

Consider a producer that experiences a network-related timeout after it makes a call to `PutRecord`, but before it can receive an acknowledgement from Amazon Kinesis Data Streams. The producer cannot be sure whether the record was delivered to Kinesis Data Streams. Assuming that every record is important to the application, the producer should be written to retry the call with the same data. If both `PutRecord` calls on that same data are successfully committed to Kinesis Data Streams, there will be two Kinesis Data Streams records. Although the two records have identical data, they have unique sequence numbers. Applications that need strict guarantees should embed a primary key within the record so that duplicates can be removed later during processing. Note that the number of duplicates due to producer retries is usually low compared to the number of duplicates due to consumer retries.

**Note**  
If you use the AWS SDK to call `PutRecord`, learn about the SDK [Retry behavior](https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html) in the *AWS SDKs and Tools Reference Guide*.
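The embedded-primary-key approach can be sketched as follows. The record shape, the `event_id` field, and the in-memory `seen_keys` set are hypothetical simplifications; a production consumer would typically track seen keys in a durable store.

```python
# Sketch of embedding a primary key in each record so a consumer can drop
# producer-retry duplicates, which arrive with identical data but distinct
# Kinesis sequence numbers. Field names and record shape are hypothetical.
import json

def process_once(records: list, seen_keys: set) -> list:
    """Return only payloads whose embedded primary key has not been seen."""
    fresh = []
    for record in records:
        payload = json.loads(record["data"])
        if payload["event_id"] not in seen_keys:
            seen_keys.add(payload["event_id"])
            fresh.append(payload)
    return fresh

# The second record is a producer retry: same data, different sequence number.
batch = [
    {"sequence_number": "495451", "data": json.dumps({"event_id": "e-1", "v": 7})},
    {"sequence_number": "495452", "data": json.dumps({"event_id": "e-1", "v": 7})},
]
seen = set()
assert [p["event_id"] for p in process_once(batch, seen)] == ["e-1"]
```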

## Consumer retries
<a name="kinesis-record-processor-duplicates-consumer"></a>

Consumer (data processing application) retries happen when record processors restart. Record processors for the same shard restart in the following cases:

1. A worker terminates unexpectedly 

1. Worker instances are added or removed 

1. Shards are merged or split 

1. The application is deployed 

In all these cases, the shards-to-worker-to-record-processor mapping is continuously updated to load balance processing. Shard processors that were migrated to other instances restart processing records from the last checkpoint. This results in duplicated record processing as shown in the example below. For more information about load-balancing, see [Use resharding, scaling, and parallel processing to change the number of shards](kinesis-record-processor-scaling.md).

### Example: Consumer retries resulting in redelivered records
<a name="kinesis-record-processor-duplicates-consumer-example"></a>

In this example, you have an application that continuously reads records from a stream, aggregates records into a local file, and uploads the file to Amazon S3. For simplicity, assume there is only 1 shard and 1 worker processing the shard. Consider the following example sequence of events, assuming that the last checkpoint was at record number 10000:

1.  A worker reads the next batch of records from the shard, records 10001 to 20000.

1.  The worker then passes the batch of records to the associated record processor.

1.  The record processor aggregates the data, creates an Amazon S3 file, and uploads the file to Amazon S3 successfully.

1.  Worker terminates unexpectedly before a new checkpoint can occur. 

1.  Application, worker, and record processor restart.

1.  Worker now begins reading from the last successful checkpoint, in this case 10001.

Thus, records 10001-20000 are consumed more than one time.

### Being resilient to consumer retries
<a name="kinesis-record-processor-duplicates-consumer-resilience"></a>

Even though records may be processed more than once, your application may want to present the side effects as if records were processed only once (idempotent processing). Solutions to this problem vary in complexity and accuracy. If the destination of the final data can handle duplicates well, we recommend relying on the final destination to achieve idempotent processing. For example, with [OpenSearch](https://www.opensearch.org/) you can use a combination of versioning and unique IDs to prevent duplicated processing. 

The example application in the previous section continuously reads records from a stream, aggregates them into a local file, and uploads the file to Amazon S3. As illustrated, records 10001 to 20000 are consumed more than once, resulting in multiple Amazon S3 files with the same data. One way to mitigate these duplicates is to ensure that step 3 uses the following scheme: 

1.  The record processor uses a fixed number of records per Amazon S3 file, such as 5000.

1.  The file name follows this scheme: Amazon S3 prefix, shard ID, and `First-Sequence-Num`. In this case, it could be something like `sample-shard000001-10001`.

1.  After you upload the Amazon S3 file, checkpoint by specifying `Last-Sequence-Num`. In this case, you would checkpoint at record number 15000. 

With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.

In the case of a reshard operation, the number of records remaining in the shard may be fewer than your desired fixed number. In this case, your `shutdown()` method has to flush the file to Amazon S3 and checkpoint at the last sequence number. The preceding scheme is compatible with reshard operations as well.
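The deterministic naming and checkpointing scheme above can be sketched as follows; the prefix and shard ID values are taken from the example, and the helper names are hypothetical.

```python
# Sketch of the deterministic file-naming and checkpointing scheme described
# above: a fixed batch size plus a name derived from the shard ID and the
# first sequence number makes retried uploads overwrite the same S3 object.
RECORDS_PER_FILE = 5000

def s3_key(prefix: str, shard_id: str, first_sequence_num: int) -> str:
    """Build the object name: prefix, shard ID, and First-Sequence-Num."""
    return f"{prefix}-{shard_id}-{first_sequence_num}"

def checkpoint_after_upload(first_sequence_num: int) -> int:
    """Checkpoint at Last-Sequence-Num: the last record written to the file."""
    return first_sequence_num + RECORDS_PER_FILE - 1

print(s3_key("sample", "shard000001", 10001))  # sample-shard000001-10001
print(checkpoint_after_upload(10001))          # 15000
```

Because a retried upload of records 10001 to 15000 always produces the key `sample-shard000001-10001` with identical contents, duplicates overwrite rather than multiply.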

# Handle startup, shutdown, and throttling
<a name="kinesis-record-processor-additional-considerations"></a>

Here are some additional considerations to incorporate into the design of your Amazon Kinesis Data Streams application.

**Topics**
+ [Start up data producers and data consumers](#kinesis-record-processor-producer-consumer-coordination)
+ [Shut down an Amazon Kinesis Data Streams application](#developing-consumers-with-kcl-shutdown)
+ [Read throttling](#kinesis-record-processor-read-throttling)

## Start up data producers and data consumers
<a name="kinesis-record-processor-producer-consumer-coordination"></a>

By default, the KCL begins reading records from the tip of the stream, which is the most recently added record. In this configuration, if a data-producing application adds records to the stream before any receiving record processors are running, the records are not read by the record processors after they start up. 

To change this behavior so that the record processors always read data from the beginning of the stream, set the following value in the properties file for your Amazon Kinesis Data Streams application: 

```
initialPositionInStream = TRIM_HORIZON
```

By default, Amazon Kinesis Data Streams stores all data for 24 hours. It also supports extended retention of up to 7 days and long-term retention of up to 365 days. This time frame is called the *retention period*. Setting the starting position to `TRIM_HORIZON` starts the record processor with the oldest data in the stream, as defined by the retention period. Even with the `TRIM_HORIZON` setting, if a record processor starts after more time than the retention period has elapsed, some of the records in the stream are no longer available. For this reason, you should always have consumer applications reading from the stream, and you should use the CloudWatch metric `GetRecords.IteratorAgeMilliseconds` to monitor that applications are keeping up with incoming data.
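As a hedged sketch of such monitoring, the following builds the parameters for a CloudWatch alarm on `GetRecords.IteratorAgeMilliseconds`, which grows when consumers fall behind. The alarm name, stream name, and thresholds are placeholder assumptions; actually creating the alarm requires boto3 and AWS credentials, so that call is shown only in a comment.

```python
# Hedged sketch: parameters for a CloudWatch alarm that fires when a consumer
# falls behind. Names and thresholds are placeholders, not recommendations.
alarm_params = {
    "AlarmName": "kinesis-consumer-falling-behind",  # hypothetical name
    "Namespace": "AWS/Kinesis",
    "MetricName": "GetRecords.IteratorAgeMilliseconds",
    "Dimensions": [{"Name": "StreamName", "Value": "my-stream"}],  # placeholder
    "Statistic": "Maximum",
    "Period": 300,                  # evaluate over 5-minute windows
    "EvaluationPeriods": 3,
    "Threshold": 3_600_000,         # alarm if the consumer is over an hour behind
    "ComparisonOperator": "GreaterThanThreshold",
}

# With credentials configured, the alarm could be created like this:
# import boto3
# boto3.client("cloudwatch").put_metric_alarm(**alarm_params)
print(alarm_params["MetricName"])
```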

In some scenarios, it may be fine for record processors to miss the first few records in the stream. For example, you might run some initial records through the stream to test that the stream is working end-to-end as expected. After doing this initial verification, you would then start your workers and begin to put production data into the stream. 

For more information about the `TRIM_HORIZON` setting, see [Use shard iterators](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data-shard-iterators).

## Shut down an Amazon Kinesis Data Streams application
<a name="developing-consumers-with-kcl-shutdown"></a>

When your Amazon Kinesis Data Streams application has completed its intended task, you should shut it down by terminating the EC2 instances on which it is running. You can terminate the instances using the [AWS Management Console](https://console.aws.amazon.com/ec2/home) or the [AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/ec2/index.html). 

After shutting down your Amazon Kinesis Data Streams application, you should delete the Amazon DynamoDB table that the KCL used to track the application's state. 

## Read throttling
<a name="kinesis-record-processor-read-throttling"></a>

The throughput of a stream is provisioned at the shard level. Each shard supports up to 5 read transactions per second, up to a maximum total data read rate of 2 MB per second. If an application (or a group of applications operating on the same stream) attempts to get data from a shard at a faster rate, Kinesis Data Streams throttles the corresponding Get operations. 

In an Amazon Kinesis Data Streams application, if a record processor processes data faster than this limit (for example, during a failover), throttling occurs. Because the KCL manages the interactions between the application and Kinesis Data Streams, throttling exceptions occur in the KCL code rather than in the application code. However, because the KCL logs these exceptions, you can see them in the logs.
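For a custom consumer that calls `GetRecords` directly, the usual pattern is exponential backoff on throttling, which is the same behavior the KCL applies internally. The sketch below uses a stand-in exception and a fake fetch function to simulate two throttled calls; it is an illustration of the retry pattern, not KCL code.

```python
# Sketch of exponential backoff around a throttled read. The KCL handles this
# internally; this shows the pattern for a custom consumer. RuntimeError is a
# stand-in for ProvisionedThroughputExceededException, and flaky_fetch is a
# fake client that simulates two throttled calls before succeeding.
import time

def get_records_with_backoff(fetch, max_attempts: int = 5, base_delay: float = 0.2):
    for attempt in range(max_attempts):
        try:
            return fetch()
        except RuntimeError:
            # Sleep 0.2 s, 0.4 s, 0.8 s, ... before retrying.
            time.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("still throttled after retries")

calls = {"n": 0}
def flaky_fetch():
    calls["n"] += 1
    if calls["n"] <= 2:  # first two calls are throttled
        raise RuntimeError("ProvisionedThroughputExceededException")
    return ["record-1", "record-2"]

print(get_records_with_backoff(flaky_fetch))  # succeeds on the third attempt
```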

If you find that your application is throttled consistently, you should consider increasing the number of shards for the stream.