

# Create and manage Kinesis data streams
<a name="working-with-streams"></a>

Amazon Kinesis Data Streams ingests a large amount of data in real time, durably stores the data, and makes the data available for consumption. The unit of data stored by Kinesis Data Streams is a *data record*. A *data stream* represents a group of data records. The data records in a data stream are distributed into shards.

A *shard* is a sequence of data records in a stream. It serves as the base throughput unit of a Kinesis data stream. A shard supports 1 MB/s and 1000 records per second for *writes* and 2 MB/s for *reads* in both the on-demand and provisioned capacity modes. The shard limits ensure predictable performance, making it easier to design and operate a highly reliable data streaming workflow. 

In this section, you learn how to set the capacity mode for the stream and how to create a stream using either the AWS Management Console or APIs. Then you can take additional actions on the stream. 

**Topics**
+ [Choose the right mode to stream in](how-do-i-size-a-stream.md)
+ [Create a stream using the AWS Management Console](how-do-i-create-a-stream.md)
+ [Create a stream using the APIs](kinesis-using-sdk-java-create-stream.md)
+ [Update a stream](updating-a-stream.md)
+ [List streams](kinesis-using-sdk-java-list-streams.md)
+ [List shards](kinesis-using-sdk-java-list-shards.md)
+ [Delete a stream](kinesis-using-sdk-java-delete-stream.md)
+ [Reshard a stream](kinesis-using-sdk-java-resharding.md)
+ [Change the data retention period](kinesis-extended-retention.md)
+ [Tag your Amazon Kinesis Data Streams resources](tagging.md)
+ [Handle large records](large-records.md)
+ [Perform resilience testing with AWS Fault Injection Service](kinesis-fis.md)

# Choose the right mode to stream in
<a name="how-do-i-size-a-stream"></a>

The following topics explain how to choose the best mode for your application and how to switch between modes, if needed.

**Topics**
+ [What are the different modes in Kinesis Data Streams?](#diff-modes-kds)
+ [On-demand Standard mode features and use cases](#ondemandmode)
+ [On-demand Advantage mode features and use cases](#ondemand-advantage-mode)
+ [Provisioned mode features and use cases](#provisionedmode)
+ [Switch between modes](#switchingmodes)

## What are the different modes in Kinesis Data Streams?
<a name="diff-modes-kds"></a>

A mode determines how the capacity of a data stream is managed and how you're charged for the usage of your data stream. In Amazon Kinesis Data Streams, you can choose **On-demand Standard**, **On-demand Advantage**, or **Provisioned** as the mode for your data streams.
+ **On-demand Standard** - Data streams with an on-demand mode require no capacity planning and automatically scale to handle gigabytes of write and read throughput per minute. With the on-demand mode, Kinesis Data Streams automatically manages the shards in order to provide the necessary throughput.
+ **On-demand Advantage** - An account-level mode that enables more capabilities and provides a simpler pricing structure for on-demand streams. In this mode, you can proactively warm a stream’s write throughput capacity at any time. There is no longer a fixed, per-stream charge, and pricing for data ingest, data retrieval, and extended retention usage across all on-demand streams is at least 60% lower than that of **On-demand Standard**. 
+ **Provisioned** - For the data streams with a provisioned mode, you must specify the number of shards for the data stream. The total capacity of a data stream is the sum of the capacities of its shards. You can increase or decrease the number of shards in a data stream, as needed.

You can use Kinesis Data Streams `PutRecord` and `PutRecords` APIs to write data into your data streams in any mode. To retrieve data, all three modes support default consumers that use the `GetRecords` API and Enhanced Fan-Out (EFO) consumers that use the `SubscribeToShard` API.

All Kinesis Data Streams capabilities, including retention, encryption, monitoring metrics, and others, are supported in both the on-demand and provisioned modes. Kinesis Data Streams provides high durability and availability in both capacity modes.

## On-demand Standard mode features and use cases
<a name="ondemandmode"></a>

Data streams in the on-demand mode require no capacity planning and automatically scale to handle gigabytes of write and read throughput per minute. On-demand mode simplifies ingesting and storing large data volumes at low latency because it eliminates provisioning and managing servers, storage, or throughput. You can ingest billions of records per day without any operational overhead.

On-demand mode is ideal for workloads with highly variable and unpredictable traffic patterns. You no longer have to provision these workloads for peak capacity, which can result in higher costs due to low utilization. 

With the on-demand capacity mode, you pay per GB of data written and read from your data streams. You do not need to specify how much read and write throughput you expect your application to perform. Kinesis Data Streams instantly accommodates your workloads as they ramp up or down. For more information, see [Amazon Kinesis Data Streams pricing](https://aws.amazon.com/kinesis/data-streams/pricing/).

A data stream in the on-demand mode accommodates up to double the peak write throughput observed in the previous 30 days. As your data stream’s write throughput reaches a new peak, Kinesis Data Streams scales the data stream’s capacity automatically. For example, if your data stream has a write throughput that varies between 10 MB/s and 40 MB/s, then Kinesis Data Streams ensures that you can easily burst to double your previous peak throughput, or 80 MB/s. If the same data stream sustains a new peak throughput of 50 MB/s, Kinesis Data Streams ensures that there is enough capacity to ingest 100 MB/s of write throughput. However, write throttling can occur if your traffic increases to more than double the previous peak within a 15-minute duration. You need to retry these throttled requests.
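The scaling behavior described above can be sketched as a small model. The class and method names below are illustrative, not part of the Kinesis API:

```java
// Illustrative model of the on-demand write-capacity rule described above:
// a stream can absorb up to double its previous 30-day peak write throughput.
class OnDemandCapacity {

    // Write capacity, in MB/s, given the peak observed over the previous 30 days.
    static double writeCapacityMBps(double previousPeakMBps) {
        return 2.0 * previousPeakMBps;
    }

    // True if the incoming rate can be throttled until the stream finishes scaling.
    static boolean mayThrottle(double incomingMBps, double previousPeakMBps) {
        return incomingMBps > writeCapacityMBps(previousPeakMBps);
    }
}
```

For the example above, a previous peak of 40 MB/s yields 80 MB/s of burst capacity, and traffic beyond that can be throttled for about 15 minutes while the stream scales.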

The aggregate read capacity of a data stream with the on-demand mode increases proportionally to write throughput. This helps to ensure that consumer applications always have adequate read throughput to process incoming data in real time. You get at least twice your write throughput for reading data with the `GetRecords` API. We recommend that you use one consumer application with the `GetRecords` API, so that it has enough room to catch up when the application needs to recover from downtime. For scenarios that require adding more than one consumer application, we recommend that you use the Enhanced Fan-Out capability of Kinesis Data Streams. Enhanced Fan-Out supports adding up to 20 consumer applications to a data stream using the `SubscribeToShard` API, with each consumer application having dedicated throughput. 

### Handle read and write throughput exceptions
<a name="hotshards"></a>

With the on-demand mode (same as with the provisioned capacity mode), you must specify a partition key with each record to write data into your data stream. Kinesis Data Streams uses your partition keys to distribute data across shards. Kinesis Data Streams monitors traffic for each shard. When the incoming traffic exceeds 500 KB/s per shard, it splits the shard within 15 minutes. The parent shard’s hash key values are redistributed evenly across child shards.
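As a sketch of the routing described above: Kinesis Data Streams takes the MD5 hash of each partition key to produce a 128-bit hash key, and the record goes to the shard whose hash key range contains that value. The helper below is illustrative, not a Kinesis API:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch (not a Kinesis API): map a partition key to its
// 128-bit hash key the way Kinesis Data Streams does, using MD5.
class PartitionKeyRouting {

    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            // Interpret the 16-byte digest as a non-negative 128-bit integer.
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // True if the hash key falls within a shard's [start, end] hash key range.
    static boolean inShardRange(BigInteger hashKey, BigInteger start, BigInteger end) {
        return hashKey.compareTo(start) >= 0 && hashKey.compareTo(end) <= 0;
    }
}
```

Because the hash is deterministic, all records with the same partition key land on the same shard, which is why a single very hot key can exceed a shard's limits.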

 If your incoming traffic exceeds twice your prior peak, you can experience read or write exceptions for about 15 minutes, even when your data is distributed evenly across the shards. We recommend that you retry all such requests so that all the records are properly stored in Kinesis Data Streams. 
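The retry recommendation above can be sketched with a generic helper. The names and backoff values below are illustrative, not part of the Kinesis API; in practice, the AWS SDK's built-in retry configuration serves the same purpose:

```java
import java.util.function.Supplier;

// Illustrative retry-with-backoff helper for throttled requests; the names
// and backoff parameters are hypothetical, not part of the Kinesis API.
class RetryHelper {

    // Runs the operation, retrying with exponential backoff on failure.
    static <T> T withRetries(Supplier<T> operation, int maxAttempts, long baseDelayMs) {
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {  // e.g., a throughput-exceeded error
                lastFailure = e;
                try {
                    Thread.sleep(baseDelayMs << attempt);  // exponential backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        throw lastFailure;
    }
}
```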

You may experience read and write exceptions if you are using a partition key that leads to uneven data distribution, and the records assigned to a particular shard exceed its limits. With on-demand mode, the data stream automatically adapts to handle uneven data distribution patterns unless a single partition key exceeds a shard’s 1 MB/s throughput and 1000 records per second limits. 

In the on-demand mode, Kinesis Data Streams splits the shards evenly when it detects an increase in traffic. However, it does not detect and isolate hash keys that are driving a higher portion of incoming traffic to a particular shard. If you are using highly uneven partition keys, you may continue to receive write exceptions. For such use cases, we recommend that you use the provisioned capacity mode, which supports granular shard splits.

## On-demand Advantage mode features and use cases
<a name="ondemand-advantage-mode"></a>

On-demand Advantage is an account-level setting that unlocks more capabilities and provides a different pricing structure for all on-demand streams in the Region. In this mode, on-demand streams retain their functionality and continue to automatically scale capacity based on actual data usage. If you want to proactively warm a stream’s write throughput capacity, you can configure warm throughput. For example, if your data stream has a write throughput between 10 MB/s and 40 MB/s, you can expect it to handle up to 80 MB/s of instant throughput increases without throttling. However, if you forecast an upcoming event to peak around 200 MB/s of traffic, you can configure the stream with a warm throughput of 200 MB/s to ensure that the capacity is available when the traffic arrives. Using warm throughput doesn't incur an additional charge.

Another benefit of On-demand Advantage mode is that on-demand streams transition to a simpler pricing structure. With the mode enabled, the account no longer incurs fixed, per-stream charges; you pay only for data ingest, data retrieval, and the optional extended retention. Each pricing dimension is also at a significant discount compared to the corresponding dimension in On-demand Standard. For more information, see [Amazon Kinesis Data Streams pricing](https://aws.amazon.com/kinesis/data-streams/pricing/).

Enhanced fan-out data retrievals also carry no price premium over standard data retrievals in this mode. Additionally, with On-demand Advantage mode, you can register up to 50 consumers per stream to use enhanced fan-out. Enabling On-demand Advantage commits the account to at least 25 MiB/s of data ingest and 25 MiB/s of data retrieval across all on-demand streams. The Kinesis Data Streams console includes a check that indicates whether your account’s usage patterns are a good fit for **On-demand Advantage** mode.

If your account’s data usage is below the requirement, you'll be charged the difference as a shortfall, but at the same discounted rate. After you enable On-demand Advantage, you must wait at least 24 hours before you can disable the mode. Overall, On-demand Advantage is the best way to stream with Kinesis Data Streams if you have consistent throughput usage close to or above the minimum commitment, need many fan-out consumers, or operate hundreds of data streams. 

## Provisioned mode features and use cases
<a name="provisionedmode"></a>

With provisioned mode, after you create the data stream, you can dynamically scale your shard capacity up or down using the AWS Management Console or the [UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html) API. You can make updates while there is a Kinesis Data Streams producer or consumer application writing to or reading data from the stream. 

The provisioned mode is suited for predictable traffic with capacity requirements that are easy to forecast. You can use the provisioned mode if you want fine-grained control over how data is distributed across shards. 

With the provisioned mode, you must specify the number of shards for the data stream. To determine the size of a data stream with the provisioned mode, you need the following input values:
+ The average size of the data record written to the stream in kilobytes (KB), rounded up to the nearest 1 KB (`average_data_size_in_KB`).
+ The number of data records written to and read from the stream per second (`records_per_second`).
+ The number of consumers, which are Kinesis Data Streams applications that consume data concurrently and independently from the stream (`number_of_consumers`).
+ The incoming write bandwidth in KB (`incoming_write_bandwidth_in_KB`), which is equal to the `average_data_size_in_KB` multiplied by the `records_per_second`.
+ The outgoing read bandwidth in KB (`outgoing_read_bandwidth_in_KB`), which is equal to the `incoming_write_bandwidth_in_KB` multiplied by the `number_of_consumers`.

You can calculate the number of shards (`number_of_shards`) that your stream needs by using the input values in the following formula.

```
number_of_shards = ceiling(max(incoming_write_bandwidth_in_KB/1024, outgoing_read_bandwidth_in_KB/2048))
```
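As an illustration, the formula can be wrapped in a small helper. The class and method names below are hypothetical, but the arithmetic follows the formula above (a shard supports 1024 KB/s of writes and 2048 KB/s of reads):

```java
// Illustrative sizing helper for the provisioned mode; the names are
// hypothetical, but the arithmetic follows the formula above.
class ShardSizer {

    // Returns the number of shards needed for the given workload.
    static int numberOfShards(int averageDataSizeInKB,
                              int recordsPerSecond,
                              int numberOfConsumers) {
        double incomingWriteBandwidthInKB = (double) averageDataSizeInKB * recordsPerSecond;
        double outgoingReadBandwidthInKB = incomingWriteBandwidthInKB * numberOfConsumers;
        // A shard supports 1 MB/s (1024 KB/s) of writes and 2 MB/s (2048 KB/s) of reads.
        return (int) Math.ceil(Math.max(incomingWriteBandwidthInKB / 1024,
                                        outgoingReadBandwidthInKB / 2048));
    }
}
```

For example, 4 KB records at 1000 records per second with 3 consumers gives 4000 KB/s of writes and 12,000 KB/s of reads, so the read side dominates and the stream needs 6 shards.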

You may still experience read and write throughput exceptions in the provisioned mode if you don't configure your data stream to handle your peak throughput. In this case, you must manually scale your data stream to accommodate your data traffic. 

You may also experience read and write exceptions if you're using a partition key that leads to uneven data distribution and the records assigned to a shard exceed its limits. To resolve this issue in the provisioned mode, identify such shards and manually split them to better accommodate your traffic. For more information, see [Resharding a Stream](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html). 

## Switch between modes
<a name="switchingmodes"></a>

For each data stream in your AWS account, you can switch between the on-demand and provisioned modes twice within 24 hours. Switching between modes doesn't cause any disruptions to your applications that use this data stream. You can continue writing to and reading from this data stream. As you're switching between modes, either from on-demand to provisioned or from provisioned to on-demand, the status of the stream is set to *Updating*. You must wait for the data stream status to get to *Active* before you can modify its properties again.

When you switch from provisioned to on-demand capacity mode, your data stream initially retains whatever shard count it had before the transition. From that point on, Kinesis Data Streams monitors your data traffic and scales the shard count of this on-demand data stream depending on your write throughput. When you switch from on-demand to provisioned mode, your data stream also initially retains its shard count, but from that point on, you're responsible for monitoring and adjusting the shard count of this data stream to properly accommodate your write throughput.

You can switch from **On-demand Standard** to **On-demand Advantage** mode by enabling an account-level setting. When enabled, the account commits to at least 25 MiB/s of data ingest and 25 MiB/s of data retrieval usage across all on-demand streams in the Region. After you enable **On-demand Advantage**, you must wait at least 24 hours before you can disable it, but you can request the change at any time. If you want to switch from **On-demand Advantage** to **On-demand Standard**, you must first remove any warm throughput configured on your on-demand streams.

# Create a stream using the AWS Management Console
<a name="how-do-i-create-a-stream"></a>

You can create a stream using the Kinesis Data Streams console, the Kinesis Data Streams API, or the AWS Command Line Interface (AWS CLI).

**To create a data stream using the console**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. In the navigation bar, expand the Region selector and choose a Region.

1. Choose **Create data stream**.

1. On the **Create Kinesis stream** page, enter a name for your data stream and then choose either the **On-demand** or **Provisioned** capacity mode. The **On-demand** mode is selected by default. For more information, see [Choose the right mode to stream in](how-do-i-size-a-stream.md).

   With the **On-demand** mode, you can then choose **Create Kinesis stream** to create your data stream. With the **Provisioned** mode, you must then specify the number of shards you need, and then choose **Create Kinesis stream**.

   On the **Kinesis streams** page, your stream's **Status** is **Creating** while the stream is being created. When the stream is ready to use, the **Status** changes to **Active**.

1. Choose the name of your stream. The **Stream Details** page displays a summary of your stream configuration, along with monitoring information.

**To create a stream using the Kinesis Data Streams API**
+ For information about creating a stream using the Kinesis Data Streams API, see [Create a stream using the APIs](kinesis-using-sdk-java-create-stream.md).

**To create a stream using the AWS CLI**
+ For information about creating a stream using the AWS CLI, see the [create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html) command.
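For example, creating a provisioned-mode stream and checking its status from the AWS CLI might look like the following. The stream name is illustrative, and the commands assume configured AWS credentials:

```shell
# Create a provisioned-mode stream with two shards (stream name is illustrative).
aws kinesis create-stream --stream-name MyStream --shard-count 2

# Check the stream status; wait for "ACTIVE" before writing to it.
aws kinesis describe-stream-summary --stream-name MyStream
```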

# Create a stream using the APIs
<a name="kinesis-using-sdk-java-create-stream"></a>

Use the following steps to create your Kinesis data stream.

## Build the Kinesis Data Streams client
<a name="kinesis-using-sdk-java-create-client"></a>

Before you can work with Kinesis data streams, you must build a client object. The following Java code instantiates a client builder and uses it to set the Region, credentials, and the client configuration. It then builds a client object. 

```
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
clientBuilder.setRegion(regionName);
clientBuilder.setCredentials(credentialsProvider);
clientBuilder.setClientConfiguration(config);
        
AmazonKinesis client = clientBuilder.build();
```

For more information, see [Kinesis Data Streams Regions and Endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region) in the *AWS General Reference*.

## Create the stream
<a name="kinesis-using-sdk-java-create-the-stream"></a>

Now that you have created your Kinesis Data Streams client, you can create a stream using the console or programmatically. To create a stream programmatically, instantiate a `CreateStreamRequest` object and specify a name for the stream. If you want to use provisioned mode, specify the number of shards for the stream to use.
+ **On-demand**:

  ```
  CreateStreamRequest createStreamRequest = new CreateStreamRequest();
  createStreamRequest.setStreamName( myStreamName );
  ```
+ **Provisioned**:

  ```
  CreateStreamRequest createStreamRequest = new CreateStreamRequest();
  createStreamRequest.setStreamName( myStreamName );
  createStreamRequest.setShardCount( myStreamSize );
  ```

The stream name identifies the stream and is scoped to both the AWS account used by the application and the Region. That is, two streams in two different AWS accounts can have the same name, and two streams in the same AWS account but in two different Regions can have the same name. However, two streams in the same account and the same Region cannot.

The throughput of the stream is a function of the number of shards. For greater provisioned throughput, you require more shards. More shards also increase the cost that AWS charges for the stream. For more information about calculating an appropriate number of shards for your application, see [Choose the right mode to stream in](how-do-i-size-a-stream.md).

 After you have configured the `createStreamRequest` object, create a stream by calling the `createStream` method on the client. After calling `createStream`, wait for the stream to reach the `ACTIVE` state before performing any operations on the stream. To check the state of the stream, call the `describeStream` method. However, `describeStream` throws an exception if the stream does not exist. Therefore, enclose the `describeStream` call in a `try/catch` block. 

```
client.createStream( createStreamRequest );
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );  // wait up to 10 minutes
while ( System.currentTimeMillis() < endTime ) {
  try {
    // Poll every 20 seconds.
    Thread.sleep( 20 * 1000 );
  }
  catch ( InterruptedException e ) {
    Thread.currentThread().interrupt();
    break;
  }

  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
  }
  catch ( ResourceNotFoundException e ) {
    // The stream is not visible yet; keep polling.
  }
}
if ( System.currentTimeMillis() >= endTime ) {
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}
```

# Update a stream
<a name="updating-a-stream"></a>

You can update the details of a stream using the Kinesis Data Streams console, the Kinesis Data Streams API, or the AWS CLI.

**Note**  
You can enable server-side encryption for existing streams, or for streams that you have recently created.

## Use the console
<a name="update-stream-console"></a>

**To update a data stream using the console**

1. Open the Amazon Kinesis console at [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/).

1. In the navigation bar, expand the Region selector and choose a Region.

1. Choose the name of your stream in the list. The **Stream Details** page displays a summary of your stream configuration and monitoring information.

1. To switch between on-demand and provisioned capacity modes for a data stream, choose **Edit capacity mode** in the **Configuration** tab. For more information, see [Choose the right mode to stream in](how-do-i-size-a-stream.md).
**Important**  
For each data stream in your AWS account, you can switch between the on-demand and provisioned modes twice within 24 hours.

1. For a data stream with the provisioned mode, to edit the number of shards, choose **Edit provisioned shards** in the **Configuration** tab, and then enter a new shard count.

1. To enable server-side encryption of data records, choose **Edit** in the **Server-side encryption** section. Choose a KMS key to use as the master key for encryption, or use the default master key, **aws/kinesis**, managed by Kinesis. If you enable encryption for a stream and use your own AWS KMS master key, ensure that your producer and consumer applications have access to the AWS KMS master key that you used. To assign permissions to an application to access a user-generated AWS KMS key, see [Permissions to use user-generated KMS keys](permissions-user-key-KMS.md).

1. To edit the data retention period, choose **Edit** in the **Data retention period** section, and then enter a new data retention period.

1. If you have enabled custom metrics on your account, choose **Edit** in the **Shard level metrics** section, and then specify metrics for your stream. For more information, see [Monitor the Amazon Kinesis Data Streams service with Amazon CloudWatch](monitoring-with-cloudwatch.md).

## Use the API
<a name="update-stream-api"></a>

To update stream details using the API, see the following methods:
+ [AddTagsToStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_AddTagsToStream.html)
+ [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html)
+ [DisableEnhancedMonitoring](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DisableEnhancedMonitoring.html)
+ [EnableEnhancedMonitoring](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_EnableEnhancedMonitoring.html)
+ [IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html)
+ [RemoveTagsFromStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RemoveTagsFromStream.html)
+ [StartStreamEncryption](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartStreamEncryption.html)
+ [StopStreamEncryption](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StopStreamEncryption.html)
+ [UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html)

## Use the AWS CLI
<a name="update-stream-cli"></a>

For information about updating a stream using the AWS CLI, see the [Kinesis CLI reference](https://docs.aws.amazon.com/cli/latest/reference/kinesis/index.html). 

# List streams
<a name="kinesis-using-sdk-java-list-streams"></a>

Streams are scoped to the AWS account associated with the AWS credentials used to instantiate the Kinesis Data Streams client and also to the Region specified for the client. An AWS account could have many streams active at one time. You can list your streams in the Kinesis Data Streams console, or programmatically. The code in this section shows how to list all the streams for your AWS account. 

```
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
listStreamsRequest.setLimit(20); 
ListStreamsResult listStreamsResult = client.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
```

This code example first creates a new instance of `ListStreamsRequest` and calls its `setLimit` method to specify that a maximum of 20 streams should be returned for each call to `listStreams`. If you do not specify a value for `setLimit`, Kinesis Data Streams returns a number of streams less than or equal to the number in the account. The code then passes `listStreamsRequest` to the `listStreams` method of the client. The return value of `listStreams` is stored in a `ListStreamsResult` object. The code calls the `getStreamNames` method on this object and stores the returned stream names in the `streamNames` list. Note that Kinesis Data Streams might return fewer streams than the specified limit even if there are more streams than that in the account and Region. To ensure that you retrieve all the streams, use the `getHasMoreStreams` method as described in the next code example. 

```
while (listStreamsResult.getHasMoreStreams()) 
{
    if (streamNames.size() > 0) {
      listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() - 1));
    }
    listStreamsResult = client.listStreams(listStreamsRequest);
    streamNames.addAll(listStreamsResult.getStreamNames());
}
```

This code calls the `getHasMoreStreams` method on `listStreamsResult` to check if there are additional streams available beyond the ones returned in the initial call to `listStreams`. If so, the code calls the `setExclusiveStartStreamName` method with the name of the last stream that was returned in the previous call to `listStreams`. The `setExclusiveStartStreamName` method causes the next call to `listStreams` to start after that stream. The group of stream names returned by that call is then added to the `streamNames` list. This process continues until all the stream names have been collected in the list.

 The streams returned by `listStreams` can be in one of the following states: 
+ `CREATING`
+ `ACTIVE`
+ `UPDATING`
+ `DELETING`

You can check the state of a stream using the `describeStream` method, as shown in the previous section, [Create a stream using the APIs](kinesis-using-sdk-java-create-stream.md).

# List shards
<a name="kinesis-using-sdk-java-list-shards"></a>

A data stream can have one or more shards. The recommended method for listing or retrieving the shards from a data stream is to use the [ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html) API. The following example shows how you can get a list of the shards in a data stream. For a full description of the main operation used in this example and all of the parameters you can set for the operation, see [ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html).

```
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;

import java.util.concurrent.TimeUnit;

public class ShardSample {

    public static void main(String[] args) {

        KinesisAsyncClient client = KinesisAsyncClient.builder().build();

        ListShardsRequest request = ListShardsRequest
                .builder().streamName("myFirstStream")
                .build();

        try {
            ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS);
            System.out.println(response.toString());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

To run the previous code example, you can use a POM file like the following one.

```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kinesis.data.streams.samples</groupId>
    <artifactId>shards</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>kinesis</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>
```

With the `ListShards` API, you can use the [ShardFilter](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html) parameter to filter the API response. You can only specify one filter at a time. 

If you use the `ShardFilter` parameter when invoking the `ListShards` API, `Type` is a required property and must be specified. If you specify the `AT_TRIM_HORIZON`, `FROM_TRIM_HORIZON`, or `AT_LATEST` types, you do not need to specify either the `ShardId` or the `Timestamp` optional properties.

If you specify the `AFTER_SHARD_ID` type, you must also provide the value for the optional `ShardId` property. The `ShardId` property is identical in functionality to the `ExclusiveStartShardId` parameter of the `ListShards` API. When the `ShardId` property is specified, the response includes the shards starting with the shard whose ID immediately follows the `ShardId` that you provided.

If you specify the `AT_TIMESTAMP` or `FROM_TIMESTAMP` type, you must also provide the value for the optional `Timestamp` property. If you specify the `AT_TIMESTAMP` type, then all shards that were open at the provided timestamp are returned. If you specify the `FROM_TIMESTAMP` type, then all shards starting from the provided timestamp to the tip of the stream are returned. 

**Important**  
The `DescribeStreamSummary` and `ListShards` APIs provide a more scalable way to retrieve information about your data streams. More specifically, the quotas for the `DescribeStream` API can cause throttling. For more information, see [Quotas and limits](service-sizes-and-limits.md). Note also that `DescribeStream` quotas are shared across all applications that interact with all data streams in your AWS account. The quotas for the `ListShards` API, on the other hand, are specific to a single data stream. So not only do you get higher TPS with the `ListShards` API, but the action scales better as you create more data streams.  
We recommend that you migrate all of your producers and consumers that call the `DescribeStream` API to instead invoke the `DescribeStreamSummary` and `ListShards` APIs. To identify these producers and consumers, we recommend using Athena to parse CloudTrail logs, because the user agents for the KPL and KCL are captured in the API calls.   

```
SELECT useridentity.sessioncontext.sessionissuer.username,
       useridentity.arn, eventname, useragent, count(*)
FROM cloudtrail_logs
WHERE eventname IN ('DescribeStream')
  AND eventtime BETWEEN '' AND ''
GROUP BY useridentity.sessioncontext.sessionissuer.username, useridentity.arn, eventname, useragent
ORDER BY count(*) DESC
LIMIT 100
```
We also recommend reconfiguring AWS Lambda and Amazon Firehose integrations with Kinesis Data Streams that invoke the `DescribeStream` API so that they invoke `DescribeStreamSummary` and `ListShards` instead. Specifically, for AWS Lambda, you must update your event source mapping. For Amazon Firehose, you must update the corresponding IAM permissions so that they include the `ListShards` IAM permission. 

# Delete a stream
<a name="kinesis-using-sdk-java-delete-stream"></a>

You can delete a stream with the Kinesis Data Streams console, or programmatically. To delete a stream programmatically, use `DeleteStreamRequest`, as shown in the following code.

```
DeleteStreamRequest deleteStreamRequest = new DeleteStreamRequest();
deleteStreamRequest.setStreamName(myStreamName);
client.deleteStream(deleteStreamRequest);
```

Shut down any applications that are operating on the stream before you delete it. If an application attempts to operate on a deleted stream, it receives a `ResourceNotFoundException`. Also, if you subsequently create a new stream that has the same name as your previous stream, and applications that were operating on the previous stream are still running, these applications might try to interact with the new stream as though it were the previous stream—with unpredictable results.

# Reshard a stream
<a name="kinesis-using-sdk-java-resharding"></a>

**Important**  
You can reshard your stream using the [UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html) API. Alternatively, you can continue to perform splits and merges as explained here.

Amazon Kinesis Data Streams supports *resharding*, which lets you adjust the number of shards in your stream to adapt to changes in the rate of data flow through the stream. Resharding is considered an advanced operation. If you are new to Kinesis Data Streams, return to this subject after you are familiar with all the other aspects of Kinesis Data Streams.

There are two types of resharding operations: shard split and shard merge. In a shard split, you divide a single shard into two shards. In a shard merge, you combine two shards into a single shard. Resharding is always *pairwise* in the sense that you cannot split into more than two shards in a single operation, and you cannot merge more than two shards in a single operation. The shard or pair of shards that the resharding operation acts on are referred to as *parent* shards. The shard or pair of shards that result from the resharding operation are referred to as *child* shards. 

Splitting increases the number of shards in your stream and therefore increases the data capacity of the stream. Because you are charged on a per-shard basis, splitting increases the cost of your stream. Similarly, merging reduces the number of shards in your stream and therefore decreases the data capacity—and cost—of the stream. 

Resharding is typically performed by an administrative application that is distinct from the producer (put) applications and the consumer (get) applications. Such an administrative application monitors the overall performance of the stream based on metrics provided by Amazon CloudWatch or based on metrics collected from the producers and consumers. The administrative application also needs a broader set of IAM permissions than the consumers or producers because the consumers and producers usually should not need access to the APIs used for resharding. For more information about IAM permissions for Kinesis Data Streams, see [Controlling access to Amazon Kinesis Data Streams resources using IAM](controlling-access.md). 

For more information about resharding, see [How do I change the number of open shards in Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/)

**Topics**
+ [Decide on a strategy for resharding](kinesis-using-sdk-java-resharding-strategies.md)
+ [Split a shard](kinesis-using-sdk-java-resharding-split.md)
+ [Merge two shards](kinesis-using-sdk-java-resharding-merge.md)
+ [Complete the resharding action](kinesis-using-sdk-java-after-resharding.md)

# Decide on a strategy for resharding
<a name="kinesis-using-sdk-java-resharding-strategies"></a>

The purpose of resharding in Amazon Kinesis Data Streams is to enable your stream to adapt to changes in the rate of data flow. You split shards to increase the capacity (and cost) of your stream. You merge shards to reduce the cost (and capacity) of your stream.

One approach to resharding could be to split every shard in the stream—which would double the stream's capacity. However, this might provide more capacity than you actually need and therefore create unnecessary cost. 

You can also use metrics to determine which are your *hot* or *cold* shards, that is, shards that are receiving much more data, or much less data, than expected. You could then selectively split the hot shards to increase capacity for the hash keys that target those shards. Similarly, you could merge cold shards to make better use of their unused capacity.

You can obtain some performance data for your stream from the Amazon CloudWatch metrics that Kinesis Data Streams publishes. However, you can also collect some of your own metrics for your streams. One approach would be to log the hash key values generated by the partition keys for your data records. Recall that you specify the partition key at the time that you add the record to the stream. 

```
putRecordRequest.setPartitionKey("myPartitionKey");
```

Kinesis Data Streams uses [MD5](http://en.wikipedia.org/wiki/MD5) to compute the hash key from the partition key. Because you specify the partition key for the record, you could use MD5 to compute the hash key value for that record and log it. 
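As a minimal sketch of this approach (the class and method names here are illustrative, not part of the Kinesis API), you can reproduce the hash key computation with standard Java:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashKeyCalculator {
    // Compute the 128-bit hash key that Kinesis Data Streams derives from a
    // partition key: the MD5 digest of the key's UTF-8 bytes, read as an
    // unsigned integer.
    public static BigInteger hashKeyFor(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: treat the bytes as non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Logging the value returned by `hashKeyFor` for each record, alongside the record's shard ID, gives you the raw data for the traffic analysis described next.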

You could also log the IDs of the shards that your data records are assigned to. The shard ID is available through the `getShardId` method of the `PutRecordsResultEntry` objects returned by the `putRecords` method, and of the `PutRecordResult` object returned by the `putRecord` method.

```
String shardId = putRecordResult.getShardId();
```

With the shard IDs and the hash key values, you can determine which shards and hash keys are receiving the most or least traffic. You can then use resharding to provide more or less capacity, as appropriate for these keys.
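As a minimal sketch of this analysis (the class name is illustrative), you can tally the logged shard IDs to find candidates for splitting or merging:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardTrafficTally {
    // Count how many logged records landed on each shard. Shards with counts
    // well above the others are "hot" candidates for a split; shards with
    // counts well below the others are "cold" candidates for a merge.
    public static Map<String, Long> countByShard(List<String> loggedShardIds) {
        Map<String, Long> counts = new HashMap<>();
        for (String shardId : loggedShardIds) {
            counts.merge(shardId, 1L, Long::sum);
        }
        return counts;
    }
}
```

The same tally can be built per hash key range instead of per shard ID if you logged the hash key values as described above.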

# Split a shard
<a name="kinesis-using-sdk-java-resharding-split"></a>

To split a shard in Amazon Kinesis Data Streams, you need to specify how hash key values from the parent shard should be redistributed to the child shards. When you add a data record to a stream, it is assigned to a shard based on a hash key value. The hash key value is the [MD5](http://en.wikipedia.org/wiki/MD5) hash of the partition key that you specify for the data record at the time that you add the data record to the stream. Data records that have the same partition key also have the same hash key value.

The possible hash key values for a given shard constitute an ordered, contiguous set of non-negative integers. This range of possible hash key values is given by the following: 

```
shard.getHashKeyRange().getStartingHashKey();
shard.getHashKeyRange().getEndingHashKey();
```

When you split the shard, you specify a value in this range. That hash key value and all higher hash key values are distributed to one of the child shards. All the lower hash key values are distributed to the other child shard. 

The following code demonstrates a shard split operation that redistributes the hash keys evenly between each of the child shards, essentially splitting the parent shard in half. This is just one possible way of dividing the parent shard. You could, for example, split the shard so that the lower one-third of the keys from the parent go to one child shard and the upper two-thirds of the keys go to the other child shard. However, for many applications, splitting shards in half is an effective approach. 

The code assumes that `myStreamName` holds the name of your stream and the object variable `shard` holds the shard to split. Begin by instantiating a new `splitShardRequest` object and setting the stream name and shard ID.

```
SplitShardRequest splitShardRequest = new SplitShardRequest();
splitShardRequest.setStreamName(myStreamName);
splitShardRequest.setShardToSplit(shard.getShardId());
```

Determine the hash key value that is halfway between the lowest and highest values in the shard. This is the starting hash key value for the child shard that will contain the upper half of the hash keys from the parent shard. Specify this value in the `setNewStartingHashKey` method. You need to specify only this value. Kinesis Data Streams automatically distributes the hash keys below this value to the other child shard that is created by the split. The last step is to call the `splitShard` method on the Kinesis Data Streams client.

```
BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKey   = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
String newStartingHashKey  = startingHashKey.add(endingHashKey).divide(new BigInteger("2")).toString();

splitShardRequest.setNewStartingHashKey(newStartingHashKey);
client.splitShard(splitShardRequest);
```
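The routing rule that results from the split can be expressed with plain `BigInteger` arithmetic (this helper is illustrative, not part of the SDK): hash keys at or above `newStartingHashKey` go to one child shard, and all lower hash keys go to the other.

```java
import java.math.BigInteger;

public class SplitRouting {
    // After a split at newStartingHashKey, a record whose hash key is greater
    // than or equal to that value routes to the upper child shard; all lower
    // hash keys route to the other child shard.
    public static boolean routesToUpperChild(BigInteger hashKey,
                                             BigInteger newStartingHashKey) {
        return hashKey.compareTo(newStartingHashKey) >= 0;
    }
}
```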

The first step after this procedure is shown in [Wait for a stream to become active again](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active). 

# Merge two shards
<a name="kinesis-using-sdk-java-resharding-merge"></a>

 A shard merge operation takes two specified shards and combines them into a single shard. After the merge, the single child shard receives data for all hash key values covered by the two parent shards. 

**Shard Adjacency**  
To merge two shards, the shards must be *adjacent*. Two shards are considered adjacent if the union of the hash key ranges for the two shards forms a contiguous set with no gaps. For example, suppose that you have two shards, one with a hash key range of 276...381 and the other with a hash key range of 382...454. You could merge these two shards into a single shard that would have a hash key range of 276...454. 

To take another example, suppose that you have two shards, one with a hash key range of 276...381 and the other with a hash key range of 455...560. You could not merge these two shards because one or more other shards would lie between them, covering the range 382...454. 

The set of all `OPEN` shards in a stream—as a group—always spans the entire range of MD5 hash key values. For more information about shard states—such as `CLOSED`—see [Consider data routing, data persistence, and shard state after a reshard](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-data-routing). 

To identify shards that are candidates for merging, you should filter out all shards that are in a `CLOSED` state. Shards that are `OPEN`—that is, not `CLOSED`—have an ending sequence number of `null`. You can test the ending sequence number for a shard using: 

```
if( null == shard.getSequenceNumberRange().getEndingSequenceNumber() ) 
{
  // Shard is OPEN, so it is a possible candidate to be merged.
}
```

After filtering out the closed shards, sort the remaining shards by the highest hash key value supported by each shard. You can retrieve this value using: 

```
shard.getHashKeyRange().getEndingHashKey();
```

 If two shards are adjacent in this filtered, sorted list, they can be merged. 
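The adjacency test itself reduces to one `BigInteger` comparison (this helper is illustrative; it takes the string hash key values from each shard's hash key range):

```java
import java.math.BigInteger;

public class ShardAdjacency {
    // Two OPEN shards are adjacent, and therefore mergeable, when the lower
    // shard's ending hash key is exactly one less than the upper shard's
    // starting hash key, so that the union of their ranges has no gap.
    public static boolean areAdjacent(String lowerEndingHashKey,
                                      String upperStartingHashKey) {
        BigInteger end = new BigInteger(lowerEndingHashKey);
        BigInteger start = new BigInteger(upperStartingHashKey);
        return end.add(BigInteger.ONE).equals(start);
    }
}
```

Using the earlier examples, the ranges 276...381 and 382...454 pass this test, while 276...381 and 455...560 do not.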

**Code for the Merge Operation**  
 The following code merges two shards. The code assumes that `myStreamName` holds the name of your stream and the object variables `shard1` and `shard2` hold the two adjacent shards to merge.

For the merge operation, begin by instantiating a new `mergeShardsRequest` object. Specify the stream name with the `setStreamName` method. Then specify the two shards to merge using the `setShardToMerge` and `setAdjacentShardToMerge` methods. Finally, call the `mergeShards` method on the Kinesis Data Streams client to carry out the operation.

```
MergeShardsRequest mergeShardsRequest = new MergeShardsRequest();
mergeShardsRequest.setStreamName(myStreamName);
mergeShardsRequest.setShardToMerge(shard1.getShardId());
mergeShardsRequest.setAdjacentShardToMerge(shard2.getShardId());
client.mergeShards(mergeShardsRequest);
```

The first step after this procedure is shown in [Wait for a stream to become active again](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active).

# Complete the resharding action
<a name="kinesis-using-sdk-java-after-resharding"></a>

After any kind of resharding procedure in Amazon Kinesis Data Streams, and before normal record processing resumes, other procedures and considerations are required. The following sections describe these.

**Topics**
+ [Wait for a stream to become active again](#kinesis-using-sdk-java-resharding-wait-until-active)
+ [Consider data routing, data persistence, and shard state after a reshard](#kinesis-using-sdk-java-resharding-data-routing)

## Wait for a stream to become active again
<a name="kinesis-using-sdk-java-resharding-wait-until-active"></a>

After you call a resharding operation, either `splitShard` or `mergeShards`, you must wait for the stream to become active again. The code to use is the same as when you wait for a stream to become active after [creating a stream](kinesis-using-sdk-java-create-stream.md). That code is as follows:

```
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
while ( System.currentTimeMillis() < endTime ) 
{
  try {
    Thread.sleep(20 * 1000);
  } 
  catch ( Exception e ) {}
  
  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
    //
    // sleep for one second
    //
    try {
      Thread.sleep( 1000 );
    }
    catch ( Exception e ) {}
  }
  catch ( ResourceNotFoundException e ) {}
}
if ( System.currentTimeMillis() >= endTime ) 
{
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}
```

## Consider data routing, data persistence, and shard state after a reshard
<a name="kinesis-using-sdk-java-resharding-data-routing"></a>

Kinesis Data Streams is a real-time data streaming service. Your applications should assume that data is flowing continuously through the shards in your stream. When you reshard, data records that were flowing to the parent shards are re-routed to flow to the child shards based on the hash key values that the data-record partition keys map to. However, any data records that were in the parent shards before the reshard remain in those shards. The parent shards do not disappear when the reshard occurs. They persist along with the data they contained before the reshard. The data records in the parent shards are accessible using the [`getShardIterator` and `getRecords`](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data) operations in the Kinesis Data Streams API, or through the Kinesis Client Library.

**Note**  
Data records are accessible from the time they are added to the stream until the end of the stream’s retention period. This holds true regardless of any changes to the shards in the stream during that time period. For more information about a stream’s retention period, see [Change the data retention period](kinesis-extended-retention.md).

In the process of resharding, a parent shard transitions from an `OPEN` state to a `CLOSED` state to an `EXPIRED` state. 
+  **OPEN**: Before a reshard operation, a parent shard is in the `OPEN` state, which means that data records can be both added to the shard and retrieved from the shard.
+  **CLOSED**: After a reshard operation, the parent shard transitions to a `CLOSED` state. This means that data records are no longer added to the shard. Data records that would have been added to this shard are now added to a child shard instead. However, data records can still be retrieved from the shard for a limited time. 
+  **EXPIRED**: After the stream's retention period has expired, all the data records in the parent shard have expired and are no longer accessible. At this point, the shard itself transitions to an `EXPIRED` state. Calls to `getStreamDescription().getShards()` to enumerate the shards in the stream do not include `EXPIRED` shards in the list of shards returned. For more information about a stream’s retention period, see [Change the data retention period](kinesis-extended-retention.md).

After the reshard has occurred and the stream is again in an `ACTIVE` state, you could immediately begin to read data from the child shards. However, the parent shards that remain after the reshard might still contain data that you haven't read yet that was added to the stream before the reshard. If you read data from the child shards before having read all data from the parent shards, you could read data for a particular hash key out of the order given by the data records' sequence numbers. Therefore, assuming that the order of the data is important, you should, after a reshard, always continue to read data from the parent shards until they are exhausted. Only then should you begin reading data from the child shards. When `getRecordsResult.getNextShardIterator` returns `null`, it indicates that you have read all the data in the parent shard. 

# Change the data retention period
<a name="kinesis-extended-retention"></a>

Amazon Kinesis Data Streams supports changes to the data record retention period of your data stream. A Kinesis data stream is an ordered sequence of data records meant to be written to and read from in real time. Data records are therefore stored in shards in your stream temporarily. The time period from when a record is added to when it is no longer accessible is called the *retention period*. A Kinesis data stream stores records for 24 hours by default, and the retention period can be increased to a maximum of 8,760 hours (365 days). 

You can update the retention period via the Kinesis Data Streams console or by using the [IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html) and the [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html) operations. With the Kinesis Data Streams console, you can bulk edit the retention period of more than one data stream at the same time. You can increase the retention period up to a maximum of 8760 hours (365 days) using the [IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html) operation or the Kinesis Data Streams console. You can decrease the retention period down to a minimum of 24 hours using the [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html) operation or the Kinesis Data Streams console. The request syntax for both operations includes the stream name and the retention period in hours. Finally, you can check the current retention period of a stream by calling the [DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html) operation.

The following is an example of changing the retention period using the AWS CLI:

```
aws kinesis increase-stream-retention-period --stream-name retentionPeriodDemo --retention-period-hours 72
```

When you increase the retention period, Kinesis Data Streams stops applying the old retention period within several minutes. For example, if you change the retention period from 24 hours to 48 hours, a record that was added to the stream 23 hours 55 minutes earlier is still available after the 24-hour mark.
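The arithmetic behind this behavior can be sketched as a simple age check (this helper is illustrative; a real consumer would compare against a record's approximate arrival timestamp):

```java
import java.time.Duration;
import java.time.Instant;

public class RetentionCheck {
    // A record remains readable while its age is within the stream's
    // retention period.
    public static boolean isAccessible(Instant arrivalTime,
                                       Duration retentionPeriod,
                                       Instant now) {
        return Duration.between(arrivalTime, now).compareTo(retentionPeriod) <= 0;
    }
}
```

Under a 48-hour retention period, a record that is 23 hours 55 minutes old passes this check; under the original 24-hour period, a record that is 25 hours old does not.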

Kinesis Data Streams almost immediately makes records older than the new retention period inaccessible upon decreasing the retention period. Therefore, take great care when calling the [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html) operation.

Set your data retention period to ensure that your consumers are able to read data before it expires, if problems occur. You should carefully consider all possibilities, such as an issue with your record processing logic or a downstream dependency being down for a long period of time. Think of the retention period as a safety net to allow more time for your data consumers to recover. The retention period API operations allow you to set this up proactively or to respond to operational events reactively.

 Additional charges apply for streams with a retention period set above 24 hours. For more information, see [Amazon Kinesis Data Streams Pricing](https://aws.amazon.com/kinesis/data-streams/pricing/).

# Tag your Amazon Kinesis Data Streams resources
<a name="tagging"></a>

You can assign your own metadata to streams and enhanced fan-out consumers you create in Amazon Kinesis Data Streams in the form of *tags*. A tag is a key-value pair that you define for a stream. Using tags is a simple yet powerful way to manage AWS resources and organize data, including billing data.

**Topics**
+ [Review tag basics](#tagging-basics)
+ [Track costs using tagging](#tagging-billing)
+ [Understand tag restrictions](#tagging-restrictions)
+ [Tag streams using the Kinesis Data Streams console](#tagging-console)
+ [Tag streams using the AWS CLI](#tagging-cli)
+ [Tag streams using the Kinesis Data Streams APIs](#tagging-api)
+ [Tag consumers using the AWS CLI](#tagging-consumers-cli)
+ [Tag consumers using the Kinesis Data Streams APIs](#tagging-consumers-api)

## Review tag basics
<a name="tagging-basics"></a>

The Kinesis Data Streams resources you can tag include data streams and enhanced fan-out consumers. You use the Kinesis Data Streams console, AWS CLI, or Kinesis Data Streams API to complete the following tasks:
+ Create a resource with tags
+ Add tags to a resource
+ List the tags for your resources
+ Remove tags from a resource

**Note**  
You can't apply tags to enhanced fan-out consumers using the Kinesis Data Streams console. To apply tags to consumers, use the AWS CLI or the Kinesis Data Streams API. 

You can use tags to categorize your resources. For example, you can categorize resources by purpose, owner, or environment. Because you define the key and value for each tag, you can create a custom set of categories to meet your specific needs. For example, you might define a set of tags that helps you track resources by owner and associated application. Here are several examples of tags:
+ Project: Project name
+ Owner: Name
+ Purpose: Load testing 
+ Application: Application name
+ Environment: Production 

**Important**  
To add tags while creating a stream, you must include the `kinesis:CreateStream` and `kinesis:AddTagsToStream` permissions for that stream. You **can't use** the `kinesis:TagResource` permission to tag streams while creating them.
To add tags during consumer registration, you must include the `kinesis:TagResource` and `kinesis:RegisterStreamConsumer` permissions.

## Track costs using tagging
<a name="tagging-billing"></a>

You can use tags to categorize and track your AWS costs. When you apply tags to your Kinesis Data Streams resources, your AWS cost allocation report includes usage and costs aggregated by tags. You can apply tags that represent business categories, such as cost centers, application names, or owners, to organize your costs across multiple services. For more information, see [Use Cost Allocation Tags for Custom Billing Reports](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html) in the *AWS Billing User Guide*.

## Understand tag restrictions
<a name="tagging-restrictions"></a>

The following restrictions apply to tags:

**Basic restrictions**
+ The maximum number of tags for each resource is 50.
+ Tag keys and values are case-sensitive.
+ You can't change or edit tags for a deleted resource.

**Tag key restrictions**
+ Each tag key must be unique. If you add a tag with a key that's already in use, your new tag overwrites the existing key-value pair. 
+ You can't start a tag key with `aws:` because this prefix is reserved for use by AWS. AWS creates tags that begin with this prefix on your behalf, but you can't edit or delete them.
+ Tag keys must be between 1 and 128 Unicode characters in length.
+ Tag keys must consist only of Unicode letters, digits, white space, and the following special characters: `_ . / = + - @`.

**Tag value restrictions**
+ Tag values must be between 0 and 255 Unicode characters in length.
+ Tag values can be blank. Otherwise, they must consist only of Unicode letters, digits, white space, and the following special characters: `_ . / = + - @`.

## Tag streams using the Kinesis Data Streams console
<a name="tagging-console"></a>

You can add, update, list, and remove tags on your streams using the Kinesis Data Streams console.

**To view the tags for a stream**

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. On the left navigation pane, choose **Data streams**.

1. On the **Data streams** page, choose the stream that you want to tag.

1. On the Stream details page, choose **Configuration**.

1. In the **Tags** section, view the tags applied to the stream.

**To create a data stream with a tag**

1. Open the Kinesis Data Streams console.

1. On the left navigation pane, choose **Data streams**.

1. Choose **Create data stream**.

1. On the **Create data stream** page, enter a name for your data stream.

1. For **Data stream capacity**, choose either the **On-demand** or the **Provisioned** capacity mode.

   For more information about capacity modes, see [Choose the right mode to stream in](how-do-i-size-a-stream.md).

1. In the **Tags** section, do the following:

   1. Choose **Add new tag**.

   1. For **Key**, enter the tag and optionally specify a value in the **Value** field.

      If you see an error, either the tag key or value that you specified doesn't meet the tag restrictions. For more information, see [Understand tag restrictions](#tagging-restrictions).

1. Choose **Create data stream**.

**To add or update a tag on a stream**

1. Open the Kinesis Data Streams console.

1. On the left navigation pane, choose **Data streams**.

1. On the **Data streams** page, choose the stream to which you want to add or update tags.

1. On the Stream details page, choose **Configuration**.

1. In the **Tags** section, choose **Manage tags**.

1. Under **Tags**, do one of the following:
   + To add a new tag, choose **Add new tag**, and then enter the tag's **Key** and **Value** data. Repeat this step as many times as necessary.

     The maximum number of tags you can add for each stream is 50.
   + To update an existing tag, enter a new tag value in the **Value** field of that tag's **Key**.

   If you see an error, either the tag key or value that you specified doesn't meet the tag restrictions. For more information, see [Understand tag restrictions](#tagging-restrictions).

1. Choose **Save changes**.

**To remove a tag from a stream**

1. Open the Kinesis Data Streams console.

1. On the left navigation pane, choose **Data streams**.

1. On the **Data streams** page, choose the stream from which you want to remove tags.

1. On the Stream details page, choose **Configuration**.

1. In the **Tags** section, choose **Manage tags**.

1. Find the tag **Key** and **Value** pair that you want to remove. Then, choose **Remove**.

1. Choose **Save changes**.

## Tag streams using the AWS CLI
<a name="tagging-cli"></a>

You can add, list, and remove tags on your streams using the AWS CLI. For examples, see the following documentation.

 [create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html)   
Creates a stream with tags.

 [add-tags-to-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/add-tags-to-stream.html)   
Adds or updates tags for the specified stream.

 [list-tags-for-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/list-tags-for-stream.html)  
Lists the tags for the specified stream.

 [remove-tags-from-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/remove-tags-from-stream.html)  
Removes tags from the specified stream.

## Tag streams using the Kinesis Data Streams APIs
<a name="tagging-api"></a>

You can add, list, and remove tags on your streams using the Kinesis Data Streams APIs. For examples, see the following documentation:

 [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)   
Creates a stream with tags.

 [AddTagsToStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_AddTagsToStream.html)   
Adds or updates tags for the specified stream.

 [ListTagsForStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListTagsForStream.html)  
Lists the tags for the specified stream.

 [RemoveTagsFromStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RemoveTagsFromStream.html)  
Removes tags from the specified stream.

## Tag consumers using the AWS CLI
<a name="tagging-consumers-cli"></a>

You can add, list, and remove tags on your consumers using the AWS CLI. For examples, see the following documentation:

[register-stream-consumer](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/register-stream-consumer.html)  
Registers a consumer for a Kinesis data stream with tags. 

[tag-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/tag-resource.html)  
Adds or updates tags for the specified Kinesis resource.

[list-tags-for-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/list-tags-for-resource.html)  
Lists the tags for the specified Kinesis resource.

[untag-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/untag-resource.html)  
Removes tags from the specified Kinesis resource.

## Tag consumers using the Kinesis Data Streams APIs
<a name="tagging-consumers-api"></a>

You can add, list, and remove tags on your consumers using the Kinesis Data Streams APIs. For examples, see the following documentation:

[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)  
Registers a consumer for a Kinesis data stream with tags.

[TagResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_TagResource.html)  
Adds or updates tags for the specified Kinesis resource.

[ListTagsForResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListTagsForResource.html)  
Lists the tags for the specified Kinesis resource.

[UntagResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UntagResource.html)  
Removes tags from the specified Kinesis resource.

# Handle large records
<a name="large-records"></a>

Amazon Kinesis Data Streams supports records up to 10 mebibytes (MiB). Use this capability for intermittent data payloads that exceed the default 1 MiB record size limit. The default maximum record size for both existing and newly created streams is 1 MiB.

This feature benefits Internet of Things (IoT) applications, change data capture (CDC) pipelines, and machine learning workflows that require processing occasional larger data payloads. To start using large records in your stream, update your stream's maximum record size limit.

**Important**  
The individual shard throughput limits of 1 MB/s for writes and 2 MB/s for reads remain unchanged with support for larger record sizes. Kinesis Data Streams is designed to accommodate intermittent large records alongside baseline traffic of records less than or equal to 1 MiB. It is not designed to accommodate sustained high-volume large record ingestion. 

## Update your stream to use large records
<a name="update-stream"></a>

**To process larger records with Kinesis Data Streams**

1. Navigate to the Kinesis Data Streams console.

1. Select your stream, and go to the **Configuration** tab.

1. Choose **Edit**, located next to **Maximum record size**.

1. Set your maximum record size (up to 10 MiB).

1. Save your changes.

This setting only adjusts the maximum record size for this Kinesis data stream. Before increasing this limit, verify that all downstream applications can handle larger records.

You can also update this setting using the AWS CLI:

```
aws kinesis update-max-record-size \
    --stream-arn <stream-arn> \
    --max-record-size-in-ki-b 5000
```

## Optimize your stream performance with large records
<a name="optimizing-performance"></a>

It's recommended to keep large records to less than 2% of your overall traffic. Each shard in a stream has a throughput capacity of 1 MiB per second. To accommodate large records, Kinesis Data Streams bursts up to 10 MiB while averaging out to 1 MiB per second. This burst capacity is continuously refilled; the refill rate depends on the size of the large records and of the baseline records. For best results, use a uniformly distributed partition key. For more information about how Kinesis on-demand mode scales, see [On-demand mode features and use cases](how-do-i-size-a-stream.html#ondemandmode).
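
The burst-and-refill behavior described above can be modeled as a token bucket with a 10 MiB capacity refilled at 1 MiB per second. The following sketch is an illustrative model of that behavior only, not the service's actual implementation:

```python
class ShardBurstBucket:
    """Illustrative token-bucket model of per-shard write capacity:
    bursts up to 10 MiB, refills at 1 MiB per second."""

    CAPACITY_BYTES = 10 * 1024 * 1024   # burst headroom for one large record
    REFILL_BYTES_PER_SEC = 1024 * 1024  # baseline shard write throughput

    def __init__(self):
        self.tokens = self.CAPACITY_BYTES
        self.last_refill = 0.0

    def try_put(self, record_size_bytes, now_seconds):
        # Refill tokens for the time elapsed since the last call.
        elapsed = now_seconds - self.last_refill
        self.tokens = min(self.CAPACITY_BYTES,
                          self.tokens + elapsed * self.REFILL_BYTES_PER_SEC)
        self.last_refill = now_seconds
        if record_size_bytes <= self.tokens:
            self.tokens -= record_size_bytes
            return True   # accepted
        return False      # would be throttled

bucket = ShardBurstBucket()
print(bucket.try_put(10 * 1024 * 1024, now_seconds=0))  # True: burst absorbs one 10 MiB record
print(bucket.try_put(1024, now_seconds=0))              # False: bucket drained, would throttle
print(bucket.try_put(1024, now_seconds=5))              # True: 5 MiB refilled after 5 seconds
```

This is why sustained large-record traffic throttles: a single 10 MiB record consumes roughly 10 seconds' worth of a shard's refill budget.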

## Mitigate throttling with large records
<a name="mitigate-throttling"></a>

**To mitigate throttling**

1. Implement retry logic with exponential back-off in your producer application.

1. Use randomized partition keys to distribute large records across available shards.

1. Store payloads in Amazon S3 and send only metadata references to the stream for continuous streams of large records. For more information, see [Processing large records with Amazon Kinesis Data Streams](https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/).
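
The backoff guidance above can be sketched as a small retry wrapper. The producer callable and exception name below are stand-ins for your actual Kinesis client code:

```python
import random
import time

def put_with_backoff(put_fn, max_attempts=5, base_delay=0.1, sleep=time.sleep):
    """Retry a producer call with exponential backoff and full jitter.
    put_fn is any callable that raises on throttling (for example, a
    ProvisionedThroughputExceededException from your Kinesis client)."""
    for attempt in range(max_attempts):
        try:
            return put_fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Full jitter: sleep a random amount up to the exponential cap.
            sleep(random.uniform(0, base_delay * (2 ** attempt)))

# Stand-in producer that is throttled twice, then succeeds.
calls = {"n": 0}
def flaky_put():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("ProvisionedThroughputExceededException")
    return "ok"

print(put_with_backoff(flaky_put, sleep=lambda s: None))  # ok after 2 retries
```

In production, catch the specific throttling exception rather than `Exception`, so that non-retryable errors surface immediately.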

## Handle large records using the Kinesis Data Streams APIs
<a name="records-apis"></a>

Large record support introduces one new API and updates two existing control plane APIs to handle records up to 10 MiB.

API for modifying record size:
+ `UpdateMaxRecordSize`: Configures the maximum record size limit for existing streams, up to 10 MiB.

Updates to existing APIs:
+ `CreateStream`: Adds the optional `MaxRecordSizeInKiB` parameter for setting record size limits during stream creation.
+ `DescribeStreamSummary`: Returns the `MaxRecordSizeInKiB` field to show the current stream configuration.

All APIs listed maintain backward compatibility for existing streams. For complete API documentation, see the [Amazon Kinesis Data Streams Service API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html).
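
For example, a deployment check can read the current limit back from the `DescribeStreamSummary` response. The helper and sample values below are illustrative; the `MaxRecordSizeInKiB` field is the one described above:

```python
def get_max_record_size_kib(summary_response):
    """Extract MaxRecordSizeInKiB from a DescribeStreamSummary response.
    Streams that have not been updated default to 1 MiB (1024 KiB)."""
    summary = summary_response["StreamDescriptionSummary"]
    return summary.get("MaxRecordSizeInKiB", 1024)

# Sample response fragment, as a Kinesis client's describe_stream_summary
# call might return it (values are illustrative).
response = {
    "StreamDescriptionSummary": {
        "StreamName": "my-stream",
        "MaxRecordSizeInKiB": 5000,
    }
}
print(get_max_record_size_kib(response))  # 5000
```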

## AWS components compatible with large records
<a name="record-compatability"></a>

The following AWS components are compatible with large records:


| Component | Description | 
| --- | --- | 
|  AWS SDK | The AWS SDKs support handling large records. You can update your stream’s maximum record size up to 10 MiB using available methods in the AWS SDKs. For more information, see [Using this service with an AWS SDK](https://docs.aws.amazon.com/streams/latest/dev/sdk-general-information-section.html). | 
|  Kinesis Client Library (KCL) | Starting with version 2.x, the KCL supports handling large records. To use large record support, update the `maxRecordSize` of your stream and use the KCL. For more information, see [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). | 
|  Kinesis Producer Library (KPL) | Starting with version 1.0.5, the KPL supports handling large records. To use large record support, update the `maxRecordSize` of your stream and use the KPL. For more information, see [Develop producers using the Amazon Kinesis Producer Library (KPL)](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html). | 
|  Amazon EMR | Amazon EMR with Apache Spark supports handling large records up to the Kinesis Data Streams limit (10 MiB). To use large record support, use the `readStream` function. For more information, see [Amazon EMR and Amazon Kinesis integration](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-kinesis.html). | 
|  Amazon Data Firehose | When used with Kinesis Data Streams, Amazon Data Firehose behavior with large records depends on the delivery destination. For applications that require delivering large records to Snowflake or Amazon Redshift, deliver the data to Amazon S3 first, and then use Extract, Transform, Load (ETL) processes to load the data. For all other destinations, handling of large records varies by destination, so test the behavior in a proof-of-concept environment before scaling to production usage. | 
|  AWS Lambda | AWS Lambda supports payloads up to 6 MiB. This limit includes the base64-encoded Kinesis payload and the metadata associated with the event source mapping (ESM). Lambda processes records smaller than 6 MiB through the ESM with no additional configuration required. Records larger than 6 MiB are routed to an on-failure destination, which you must configure on the ESM to handle records that exceed Lambda's processing limits. Each event sent to the on-failure destination is a JSON document that contains metadata about the failed invocation. It is recommended to configure an on-failure destination on the ESM regardless of record size, so that no records are discarded. For more information, see [Configuring destinations for failed invocations](https://docs.aws.amazon.com/lambda/latest/dg/kinesis-on-failure-destination.html#kinesis-on-failure-destination-console). | 
|  Amazon Redshift | Amazon Redshift only supports record sizes less than 1 MiB when streaming data from Kinesis Data Streams. Records that exceed this limit are not processed and are logged in `sys_stream_scan_errors`. For more information, see [SYS\_STREAM\_SCAN\_ERRORS](https://docs.aws.amazon.com/redshift/latest/dg/r_SYS_STREAM_SCAN_ERRORS.html). | 
|  Flink connector for Kinesis Data Streams | There are two approaches for consuming data from Kinesis Data Streams: the Kinesis source connector and the Kinesis sink connector. The source connector supports records up to 10 MiB. Do not use the sink connector for records larger than 1 MiB. For more information, see [Use connectors to move data in Amazon Managed Service for Apache Flink with the DataStream API](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html). | 
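
Because Lambda's 6 MiB limit applies to the base64-encoded payload plus ESM metadata, a producer can estimate the encoded size before deciding whether a record will reach Lambda through the ESM or land in its on-failure destination. A minimal sketch; the metadata allowance is an assumption for illustration:

```python
import base64
import math

LAMBDA_PAYLOAD_LIMIT = 6 * 1024 * 1024  # 6 MiB: base64 data plus ESM metadata

def base64_encoded_size(raw_size_bytes):
    """Base64 expands every 3 raw bytes into 4 output characters."""
    return 4 * math.ceil(raw_size_bytes / 3)

def fits_in_lambda_event(raw_size_bytes, metadata_overhead=4096):
    # metadata_overhead is a rough allowance for ESM metadata (assumption).
    return base64_encoded_size(raw_size_bytes) + metadata_overhead <= LAMBDA_PAYLOAD_LIMIT

record = b"x" * (3 * 1024 * 1024)  # a 3 MiB record
# Sanity check against the real encoder: sizes match exactly.
assert base64_encoded_size(len(record)) == len(base64.b64encode(record))
print(fits_in_lambda_event(len(record)))       # True: ~4 MiB encoded
print(fits_in_lambda_event(5 * 1024 * 1024))   # False: ~6.7 MiB encoded
```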

## Regions where large records are supported
<a name="supported-regions"></a>

This Amazon Kinesis Data Streams feature is available only in the following AWS Regions:


| AWS Region | Region Name | 
| --- | --- | 
|  eu-north-1 | Europe (Stockholm) | 
|  me-south-1 | Middle East (Bahrain) | 
|  ap-south-1 | Asia Pacific (Mumbai) | 
|  eu-west-3 | Europe (Paris) | 
|  ap-southeast-3 | Asia Pacific (Jakarta) | 
|  us-east-2 | US East (Ohio) | 
|  af-south-1 | Africa (Cape Town) | 
|  eu-west-1 | Europe (Ireland) | 
|  me-central-1 | Middle East (UAE) | 
|  eu-central-1 | Europe (Frankfurt) | 
|  sa-east-1 | South America (São Paulo) | 
|  ap-east-1 | Asia Pacific (Hong Kong) | 
|  ap-south-2 | Asia Pacific (Hyderabad) | 
|  us-east-1 | US East (N. Virginia) | 
|  ap-northeast-2 | Asia Pacific (Seoul) | 
|  ap-northeast-3 | Asia Pacific (Osaka) | 
|  eu-west-2 | Europe (London) | 
|  ap-southeast-4 | Asia Pacific (Melbourne) | 
|  ap-northeast-1 | Asia Pacific (Tokyo) | 
|  us-west-2 | US West (Oregon) | 
|  us-west-1 | US West (N. California) | 
|  ap-southeast-1 | Asia Pacific (Singapore) | 
|  ap-southeast-2 | Asia Pacific (Sydney) | 
|  il-central-1 | Israel (Tel Aviv) | 
|  ca-central-1 | Canada (Central) | 
|  ca-west-1 | Canada West (Calgary) | 
|  eu-south-2 | Europe (Spain) | 
|  cn-northwest-1 | China (Ningxia) | 
|  eu-central-2 | Europe (Zurich) | 
| us-gov-east-1 | AWS GovCloud (US-East) | 
| us-gov-west-1 | AWS GovCloud (US-West) | 

# Perform resilience testing with AWS Fault Injection Service
<a name="kinesis-fis"></a>

AWS Fault Injection Service is a fully managed service that helps you perform fault injection experiments on your AWS workloads. AWS FIS integration with Amazon Kinesis Data Streams enables you to test your application's resilience against common Amazon Kinesis Data Streams API errors in a controlled environment. This capability allows you to validate error handling, retry logic, and monitoring systems before encountering real failures. For more information, see [What is AWS Fault Injection Service?](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html).

**Actions**
+ API internal error: Injects internal errors into requests made by the target IAM role. The action `aws:fis:inject-api-internal-error` creates `InternalFailure` errors (HTTP 500).
+ API throttle error: Injects throttle errors into requests made by the target IAM role. The action `aws:fis:inject-api-throttle-error` creates `ThrottlingException` errors (HTTP 400).
+ API unavailable error: Injects unavailable errors into requests made by the target IAM role. The action `aws:fis:inject-api-unavailable-error` creates `ServiceUnavailable` errors (HTTP 503).
+ API provisioned throughput exception: Injects provisioned throughput errors into requests made by the target IAM role. The action `aws:kinesis:inject-api-provisioned-throughput-exception` creates `ProvisionedThroughputExceededException` errors (HTTP 400).
+ API expired iterator exception: Injects expired iterator errors into requests made by the target IAM role. The action `aws:kinesis:inject-api-expired-iterator-exception` creates `ExpiredIteratorException` errors (HTTP 400). 

For more information, see [Amazon Kinesis Data Streams actions](https://docs.aws.amazon.com/fis/latest/userguide/fis-actions-reference.html#aws-kinesis-actions).

**Considerations**
+ You can use the actions above with both the provisioned and on-demand capacity modes for Amazon Kinesis Data Streams.
+ Streaming resumes when the experiment completes, based on the duration you selected. You can also stop a running experiment before it completes. Alternatively, you can define a stop condition that halts the experiment based on alarms that reflect application health in Amazon CloudWatch Application Insights.
+ You can test up to 280 streams.

For more information on regional support, see [AWS Fault Injection Service endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/fis.html).

# Provisioned throughput exception errors
<a name="kinesis-fis-provisioned-throughput"></a>

Provisioned throughput exceeded exception errors (HTTP 400) occur when the request rate for a Kinesis stream surpasses the throughput limits of one or more shards. Each shard has specific read and write capacity limits, and exceeding them triggers this exception. Scenarios that lead to this exception include sudden spikes in data ingestion or consumption, insufficient shard capacity for the data volume being processed, and uneven distribution of partition keys.

**Recommendations for handling exceptions**
+ Implement exponential backoff and retry mechanisms.
+ Increase the number of shards to accommodate higher throughput.
+ Ensure that there is proper distribution of partition keys.
+ Monitor stream metrics.

Additionally, the Kinesis on-demand capacity mode automatically adjusts to your workload and minimizes the occurrence of this exception. For more information, see [What is AWS Fault Injection Service?](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html)

**Note**  
The automatic scaling of on-demand mode cannot compensate for uneven partition key distribution.
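
Kinesis maps each partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and matching it against shard hash-key ranges, so you can check a candidate key scheme offline. The sketch below assumes shards that split the hash key space evenly:

```python
import hashlib
from collections import Counter

def shard_for_key(partition_key, shard_count):
    """Map a partition key to a shard index, assuming the stream's
    shards split the 128-bit hash key space evenly."""
    h = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return h * shard_count // 2 ** 128

# Sequential keys such as "sensor-0"..."sensor-9999" still spread well,
# because MD5 output is effectively uniform over the hash key space.
counts = Counter(shard_for_key(f"sensor-{i}", 8) for i in range(10000))
print(sorted(counts.values()))  # roughly 1250 records per shard
```

A heavily skewed `counts` result for your real keys is an early warning that a few hot shards will throttle regardless of capacity mode.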

**To perform a basic experiment**

1. Use baseline metrics: record normal throughput patterns before testing.

1. Create an experiment: use the `aws:kinesis:inject-api-provisioned-throughput-exception` action.

1. Configure the intensity: start with 25% request throttling.

1. Monitor the responses: verify retry logic with exponential backoff.

1. Validate the scaling: confirm that automatic scaling is triggered.

1. Check the alarms: ensure that CloudWatch alarms fire as expected.

Applications should implement proper backoff strategies, monitor the `WriteProvisionedThroughputExceeded` and `ReadProvisionedThroughputExceeded` metrics, and trigger shard scaling when appropriate.

**Action details**
+ **Resource Type**: IAM Role ARN
+ **Target Operations**: `PutRecord`, `PutRecords`, `GetRecords`
+ **Error Code**: `ProvisionedThroughputExceededException` (HTTP 400)
+ **Description**: simulates scenarios where the request rate exceeds shard capacity limits, testing application throttling and scaling responses.

**Parameters**
+ **IAM Role ARN**: the role that your application uses for Kinesis Data Streams operations.
+ **Operations**: target operations: `PutRecord`, `PutRecords`, `GetRecords`.
+ **Resource List**: the specific stream names or shard identifiers.
+ **Duration**: the experiment duration, from one minute to 12 hours. In the AWS FIS API, the value is a string in ISO 8601 format; for example, `PT1M` represents one minute. In the AWS FIS console, you enter the number of seconds, minutes, or hours.
+ **Intensity**: the percentage of requests to throttle.

**Required permissions**
+ `kinesis:InjectApiError`

**Example experiment template**

The following example template injects a provisioned throughput exception for all requests to up to 5 Kinesis data streams with the specified tag. AWS FIS selects the affected streams at random. After 5 minutes, the fault is removed.

```
{
    "description": "Kinesis stream experiment",
    "targets": {
        "KinesisStreams-Target-1": {
            "resourceType": "aws:kinesis:stream",
            "resourceTags": {
                   "tag-key": "tag-value"
            },
            "selectionMode": "COUNT(5)"
        }
    },
    "actions": {
         "kinesis": {
              "actionId": "aws:kinesis:stream-provisioned-throughput-exception",
              "description": "my-stream",
              "parameters": {
                   "duration": "PT5M",
                   "percentage": "100",
                   "service": "kinesis"
              },
              "targets": {
                    "KinesisStreams": "KinesisStreams-Target-1"
              }
         }
   },
   "stopConditions": [
         {
              "source": "none"
         }
   ],
   "roleArn": "arn:aws:iam::111122223333:role/role-name",
   "tags": {},
   "experimentOptions": {
       "accountTargeting": "single-account",
       "emptyTargetResolutionMode": "fail"
   }    
}
```

**Experiment role permissions example**

The following permission allows you to run the `aws:kinesis:stream-provisioned-throughput-exception` and `aws:kinesis:stream-expired-iterator-exception` actions on a specific stream that impact 50% of requests.

# Expired iterator exception errors
<a name="kinesis-fis-expired-iterator"></a>

Expired iterator exception errors (HTTP 400) occur when a shard iterator has expired and can no longer be used to retrieve stream records with `GetRecords`. This happens when there are delays between read operations, caused by long-running data processing tasks, network issues, or application downtime. 

**Note**  
A shard iterator is valid for 5 minutes after the time it's issued.

**Recommendations for handling exceptions**
+ Refresh shard iterators before they expire.
+ Incorporate error handling to obtain new iterators.
+ Use the Kinesis Client Library (KCL), which automatically manages shard iterator expiration.

For more information, see [What is AWS Fault Injection Service?](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html)
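
The refresh-and-resume pattern from the recommendations above can be sketched with a stub client. In a real consumer, `get_shard_iterator` and `get_records` would be calls on your Kinesis client, requesting the new iterator after the last processed sequence number:

```python
class ExpiredIteratorError(Exception):
    """Stand-in for the service's ExpiredIteratorException."""

def read_records(client, shard_id, start_seq=None):
    """Consume a shard, obtaining a fresh iterator whenever the current
    one expires, and resuming after the last processed record."""
    iterator = client.get_shard_iterator(shard_id, start_seq)
    records = []
    while iterator is not None:
        try:
            batch, iterator = client.get_records(iterator)
        except ExpiredIteratorError:
            # Iterator aged out (valid for 5 minutes): get a new one
            # positioned after the last record we processed.
            last = records[-1]["seq"] if records else start_seq
            iterator = client.get_shard_iterator(shard_id, last)
            continue
        records.extend(batch)
    return records

class StubClient:
    """Stub that serves 4 records and expires the iterator once."""
    def __init__(self):
        self.data = [{"seq": str(i)} for i in range(4)]
        self.expired_once = False

    def get_shard_iterator(self, shard_id, after_seq):
        return 0 if after_seq is None else int(after_seq) + 1

    def get_records(self, iterator):
        if iterator == 2 and not self.expired_once:
            self.expired_once = True
            raise ExpiredIteratorError()
        if iterator >= len(self.data):
            return [], None  # shard fully consumed
        return [self.data[iterator]], iterator + 1

print([r["seq"] for r in read_records(StubClient(), "shardId-0")])  # ['0', '1', '2', '3']
```

The key point is checkpointing: because the consumer remembers the last processed sequence number, the refreshed iterator resumes without skipping or reprocessing records.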

**To perform a basic experiment**

1. Create an experiment template: use the AWS FIS console. 

1. Select the action: use the `aws:kinesis:inject-api-expired-iterator-exception` action.

1. Configure the targets: specify the IAM role and Kinesis Data Streams operations. 

1. Set the duration: start with 5-10 minutes for initial testing. 

1. Add stop conditions: see [stop conditions for AWS FIS](https://docs.aws.amazon.com/fis/latest/userguide/stop-conditions.html).

1. Run the experiment: monitor the application behavior.

**Action details**
+ **Resource Type**: IAM Role ARN
+ **Target Operations**: `GetRecords`
+ **Error Code**: `ExpiredIteratorException` (HTTP 400)
+ **Description**: the provided iterator exceeds the maximum allowed age, simulating scenarios where record processing is too slow or checkpointing logic fails.

**Parameters**
+ **IAM Role ARN**: the role that your application uses for Kinesis Data Streams operations.
+ **Operations**: target operations: `GetRecords`
+ **Resource List**: the specific stream names or ARNs.
+ **Duration**: the experiment duration, which is configurable. 
+ **Intensity**: the percentage of requests to throttle.

**Required permissions**
+ `kinesis:InjectApiError`