

# Amazon MSK key features and concepts
<a name="operations"></a>

Amazon MSK Provisioned clusters offer a wide range of features and capabilities to help you optimize your cluster's performance and meet your streaming needs. The topics below describe these functionalities in detail. You can manage your clusters through the following interfaces:
+ The [AWS Management Console](https://console.aws.amazon.com/msk)
+ The [Amazon MSK API Reference](https://docs.aws.amazon.com/msk/1.0/apireference)
+ The [Amazon MSK CLI Command Reference](https://docs.aws.amazon.com/cli/latest/reference/kafka/index.html)

**Topics**
+ [Amazon MSK broker types](broker-instance-types.md)
+ [Amazon MSK broker sizes](broker-instance-sizes.md)
+ [Storage management for Standard brokers](msk-storage-management.md)
+ [Amazon MSK Provisioned configuration](msk-configuration.md)
+ [Intelligent rebalancing for clusters](intelligent-rebalancing.md)
+ [Patching on MSK Provisioned clusters](patching-impact.md)
+ [Broker offline and client failover](troubleshooting-offlinebroker-clientfailover.md)
+ [Security in Amazon MSK](security.md)
+ [Amazon MSK logging](msk-logging.md)
+ [Metadata management](metadata-management.md)
+ [Topic Operations](msk-topic-operations-information.md)
+ [Amazon MSK resources](resources.md)
+ [Apache Kafka versions](kafka-versions.md)
+ [Troubleshoot your Amazon MSK cluster](troubleshooting.md)

# Amazon MSK broker types
<a name="broker-instance-types"></a>

MSK Provisioned offers two broker types: Standard and Express. Standard brokers give you the most flexibility to configure your clusters, while Express brokers offer more elasticity, throughput, resilience, and ease of use for running high-performance streaming applications.

See the following topics for more details about each offering. The table below highlights the key feature differences between Standard and Express brokers.


| Feature | Standard broker | Express broker | 
| --- | --- | --- | 
|  [Storage Management](msk-storage-management.md)  |  Customer managed (Features include EBS storage, Tiered storage, Provisioned storage throughput, Auto-scaling, Storage capacity alerts)  |  Fully MSK managed  | 
|  [Supported instances](broker-instance-sizes.md)  |  T3, M5, M7g  |  M7g  | 
|  [Sizing and scaling considerations](bestpractices-intro.md)  | Throughput, connections, partitions, storage |  Throughput, connections, partitions  | 
| [Broker scaling](msk-update-broker-count.md) | Vertical and horizontal scaling | Vertical and horizontal scaling | 
|  [Kafka versions](kafka-versions.md)  |  See [Apache Kafka versions](kafka-versions.md)  |  Starts at version 3.6  | 
|  [Apache Kafka Configuration](msk-configuration.md)  |  More configurable  |  Mostly MSK managed for higher resilience  | 
| [Security](security.md) |  Encryption, Private/Public access, Authentication & Authorization - IAM, SASL/SCRAM, mTLS, plaintext, Kafka ACLs  |  Encryption, Private/Public access, Authentication & Authorization - IAM, SASL/SCRAM, mTLS, plaintext, Kafka ACLs  | 
| [Monitoring](monitoring.md) |  CloudWatch, Open Monitoring  |  CloudWatch, Open Monitoring  | 

**Note**  
You can't change an MSK Provisioned cluster from a Standard broker type to an Express broker type by switching the broker type with the MSK API. To change broker types, you must create a new cluster with the desired broker type (Standard or Express).

**Topics**
+ [Amazon MSK Standard brokers](msk-broker-types-standard.md)
+ [Amazon MSK Express brokers](msk-broker-types-express.md)

# Amazon MSK Standard brokers
<a name="msk-broker-types-standard"></a>

Standard brokers for MSK Provisioned offer the most flexibility to configure your cluster's performance. You can choose from a wide range of cluster configurations to achieve the availability, durability, throughput, and latency characteristics required for your applications. You can also provision storage capacity and increase it as needed. Amazon MSK handles the hardware maintenance of Standard brokers and attached storage resources, automatically repairing hardware issues that may arise. You can find more details in this document about various topics related to Standard brokers, including topics on [storage management](msk-storage-management.md), [configurations](msk-configuration-standard.md), and [maintenance](patching-impact.md#patching-standard-brokers).

# Amazon MSK Express brokers
<a name="msk-broker-types-express"></a>

Express brokers for MSK Provisioned make Apache Kafka simpler to manage, more cost-effective to run at scale, and more elastic with the low latency you expect. Brokers include pay-as-you-go storage that scales automatically and requires no sizing, provisioning, or proactive monitoring. Depending on the instance size selected, each broker node can provide up to 3x more throughput per broker, scale up to 20x faster, and recover 90% quicker compared to standard Apache Kafka brokers. Express brokers come pre-configured with Amazon MSK’s best practice defaults and enforce client throughput quotas to minimize resource contention between clients and Kafka’s background operations.

Here are some key factors and capabilities to consider when using Express brokers.
+ **No storage management**: Express brokers eliminate the need to [provision or manage any storage resources](msk-storage-management.md). You get elastic, virtually unlimited, pay-as-you-go, and fully managed storage. For high throughput use cases, you do not need to reason about the interactions between compute instances and storage volumes, and the associated throughput bottlenecks. These capabilities simplify cluster management and eliminate storage management operational overhead.
+ **Faster scaling**: Express brokers allow you to scale your cluster and move partitions up to 20x faster than on Standard brokers. This capability is crucial when you need to scale out your cluster to handle upcoming load spikes or scale in your cluster to reduce cost. See the sections on [expanding your cluster](msk-update-broker-count.md), [removing brokers](msk-remove-broker.md), [reassigning partitions](msk-update-broker-type.md), and [setting up LinkedIn’s Cruise Control for rebalancing](cruise-control.md) for more details on scaling your cluster.
+ **Higher throughput**: Express brokers offer up to 3x more throughput per broker than Standard brokers. For example, you can safely write data at up to 500 MBps with each m7g.16xlarge sized Express broker compared to 153.8 MBps on the equivalent Standard broker (both numbers assume sufficient bandwidth allocation towards background operations, such as replication and rebalancing).
+ **Configured for high resilience**: Express brokers automatically offer various best practices to improve your cluster’s resilience. These include guardrails on critical Apache Kafka configurations, throughput quotas, and capacity reservations for background operations and unplanned repairs. These capabilities make it safer and easier to run large scale Apache Kafka applications. See the sections on [Express broker configurations](msk-configuration-express.md) and [Amazon MSK Express broker quota](limits.md#msk-express-quota) for more details.
+ **No maintenance windows**: There are no maintenance windows for Express brokers. Amazon MSK automatically updates your cluster hardware on an ongoing basis. See [Patching for Express brokers](https://docs.aws.amazon.com/msk/latest/developerguide/patching-impact.html#patching-express-brokers) for more details.
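
Because Express brokers use the same cluster APIs as Standard brokers, creating an Express-based cluster looks much like creating a Standard one. The following is a minimal sketch using the `create-cluster-v2` AWS CLI command; the cluster name, subnet IDs, and security group ID are placeholders, and the three subnets reflect the required 3-AZ configuration.

```
{
    "ClusterName": "express-example",
    "Provisioned": {
        "BrokerNodeGroupInfo": {
            "InstanceType": "express.m7g.large",
            "ClientSubnets": [
                "Subnet-1-ID",
                "Subnet-2-ID",
                "Subnet-3-ID"
            ],
            "SecurityGroups": [
                "Security-Group-ID"
            ]
        },
        "KafkaVersion": "3.6.0",
        "NumberOfBrokerNodes": 3
    }
}
```

Save the JSON as `express-cluster.json` and run:

```
aws kafka create-cluster-v2 --cli-input-json file://express-cluster.json
```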

## Additional information about Express brokers
<a name="msk-broker-types-express-notes"></a>
+ Express brokers work with Apache Kafka APIs, but don't yet fully support the KStreams API.
+ Express brokers are only available in a three-Availability Zone (3-AZ) configuration.
+ Express brokers are only available on select instance sizes. See [Amazon MSK pricing](https://aws.amazon.com/msk/pricing/) for the updated list.
+ Express brokers are supported on the following Apache Kafka versions: 3.6, 3.8, and 3.9.
+ Express brokers can be created with KRaft mode from Apache Kafka version 3.9 onwards.

**See these blogs**  
For more information about MSK Express brokers and to see a real-world example of Express brokers in use, read the following blogs:  
[Introducing Express brokers for Amazon MSK to deliver high throughput and faster scaling for your Kafka clusters](https://aws.amazon.com/blogs/aws/introducing-express-brokers-for-amazon-msk-to-deliver-high-throughput-and-faster-scaling-for-your-kafka-clusters/)
[Express brokers for Amazon MSK: Turbo-charged Kafka scaling with up to 20 times faster performance](https://aws.amazon.com/blogs/big-data/express-brokers-for-amazon-msk-turbo-charged-kafka-scaling-with-up-to-20-times-faster-performance/)  
This blog demonstrates how Express brokers:  
+ Provide faster throughput, rapid scaling, and improved recovery time from failures
+ Eliminate storage management complexities

# Amazon MSK broker sizes
<a name="broker-instance-sizes"></a>

When you create an Amazon MSK Provisioned cluster, you specify the size of brokers that you want it to have. Depending on the [broker type](broker-instance-types.md), Amazon MSK supports the following broker sizes.

**Standard broker sizes**
+ kafka.t3.small
+ kafka.m5.large, kafka.m5.xlarge, kafka.m5.2xlarge, kafka.m5.4xlarge, kafka.m5.8xlarge, kafka.m5.12xlarge, kafka.m5.16xlarge, kafka.m5.24xlarge
+ kafka.m7g.large, kafka.m7g.xlarge, kafka.m7g.2xlarge, kafka.m7g.4xlarge, kafka.m7g.8xlarge, kafka.m7g.12xlarge, kafka.m7g.16xlarge

**Express broker sizes**
+ express.m7g.large, express.m7g.xlarge, express.m7g.2xlarge, express.m7g.4xlarge, express.m7g.8xlarge, express.m7g.12xlarge, express.m7g.16xlarge

**Note**  
Some broker sizes may not be available in certain AWS Regions. See the updated Broker Instance Pricing Tables on the [Amazon MSK pricing page](https://aws.amazon.com/msk/pricing/) for the latest list of available instances by Region.

## Other notes on broker sizes
<a name="broker-instance-sizes-other-notes"></a>
+ M7g brokers use AWS Graviton processors (custom Arm-based processors built by Amazon Web Services). M7g brokers offer improved price performance relative to comparable M5 instances. M7g brokers consume less power than comparable M5 instances.
+ Amazon MSK supports M7g brokers on MSK Provisioned clusters running Apache Kafka versions 2.8.2, and 3.3.2 and higher.
+ M7g and M5 brokers have higher baseline throughput performance than T3 brokers and are recommended for production workloads. M7g and M5 brokers can also have more partitions per broker than T3 brokers. Use M7g or M5 brokers if you are running larger production-grade workloads or require a greater number of partitions. To learn more about M7g and M5 instance sizes, see [Amazon EC2 General Purpose Instances](https://aws.amazon.com/ec2/instance-types/).
+ T3 brokers can use CPU credits to temporarily burst performance. Use T3 brokers for low-cost development, if you are testing small to medium streaming workloads, or if you have low-throughput streaming workloads that experience temporary spikes in throughput. We recommend that you run a proof-of-concept test to determine whether T3 brokers are sufficient for production or critical workloads. To learn more about T3 broker sizes, see [Amazon EC2 T3 Instances](https://aws.amazon.com/ec2/instance-types/t3/).

For more information on how to choose broker sizes, see [Best practices for Standard and Express brokers](bestpractices-intro.md).

# Storage management for Standard brokers
<a name="msk-storage-management"></a>

Amazon MSK provides features to help you with storage management on your MSK clusters.

**Note**  
With [Express brokers](msk-broker-types-express.md), you don't need to provision or manage any storage resources used for your data. This simplifies cluster management and eliminates one of the common causes of operational issues with Apache Kafka clusters. You also spend less because you don't have to provision idle storage capacity, and you only pay for what you use.

**Standard broker type**  
With [Standard brokers](msk-broker-types-standard.md), you can choose from a variety of storage options and capabilities.

For information about managing throughput, see [Provision storage throughput for Standard brokers in an Amazon MSK cluster](msk-provision-throughput.md).

**Topics**
+ [Tiered storage for Standard brokers](msk-tiered-storage.md)
+ [Scale up Amazon MSK Standard broker storage](msk-update-storage.md)
+ [Manage storage throughput for Standard brokers in an Amazon MSK cluster](msk-provision-throughput-management.md)

# Tiered storage for Standard brokers
<a name="msk-tiered-storage"></a>

Tiered storage is a low-cost storage tier for Amazon MSK that scales to virtually unlimited storage, making it cost-effective to build streaming data applications.

You can create an Amazon MSK cluster configured with tiered storage that balances performance and cost. Amazon MSK stores streaming data in a performance-optimized primary storage tier until it reaches the Apache Kafka topic retention limits. Then, Amazon MSK automatically moves data into the new low-cost storage tier.

When your application starts reading data from the tiered storage, you can expect an increase in read latency for the first few bytes. As you start reading the remaining data sequentially from the low-cost tier, you can expect latencies that are similar to the primary storage tier. You don't need to provision any storage for the low-cost tiered storage or manage the infrastructure. You can store any amount of data and pay only for what you use. This feature is compatible with the APIs introduced in [KIP-405: Kafka Tiered Storage](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage).

For information about sizing, monitoring, and optimizing your MSK tiered storage cluster, see [Best practices for running production workloads using Amazon MSK tiered storage](https://aws.amazon.com/blogs/big-data/best-practices-for-running-production-workloads-using-amazon-msk-tiered-storage/).

Here are some of the features of tiered storage:
+ You can scale to virtually unlimited storage. You don't have to guess how to scale your Apache Kafka infrastructure.
+ You can retain data longer in your Apache Kafka topics, or increase your topic storage, without the need to increase the number of brokers.
+ It provides a longer duration safety buffer to handle unexpected delays in processing.
+ You can reprocess old data in its exact production order with your existing stream processing code and Kafka APIs.
+ Partitions rebalance faster because data on secondary storage doesn't require replication across broker disks.
+ Data between brokers and the tiered storage moves within the VPC and doesn't travel through the internet.
+ A client machine can use the same process to connect to new clusters with tiered storage enabled as it does to connect to a cluster without tiered storage enabled. See [Create a client machine](https://docs.aws.amazon.com/msk/latest/developerguide/create-client-machine.html).

## Tiered storage requirements for Amazon MSK clusters
<a name="msk-tiered-storage-requirements"></a>
+ You must use Apache Kafka client version 3.0.0 or higher to create a new topic with tiered storage enabled. To transition an existing topic to tiered storage, you can use a client machine with a Kafka client version lower than 3.0.0 (the minimum supported Apache Kafka version is 2.8.2.tiered). See [Step 4: Create a topic in the Amazon MSK cluster](create-topic.md).
+ The Amazon MSK cluster with tiered storage enabled must use version 3.6.0 or higher, or 2.8.2.tiered.

## Tiered storage constraints and limitations for Amazon MSK clusters
<a name="msk-tiered-storage-constraints"></a>

Tiered storage has the following constraints and limitations:
+ Make sure clients are not configured with `read_committed` when reading from the remote tier in Amazon MSK, unless the application is actively using the transactions feature.
+ Tiered storage isn't available in AWS GovCloud (US) regions.
+ Tiered storage applies only to provisioned mode clusters.
+ Tiered storage doesn’t support broker size t3.small.
+ The minimum retention period in low-cost storage is 3 days. There is no minimum retention period for primary storage.
+ Tiered storage doesn’t support Multiple Log directories on a broker (JBOD related features).
+ Tiered storage doesn't support compacted topics. Make sure that all topics that have tiered storage turned on have their cleanup.policy configured to 'DELETE' only.
+ A tiered storage cluster doesn’t support altering the log.cleanup.policy for a topic after it’s created.
+ Tiered storage can be disabled for individual topics but not for the entire cluster. Once disabled, tiered storage cannot be re-enabled for a topic.
+ If you use Amazon MSK version 2.8.2.tiered, you can migrate only to another tiered storage-supported Apache Kafka version. If you don't want to continue using a tiered storage-supported version, create a new MSK cluster and migrate your data to it.
+ The kafka-log-dirs tool can't report tiered storage data size. The tool only reports the size of the log segments in primary storage.
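
For example, running the tool against a tiered storage topic (a sketch; the bootstrap server variable and topic name are placeholders) reports only the primary-storage segment sizes:

```
bin/kafka-log-dirs.sh --bootstrap-server $bs --describe --topic-list MSKTutorialTopic
```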

For information about default settings and constraints you must be mindful of when you configure tiered storage at the topic level, see [Guidelines for Amazon MSK tiered storage topic-level configuration](msk-guidelines-tiered-storage-topic-level-config.md).

# How log segments are copied to tiered storage for an Amazon MSK topic
<a name="msk-tiered-storage-retention-rules"></a>

When you enable tiered storage for a new or existing topic, Apache Kafka copies closed log segments from primary storage to tiered storage.
+ Apache Kafka only copies closed log segments. It copies all messages within the log segment to tiered storage.
+ Active segments are not eligible for tiering. The log segment size (segment.bytes) or the segment roll time (segment.ms) controls the rate of segment closure, and the rate at which Apache Kafka then copies segments to tiered storage.

Retention settings for a topic with tiered storage enabled are different from settings for a topic without tiered storage enabled. The following rules control the retention of messages in topics with tiered storage enabled:
+ You define retention in Apache Kafka with two settings: log.retention.ms (time) and log.retention.bytes (size). These settings determine the total duration and size of the data that Apache Kafka retains in the cluster. Whether or not you enable tiered storage mode, you set these configurations at the cluster level. You can override the settings at the topic level with topic configurations.
+ When you enable tiered storage, you can additionally specify how long the primary high-performance storage tier stores data. For example, if a topic has overall retention (log.retention.ms) setting of 7 days and local retention (local.retention.ms) of 12 hours, then the cluster primary storage retains data for only the first 12 hours. The low-cost storage tier retains the data for the full 7 days.
+ The usual retention settings apply to the full log. This includes its tiered and primary parts.
+ The local.retention.ms or local.retention.bytes settings control the retention of messages in primary storage. Apache Kafka copies closed log segments to tiered storage as soon as they close (based on segment.bytes or segment.ms), independent of local retention settings. After segments are copied to tiered storage, they remain in primary storage until the local.retention.ms or local.retention.bytes thresholds are reached. At that point, the data is deleted from primary storage but remains available in tiered storage. This allows you to keep recent data on high-performance primary storage for fast access while older data is served from the low-cost tiered storage.
+ When Apache Kafka copies a message in a log segment to tiered storage, it removes the message from the cluster based on retention.ms or retention.bytes settings.

## Example Amazon MSK tiered storage scenario
<a name="msk-tiered-storage-retention-scenario"></a>

This scenario illustrates how an existing topic that has messages in primary storage behaves when tiered storage is enabled. You enable tiered storage on this topic by setting remote.storage.enable to `true`. In this example, retention.ms is set to 5 days and local.retention.ms is set to 2 days. The following is the sequence of events when a segment expires.
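
In configuration terms, this scenario corresponds to settings like the following sketch, where 5 days is 432000000 milliseconds and 2 days is 172800000 milliseconds (the bootstrap server variable and topic name are placeholders):

```
bin/kafka-configs.sh --bootstrap-server $bs --alter --entity-type topics --entity-name example-topic \
 --add-config 'remote.storage.enable=true, retention.ms=432000000, local.retention.ms=172800000'
```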

**Time T0 - Before you enable tiered storage.**  
Before you enable tiered storage for this topic, there are two log segments. One of the segments is active for an existing topic partition 0.

![\[Time T0 - Before you enable tiered storage.\]](http://docs.aws.amazon.com/msk/latest/developerguide/images/tiered-storage-segments-1.png)


**Time T1 (< 2 days) - Tiered storage enabled. Segment 0 copied to tiered storage.**  
After you enable tiered storage for this topic, Apache Kafka copies closed log segment 0 to tiered storage as soon as it closes. The segment closes based on segment.bytes or segment.ms settings, not based on retention settings. Apache Kafka retains a copy in primary storage as well. The active segment 1 is not eligible to copy to tiered storage yet because it is still active and hasn't closed. In this timeline, Amazon MSK doesn't yet apply any of the retention settings (local.retention.bytes/ms, retention.ms/bytes) to any of the messages in segment 0 and segment 1.

![\[Time T1 (< 2 days) - Tiered storage enabled. Segment 0 copied to tiered storage.\]](http://docs.aws.amazon.com/msk/latest/developerguide/images/tiered-storage-segments-2.png)


**Time T2 - Local retention in effect.**  
After 2 days, the local retention threshold is reached for segment 0. The setting of local.retention.ms as 2 days determines this. Segment 0 is now deleted from primary storage, but it remains available in tiered storage. Note that segment 0 was already copied to tiered storage at Time T1 when it closed, not at Time T2 when local retention expired. Active segment 1 is neither eligible for deletion nor eligible to copy to tiered storage yet because it is still active.

![\[Time T2 - Local retention in effect.\]](http://docs.aws.amazon.com/msk/latest/developerguide/images/tiered-storage-segments-3.png)


**Time T3 - Overall retention in effect.**  
After 5 days, the overall retention settings take effect, and Kafka clears log segment 0 and its associated messages from tiered storage. Segment 1 is neither eligible for expiration nor eligible to copy over to tiered storage because it is still active and hasn't closed.

![\[Time T3 - Overall retention in effect.\]](http://docs.aws.amazon.com/msk/latest/developerguide/images/tiered-storage-segments-4.png)


# Create an Amazon MSK cluster with tiered storage using the AWS Management Console
<a name="msk-create-cluster-tiered-storage-console"></a>

This process describes how to create a tiered storage Amazon MSK cluster using the AWS Management Console.

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Choose **Create cluster**.

1. Choose **Custom create** for tiered storage.

1. Specify a name for the cluster.

1. For **Cluster type**, select **Provisioned**.

1. Choose an Apache Kafka version that supports tiered storage for Amazon MSK to use to create the cluster.

1. Specify a broker size other than **kafka.t3.small**.

1. Specify the number of Availability Zones that brokers are distributed across.

1. Specify the number of brokers that you want Amazon MSK to create in each Availability Zone. The minimum is one broker per Availability Zone, and the maximum is 30 brokers per cluster.

1. Under **Storage options**, select **Tiered storage and EBS storage** to enable tiered storage mode.

1. Follow the remaining steps in the cluster creation wizard. When complete, **Tiered storage and EBS storage** appears as the cluster storage mode in the **Review and create** view.

1. Choose **Create cluster**.

# Create an Amazon MSK cluster with tiered storage with the AWS CLI
<a name="msk-create-cluster-tiered-storage-cli"></a>

To enable tiered storage on a cluster, create the cluster with the correct Apache Kafka version and attribute for tiered storage. Follow the code example below. Also, complete the steps in the next section to [Create a Kafka topic with tiered storage enabled with the AWS CLI](#msk-create-topic-tiered-storage-cli).

See [create-cluster](https://docs.aws.amazon.com//cli/latest/reference/kafka/create-cluster.html) for a complete list of supported attributes for cluster creation.

```
aws kafka create-cluster \
 --cluster-name "MessagingCluster" \
 --broker-node-group-info file://brokernodegroupinfo.json \
 --number-of-broker-nodes 3 \
 --kafka-version "3.6.0" \
 --storage-mode "TIERED"
```
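
The `brokernodegroupinfo.json` file referenced above describes the broker placement, instance type, and networking. A minimal sketch might look like the following, with placeholder subnet and security group IDs; remember that tiered storage doesn't support the kafka.t3.small size.

```
{
    "InstanceType": "kafka.m5.large",
    "BrokerAZDistribution": "DEFAULT",
    "ClientSubnets": [
        "Subnet-1-ID",
        "Subnet-2-ID",
        "Subnet-3-ID"
    ],
    "SecurityGroups": [
        "Security-Group-ID"
    ]
}
```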

## Create a Kafka topic with tiered storage enabled with the AWS CLI
<a name="msk-create-topic-tiered-storage-cli"></a>

To complete the process that you started when you created a cluster with tiered storage enabled, also create a topic with tiered storage enabled, using the attributes in the following code example. The attributes specifically for tiered storage are the following:
+ `local.retention.ms` (for example, 10 mins) for time-based retention settings or `local.retention.bytes` for log segment size limits.
+ `remote.storage.enable` set to `true` to enable tiered storage.

The following configuration uses local.retention.ms, but you can replace this attribute with local.retention.bytes. These attributes control how long data remains in primary storage (or how many bytes it can occupy there) after Apache Kafka copies it to tiered storage. See [Topic-level configuration](https://docs.aws.amazon.com//msk/latest/developerguide/msk-configuration-properties.html#msk-topic-confinguration) for more details on supported configuration attributes.

**Note**  
You must use Apache Kafka client version 3.0.0 or higher; the `remote.storage.enable` setting is available only in those client versions of `kafka-topics.sh`. To enable tiered storage on an existing topic that uses an earlier version of Apache Kafka, see the section [Enabling tiered storage on an existing Amazon MSK topic](msk-enable-disable-topic-tiered-storage-cli.md#msk-enable-topic-tiered-storage-cli).

```
bin/kafka-topics.sh --create --bootstrap-server $bs \
 --replication-factor 2 --partitions 6 --topic MSKTutorialTopic \
 --config remote.storage.enable=true --config local.retention.ms=100000 \
 --config retention.ms=604800000 --config segment.bytes=134217728
```
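
To confirm that the topic was created with tiered storage turned on, you can describe its configuration. This check (a sketch) should show `remote.storage.enable=true` among the topic's overrides:

```
bin/kafka-configs.sh --bootstrap-server $bs --describe --entity-type topics --entity-name MSKTutorialTopic
```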

# Enable and disable tiered storage on an existing Amazon MSK topic
<a name="msk-enable-disable-topic-tiered-storage-cli"></a>

These sections cover how to enable and disable tiered storage on a topic that you've already created. To create a new cluster and topic with tiered storage enabled, see [Create an Amazon MSK cluster with tiered storage using the AWS Management Console](https://docs.aws.amazon.com//msk/latest/developerguide/msk-create-cluster-tiered-storage-console).

## Enabling tiered storage on an existing Amazon MSK topic
<a name="msk-enable-topic-tiered-storage-cli"></a>

To enable tiered storage on an existing topic, use the `alter` command syntax in the following example. When you enable tiered storage on an existing topic, you aren't restricted to a certain Apache Kafka client version.

```
bin/kafka-configs.sh --bootstrap-server $bsrv --alter --entity-type topics --entity-name msk-ts-topic --add-config 'remote.storage.enable=true, local.retention.ms=604800000, retention.ms=15550000000'
```

## Disable tiered storage on an existing Amazon MSK topic
<a name="msk-disable-topic-tiered-storage-cli"></a>

To disable tiered storage on an existing topic, use the same `alter` command syntax as when you enable tiered storage.

```
bin/kafka-configs.sh --bootstrap-server $bs --alter --entity-type topics --entity-name MSKTutorialTopic --add-config 'remote.log.msk.disable.policy=Delete, remote.storage.enable=false'
```

**Note**  
When you disable tiered storage, you completely delete the topic data in tiered storage. Apache Kafka retains primary storage data, but it still applies the primary retention rules based on `local.retention.ms`. After you disable tiered storage on a topic, you can't re-enable it. If you want to disable tiered storage on an existing topic, you aren't restricted to a certain Apache Kafka client version.

# Enable tiered storage on an existing Amazon MSK cluster using the AWS CLI
<a name="msk-enable-cluster-tiered-storage-cli"></a>

**Note**  
You can enable tiered storage only if your cluster's log.cleanup.policy is set to `delete`, as compacted topics are not supported on tiered storage. Later, you can configure an individual topic's log.cleanup.policy to `compact` if tiered storage is not enabled on that particular topic. See [Topic-level configuration](https://docs.aws.amazon.com//msk/latest/developerguide/msk-configuration-properties.html#msk-topic-confinguration) for more details on supported configuration attributes.

1. **Update the Apache Kafka version** – Update the cluster to an Apache Kafka version that supports tiered storage, such as 3.6.0. Note that the `--current-version` parameter takes the current cluster version, which isn't a simple integer. To find it, use the `DescribeCluster` operation or the `describe-cluster` AWS CLI command. An example version is `KTVPDKIKX0DER`.

   ```
   aws kafka update-cluster-kafka-version --cluster-arn ClusterArn --current-version Current-Cluster-Version --target-kafka-version 3.6.0
   ```

1. Edit the cluster storage mode. The following code example shows editing the cluster storage mode to `TIERED` using the [update-storage](https://docs.aws.amazon.com/cli/latest/reference/kafka/update-storage.html) API.

   ```
   aws kafka update-storage --current-version Current-Cluster-Version --cluster-arn Cluster-arn --storage-mode TIERED
   ```
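
To look up the `Current-Cluster-Version` string used in both commands, a query along the following lines (a sketch using the `describe-cluster` command) returns it directly:

```
aws kafka describe-cluster --cluster-arn ClusterArn --query 'ClusterInfo.CurrentVersion' --output text
```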

# Update tiered storage on an existing Amazon MSK cluster using the console
<a name="msk-update-tiered-storage-console"></a>

This process describes how to update a tiered storage Amazon MSK cluster using the AWS Management Console.

Make sure the current Apache Kafka version of your MSK cluster is 2.8.2.tiered. Refer to [updating the Apache Kafka version](https://docs.aws.amazon.com/msk/latest/developerguide/version-upgrades.html) if you need to upgrade your MSK cluster to version 2.8.2.tiered.

**Note**  
You can enable tiered storage only if your cluster's log.cleanup.policy is set to `delete`, as compacted topics are not supported on tiered storage. Later, you can configure an individual topic's log.cleanup.policy to `compact` if tiered storage is not enabled on that particular topic. See [Topic-level configuration](https://docs.aws.amazon.com//msk/latest/developerguide/msk-configuration-properties.html#msk-topic-confinguration) for more details on supported configuration attributes.

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Go to the cluster summary page and choose **Properties**.

1. Go to the **Storage** section and choose **Edit cluster storage mode**.

1. Choose **Tiered storage and EBS storage** and **Save changes**.

# Scale up Amazon MSK Standard broker storage
<a name="msk-update-storage"></a>

You can increase the amount of EBS storage per broker. You can't decrease the storage. 

Storage volumes remain available during this scaling-up operation.

**Important**  
When storage is scaled for an MSK cluster, the additional storage is made available right away. However, the cluster requires a cool-down period after every storage scaling event. Amazon MSK uses this cool-down period to optimize the cluster before it can be scaled again. This period can range from a minimum of 6 hours to over 24 hours, depending on the cluster's storage size and utilization and on traffic. This is applicable for both auto scaling events and manual scaling using the [UpdateBrokerStorage](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-nodes-storage.html#UpdateBrokerStorage) operation. For information about right-sizing your storage, see [Best practices for Standard brokers](bestpractices.md). 

You can use tiered storage to scale up to unlimited amounts of storage for your broker. See [Tiered storage for Standard brokers](msk-tiered-storage.md).

**Topics**
+ [Automatic scaling for Amazon MSK clusters](msk-autoexpand.md)
+ [Manual scaling for Standard brokers](manually-expand-storage.md)

# Automatic scaling for Amazon MSK clusters
<a name="msk-autoexpand"></a>

To automatically expand your cluster's storage in response to increased usage, you can configure an Application Auto Scaling policy for Amazon MSK. In an auto-scaling policy, you set the target disk utilization and the maximum scaling capacity.

Before you use automatic scaling for Amazon MSK, you should consider the following:
+ **Important**: A storage scaling action can occur only once every six hours. We recommend that you start with a right-sized storage volume for your storage demands. For guidance on right-sizing your cluster, see [Right-size your cluster: Number of Standard brokers per cluster](bestpractices.md#brokers-per-cluster).
+ Amazon MSK does not reduce cluster storage in response to reduced usage. Amazon MSK does not support decreasing the size of storage volumes. If you need to reduce the size of your cluster storage, you must migrate your existing cluster to a cluster with smaller storage. For information about migrating a cluster, see [Migrate to MSK cluster](migration.md).
+ Amazon MSK doesn't support automatic scaling in the Asia Pacific (Osaka), Africa (Cape Town), and Asia Pacific (Malaysia) Regions.
+ When you associate an auto-scaling policy with your cluster, Amazon EC2 Auto Scaling automatically creates an Amazon CloudWatch alarm for target tracking. If you delete a cluster with an auto-scaling policy, this CloudWatch alarm persists. To delete the CloudWatch alarm, you should remove an auto-scaling policy from a cluster before you delete the cluster. To learn more about target tracking, see [Target tracking scaling policies for Amazon EC2 Auto Scaling](https://docs.aws.amazon.com/autoscaling/ec2/userguide/as-scaling-target-tracking.html) in the *Amazon EC2 Auto Scaling User Guide*.

**Topics**
+ [Auto-scaling policy details for Amazon MSK](msk-autoexpand-details.md)
+ [Set up automatic scaling for your Amazon MSK cluster](msk-autoexpand-setup.md)

# Auto-scaling policy details for Amazon MSK
<a name="msk-autoexpand-details"></a>

An auto-scaling policy defines the following parameters for your cluster:
+ **Storage Utilization Target**: The storage utilization threshold that Amazon MSK uses to trigger an auto-scaling operation. You can set the utilization target between 10% and 80% of the current storage capacity. We recommend that you set the Storage Utilization Target between 50% and 60%.
+ **Maximum Storage Capacity**: The maximum scaling limit that Amazon MSK can set for your broker storage. You can set the maximum storage capacity up to 16 TiB per broker. For more information, see [Amazon MSK quota](limits.md).

When Amazon MSK detects that your `Maximum Disk Utilization` metric is equal to or greater than the `Storage Utilization Target` setting, it increases your storage capacity by the larger of two amounts: 10 GiB or 10% of current storage. For example, if your current storage is 1000 GiB, the increase is 100 GiB. The service checks your storage utilization every minute. Further scaling operations continue to increase storage by the larger of those two amounts.

To determine if auto-scaling operations have occurred, use the [ListClusterOperations](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-operations.html#ListClusterOperations) operation.

# Set up automatic scaling for your Amazon MSK cluster
<a name="msk-autoexpand-setup"></a>

You can use the Amazon MSK console, the Amazon MSK API, or CloudFormation to implement automatic scaling for storage. CloudFormation support is available through [Application Auto Scaling](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-applicationautoscaling-scalabletarget.html).

**Note**  
You can't implement automatic scaling when you create a cluster. You must first create the cluster, and then create and enable an auto-scaling policy for it. However, you can create the policy while the Amazon MSK service creates your cluster.

**Topics**
+ [Set up automatic scaling using the Amazon MSK AWS Management Console](msk-autoexpand-setup-console.md)
+ [Set up automatic scaling using the CLI](msk-autoexpand-setup-cli.md)
+ [Set up automatic scaling for Amazon MSK using the API](msk-autoexpand-setup-api.md)

# Set up automatic scaling using the Amazon MSK AWS Management Console
<a name="msk-autoexpand-setup-console"></a>

This process describes how to use the Amazon MSK console to implement automatic scaling for storage.

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the list of clusters, choose your cluster. This takes you to a page that lists details about the cluster.

1. In the **Auto scaling for storage** section, choose **Configure**.

1. Create and name an auto-scaling policy. Specify the storage utilization target, the maximum storage capacity, and the target metric.

1. Choose **Save changes**.

When you save and enable the new policy, the policy becomes active for the cluster. Amazon MSK then expands the cluster's storage when the storage utilization target is reached.

# Set up automatic scaling using the CLI
<a name="msk-autoexpand-setup-cli"></a>

This process describes how to use the Amazon MSK CLI to implement automatic scaling for storage.

1. Use the [RegisterScalableTarget](https://docs.aws.amazon.com/cli/latest/reference/application-autoscaling/#available-commands) command to register a storage utilization target.

1. Use the [PutScalingPolicy](https://docs.aws.amazon.com/cli/latest/reference/application-autoscaling/#available-commands) command to create an auto-expansion policy, as shown in the combined sketch following these steps.
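
Putting the two commands together, the full CLI flow might look like the following sketch. The cluster ARN, capacity bounds, and 60 percent target are illustrative placeholders; the `kafka` service namespace, `kafka:broker-storage:VolumeSize` dimension, and `KafkaBrokerStorageUtilization` predefined metric come from Application Auto Scaling's support for Amazon MSK.

```
# Register the cluster's broker storage as a scalable target (capacity is in GiB).
aws application-autoscaling register-scalable-target \
 --service-namespace kafka \
 --scalable-dimension "kafka:broker-storage:VolumeSize" \
 --resource-id "ClusterArn" \
 --min-capacity 100 \
 --max-capacity 800

# Attach a target-tracking policy that expands storage at 60% utilization.
aws application-autoscaling put-scaling-policy \
 --policy-name msk-broker-storage-scaling \
 --service-namespace kafka \
 --scalable-dimension "kafka:broker-storage:VolumeSize" \
 --resource-id "ClusterArn" \
 --policy-type TargetTrackingScaling \
 --target-tracking-scaling-policy-configuration '{"TargetValue": 60.0, "PredefinedMetricSpecification": {"PredefinedMetricType": "KafkaBrokerStorageUtilization"}}'
```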

# Set up automatic scaling for Amazon MSK using the API
<a name="msk-autoexpand-setup-api"></a>

This process describes how to use the Amazon MSK API to implement automatic scaling for storage.

1. Use the [RegisterScalableTarget](https://docs.aws.amazon.com/autoscaling/application/APIReference/API_RegisterScalableTarget.html) API to register a storage utilization target.

1. Use the [PutScalingPolicy](https://docs.aws.amazon.com/autoscaling/application/APIReference/API_PutScalingPolicy.html) API to create an auto-expansion policy.

# Manual scaling for Standard brokers
<a name="manually-expand-storage"></a>

To increase storage, wait for the cluster to be in the `ACTIVE` state. Storage scaling has a cool-down period of at least six hours between events. Even though the operation makes additional storage available right away, the service performs optimizations on your cluster that can take 24 hours or more. The duration of these optimizations is proportional to your storage size.

## Scaling up broker storage using the AWS Management Console
<a name="update-storage-console"></a>

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Choose the MSK cluster for which you want to update broker storage.

1. In the **Storage** section, choose **Edit**.

1. Specify the storage volume you want. You can only increase the amount of storage; you can't decrease it.

1. Choose **Save changes**.

## Scaling up broker storage using the AWS CLI
<a name="update-storage-cli"></a>

Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) that you obtained when you created your cluster. If you don't have the ARN for your cluster, you can find it by listing all clusters. For more information, see [List Amazon MSK clusters](msk-list-clusters.md). 

Replace *Current-Cluster-Version* with the current version of the cluster. 

**Important**  
Cluster versions aren't simple integers. To find the current version of the cluster, use the [DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster) operation or the [describe-cluster](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/describe-cluster.html) AWS CLI command. An example version is `KTVPDKIKX0DER`.

The *Target-Volume-in-GiB* parameter represents the amount of storage that you want each broker to have. It is only possible to update the storage for all the brokers. You can't specify individual brokers for which to update storage. The value you specify for *Target-Volume-in-GiB* must be a whole number that is greater than 100 GiB. The storage per broker after the update operation can't exceed 16384 GiB.

```
aws kafka update-broker-storage --cluster-arn ClusterArn --current-version Current-Cluster-Version --target-broker-ebs-volume-info '{"KafkaBrokerNodeId": "All", "VolumeSizeGB": Target-Volume-in-GiB}' 
```
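
If you don't have the cluster ARN handy, a listing such as the following (a sketch using the `list-clusters` command) shows each cluster's name and ARN:

```
aws kafka list-clusters --query 'ClusterInfoList[*].[ClusterName,ClusterArn]' --output table
```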

## Scaling up broker storage using the API
<a name="update-storage-api"></a>

To update a broker storage using the API, see [UpdateBrokerStorage](https://docs.aws.amazon.com//msk/1.0/apireference/clusters-clusterarn-nodes-storage.html#UpdateBrokerStorage).

# Manage storage throughput for Standard brokers in an Amazon MSK cluster
<a name="msk-provision-throughput-management"></a>

For information on how to provision throughput using the Amazon MSK console, CLI, and API, see [Provision storage throughput for Standard brokers in an Amazon MSK cluster](msk-provision-throughput.md).

**Topics**
+ [Amazon MSK broker throughput bottlenecks and maximum throughput settings](#throughput-bottlenecks)
+ [Measure storage throughput of an Amazon MSK cluster](#throughput-metrics)
+ [Configuration update values for provisioned storage in an Amazon MSK cluster](#provisioned-throughput-config)
+ [Provision storage throughput for Standard brokers in an Amazon MSK cluster](msk-provision-throughput.md)

## Amazon MSK broker throughput bottlenecks and maximum throughput settings
<a name="throughput-bottlenecks"></a>

There are multiple causes of bottlenecks in broker throughput: volume throughput, Amazon EC2 to Amazon EBS network throughput, and Amazon EC2 egress throughput. You can enable provisioned storage throughput to adjust volume throughput. However, broker throughput limitations can be caused by Amazon EC2 to Amazon EBS network throughput and Amazon EC2 egress throughput. 

Amazon EC2 egress throughput is impacted by the number of consumer groups and the number of consumers per consumer group. Also, both Amazon EC2 to Amazon EBS network throughput and Amazon EC2 egress throughput are higher for larger broker sizes.

For volume sizes of 10 GiB or larger, you can provision storage throughput of 250 MiB per second or greater. 250 MiB per second is the default. To provision storage throughput, you must choose broker size kafka.m5.4xlarge or larger (or kafka.m7g.2xlarge or larger), and you can specify maximum throughput as shown in the following table.



| Broker size | Maximum storage throughput (MiB/second) | 
| --- | --- | 
| kafka.m5.4xlarge | 593 | 
| kafka.m5.8xlarge | 850 | 
| kafka.m5.12xlarge | 1000 | 
| kafka.m5.16xlarge | 1000 | 
| kafka.m5.24xlarge | 1000 | 
| kafka.m7g.2xlarge | 312.5 | 
| kafka.m7g.4xlarge | 625 | 
| kafka.m7g.8xlarge | 1000 | 
| kafka.m7g.12xlarge | 1000 | 
| kafka.m7g.16xlarge | 1000 | 

## Measure storage throughput of an Amazon MSK cluster
<a name="throughput-metrics"></a>

You can use the `VolumeReadBytes` and `VolumeWriteBytes` metrics to measure the average storage throughput of a cluster. The sum of these two metrics gives the average storage throughput in bytes. To get the average storage throughput for a cluster, use the SUM statistic for these two metrics with a period of 1 minute, and then use the following formula.

```
Average storage throughput in MiB/s = (Sum(VolumeReadBytes) + Sum(VolumeWriteBytes)) / (60 * 1024 * 1024)
```

For information about the `VolumeReadBytes` and `VolumeWriteBytes` metrics, see [`PER_BROKER` Level monitoring](metrics-details.md#broker-metrics).
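
As a concrete example, you can pull the per-minute sums with the CloudWatch CLI. This sketch assumes a cluster named MessagingCluster and broker ID 1; the time window is a placeholder.

```
aws cloudwatch get-metric-statistics \
 --namespace AWS/Kafka \
 --metric-name VolumeReadBytes \
 --dimensions Name="Cluster Name",Value="MessagingCluster" Name="Broker ID",Value="1" \
 --start-time 2024-01-01T00:00:00Z --end-time 2024-01-01T00:05:00Z \
 --period 60 --statistics Sum

# Repeat for VolumeWriteBytes, then apply the formula above to each 1-minute datapoint.
```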

## Configuration update values for provisioned storage in an Amazon MSK cluster
<a name="provisioned-throughput-config"></a>

You can update your Amazon MSK configuration either before or after you turn on provisioned throughput. However, you won't see the desired throughput until you perform both actions: update the `num.replica.fetchers` configuration parameter and turn on provisioned throughput.

In the default Amazon MSK configuration, `num.replica.fetchers` has a value of 2. To update your `num.replica.fetchers`, you can use the suggested values from the following table. These values are for guidance purposes. We recommend that you adjust these values based on your use case.



| Broker size | num.replica.fetchers | 
| --- | --- | 
| kafka.m5.4xlarge | 4 | 
| kafka.m5.8xlarge | 8 | 
| kafka.m5.12xlarge | 14 | 
| kafka.m5.16xlarge | 16 | 
| kafka.m5.24xlarge | 16 | 

Your updated configuration may take up to 24 hours to take effect, and may take longer when a source volume is not fully utilized. However, transitional volume performance at least equals the performance of source storage volumes during the migration period. A fully utilized 1 TiB volume typically takes about six hours to migrate to an updated configuration.
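
As a sketch of how such an update might be applied, you could create a custom configuration that contains the new `num.replica.fetchers` value and then associate it with your cluster. The configuration name and Kafka version below are placeholders.

```
# configuration.txt contains the overridden property, for example:
#   num.replica.fetchers=8

aws kafka create-configuration \
 --name "provisioned-throughput-config" \
 --kafka-versions "3.6.0" \
 --server-properties fileb://configuration.txt
```

The configuration ARN returned by this command can then be applied to the cluster with the `update-cluster-configuration` command.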

# Provision storage throughput for Standard brokers in an Amazon MSK cluster
<a name="msk-provision-throughput"></a>

Amazon MSK brokers persist data on storage volumes. Storage I/O is consumed when producers write to the cluster, when data is replicated between brokers, and when consumers read data that isn't in memory. The volume storage throughput is the rate at which data can be written into and read from a storage volume. Provisioned storage throughput is the ability to specify that rate for the brokers in your cluster.

You can specify the provisioned throughput rate in MiB per second for clusters whose brokers are of size `kafka.m5.4xlarge` or larger and if the storage volume is 10 GiB or greater. It is possible to specify provisioned throughput during cluster creation. You can also enable or disable provisioned throughput for a cluster that is in the `ACTIVE` state.
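
For example, enabling provisioned throughput on an existing `ACTIVE` cluster can be sketched with the `update-storage` command; the cluster ARN, version string, and 300 MiB per second value are placeholders.

```
aws kafka update-storage \
 --cluster-arn ClusterArn \
 --current-version Current-Cluster-Version \
 --provisioned-throughput '{"Enabled": true, "VolumeThroughput": 300}'
```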

For information about managing throughput, see [Manage storage throughput for Standard brokers in an Amazon MSK cluster](msk-provision-throughput-management.md).

**Topics**
+ [Provision Amazon MSK cluster storage throughput using the AWS Management Console](#provisioned-throughput-console)
+ [Provision Amazon MSK cluster storage throughput using the AWS CLI](#provisioned-throughput-cli)
+ [Provision storage throughput when creating an Amazon MSK cluster using the API](#provisioned-throughput-api)

## Provision Amazon MSK cluster storage throughput using the AWS Management Console
<a name="provisioned-throughput-console"></a>

This process shows an example of how you can use the AWS Management Console to create an Amazon MSK cluster with provisioned throughput enabled.

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Choose **Create cluster**.

1. Choose **Custom create**.

1. Specify a name for the cluster.

1. In the **Storage** section, choose **Enable**.

1. Choose a value for storage throughput per broker.

1. Choose a VPC, zones and subnets, and a security group.

1. Choose **Next**.

1. At the bottom of the **Security** step, choose **Next**.

1. At the bottom of the **Monitoring and tags** step, choose **Next**.

1. Review the cluster settings, then choose **Create cluster**.

## Provision Amazon MSK cluster storage throughput using the AWS CLI
<a name="provisioned-throughput-cli"></a>

This process shows an example of how you can use the AWS CLI to create a cluster with provisioned throughput enabled.

1. Copy the following JSON and paste it into a file. Replace the subnet IDs and security group ID placeholders with values from your account. Name the file `cluster-creation.json` and save it.

   ```
   {
       "Provisioned": {
           "BrokerNodeGroupInfo":{
               "InstanceType":"kafka.m5.4xlarge",
               "ClientSubnets":[
                   "Subnet-1-ID",
                   "Subnet-2-ID"
               ],
               "SecurityGroups":[
                   "Security-Group-ID"
               ],
               "StorageInfo": {
                   "EbsStorageInfo": {
                       "VolumeSize": 10,
                       "ProvisionedThroughput": {
                           "Enabled": true,
                           "VolumeThroughput": 250
                       }
                   }
               }
           },
           "EncryptionInfo": {
               "EncryptionInTransit": {
                   "InCluster": false,
                   "ClientBroker": "PLAINTEXT"
               }
           },
           "KafkaVersion":"2.8.1",
           "NumberOfBrokerNodes": 2
       },
       "ClusterName": "provisioned-throughput-example"
   }
   ```

1. Run the following AWS CLI command from the directory where you saved the JSON file in the previous step.

   ```
   aws kafka create-cluster-v2 --cli-input-json file://cluster-creation.json
   ```

## Provision storage throughput when creating an Amazon MSK cluster using the API
<a name="provisioned-throughput-api"></a>

To configure provisioned storage throughput while creating a cluster, use [CreateClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters.html#CreateClusterV2).

# Amazon MSK Provisioned configuration
<a name="msk-configuration"></a>

Amazon MSK provides default configurations for brokers, topics, and metadata nodes. You can also create custom configurations and use them to create new MSK clusters or to update existing clusters. An MSK configuration consists of a set of properties and their corresponding values. Depending on the broker type that you use in your cluster, the configuration defaults and the set of configurations you can modify differ. See the sections below for more details on how to configure your Standard and Express brokers.

**Topics**
+ [Standard broker configurations](msk-configuration-standard.md)
+ [Express broker configurations](msk-configuration-express.md)
+ [Broker configuration operations](msk-configuration-operations.md)

# Standard broker configurations
<a name="msk-configuration-standard"></a>

This section describes configuration properties for Standard brokers.

**Topics**
+ [Custom Amazon MSK configurations](msk-configuration-properties.md)
+ [Default Amazon MSK configuration](msk-default-configuration.md)
+ [Guidelines for Amazon MSK tiered storage topic-level configuration](msk-guidelines-tiered-storage-topic-level-config.md)

# Custom Amazon MSK configurations
<a name="msk-configuration-properties"></a>

You can use Amazon MSK to create a custom MSK configuration where you set the following Apache Kafka configuration properties. Properties that you don't set explicitly get the values they have in [Default Amazon MSK configuration](msk-default-configuration.md). For more information about configuration properties, see [Apache Kafka Configuration](https://kafka.apache.org/documentation/#configuration).


| Name | Description | 
| --- | --- | 
| allow.everyone.if.no.acl.found | If you want to set this property to false, first make sure you define Apache Kafka ACLs for your cluster. If you set this property to false and you don't first define Apache Kafka ACLs, you lose access to the cluster. If that happens, you can update the configuration again and set this property to true to regain access to the cluster. | 
| auto.create.topics.enable | Enables topic auto-creation on the server. | 
| compression.type | The final compression type for a given topic. You can set this property to the standard compression codecs (gzip, snappy, lz4, and zstd). It additionally accepts uncompressed. This value is equivalent to no compression. If you set the value to producer, it means retain the original compression codec that the producer sets. | 
|  connections.max.idle.ms  | Idle connections timeout in milliseconds. The server socket processor threads close the connections that are idle for more than the value that you set for this property. | 
| default.replication.factor | The default replication factor for automatically created topics. | 
| delete.topic.enable | Enables the delete topic operation. If you turn off this setting, you can't delete a topic through the admin tool. | 
| group.initial.rebalance.delay.ms | Amount of time the group coordinator waits for more data consumers to join a new group before the group coordinator performs the first rebalance. A longer delay means potentially fewer rebalances, but this increases the time until processing begins. | 
| group.max.session.timeout.ms | Maximum session timeout for registered consumers. Longer timeouts give consumers more time to process messages between heartbeats at the cost of a longer time to detect failures. | 
| group.min.session.timeout.ms | Minimum session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeats. This can overwhelm broker resources. | 
| leader.imbalance.per.broker.percentage | The ratio of leader imbalance allowed per broker. The controller triggers a leader balance if it exceeds this value per broker. This value is specified in percentage. | 
| log.cleaner.delete.retention.ms | Amount of time that you want Apache Kafka to retain deleted records. The minimum value is 0. | 
| log.cleaner.min.cleanable.ratio |  This configuration property can have values between 0 and 1. This value determines how frequently the log compactor attempts to clean the log (if log compaction is enabled). By default, Apache Kafka avoids cleaning a log if more than 50% of the log has been compacted. This ratio bounds the maximum space that the log wastes with duplicates (at 50%, this means at most 50% of the log could be duplicates). A higher ratio means fewer, more efficient cleanings, but more wasted space in the log.  | 
| log.cleanup.policy | The default cleanup policy for segments beyond the retention window. A comma-separated list of valid policies. Valid policies are delete and compact. For Tiered Storage enabled clusters, valid policy is delete only. | 
| log.flush.interval.messages | Number of messages that accumulate on a log partition before messages are flushed to disk. | 
| log.flush.interval.ms | Maximum time in milliseconds that a message in any topic remains in memory before flushed to disk. If you don't set this value, the value in log.flush.scheduler.interval.ms is used. The minimum value is 0. | 
| log.message.timestamp.difference.max.ms | This configuration is deprecated in Kafka 3.6.0. Two configurations, log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms, have been added. The maximum time difference between the timestamp when a broker receives a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message is rejected if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime. | 
| log.message.timestamp.type | Specifies if the timestamp in the message is the message creation time or the log append time. The allowed values are CreateTime and LogAppendTime. | 
| log.retention.bytes | Maximum size of the log before deleting it. | 
| log.retention.hours | Number of hours to keep a log file before deleting it, tertiary to the log.retention.ms property. | 
| log.retention.minutes | Number of minutes to keep a log file before deleting it, secondary to log.retention.ms property. If you don't set this value, the value in log.retention.hours is used. | 
| log.retention.ms | Number of milliseconds to keep a log file before deleting it. If you don't set this value, the value in log.retention.minutes is used. | 
| log.roll.ms | Maximum time before a new log segment is rolled out (in milliseconds). If you don't set this property, the value in log.roll.hours is used. The minimum possible value for this property is 1. | 
| log.segment.bytes | Maximum size of a single log file. | 
| max.incremental.fetch.session.cache.slots | Maximum number of incremental fetch sessions that are maintained. | 
| message.max.bytes |  Largest record batch size that Kafka allows. If you increase this value and there are consumers older than 0.10.2, you must also increase the fetch size of the consumers so that they can fetch record batches this large. The latest message format version always groups messages into batches for efficiency. Previous message format versions don't group uncompressed records into batches, and in such a case, this limit only applies to a single record. You can set this value per topic with the topic level max.message.bytes config.  | 
| min.insync.replicas |  When a producer sets acks to `"all"` (or `"-1"`), the value in min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, the producer raises an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). You can use values in min.insync.replicas and acks to enforce greater durability guarantees. For example, you might create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of `"all"`. This ensures that the producer raises an exception if a majority of replicas don't receive a write.  | 
| num.io.threads | The number of threads that the server uses for processing requests, which may include disk I/O. | 
| num.network.threads | The number of threads that the server uses to receive requests from the network and send responses to the network. | 
| num.partitions | Default number of log partitions per topic. | 
| num.recovery.threads.per.data.dir | The number of threads per data directory to be used to recover logs at startup and to flush them at shutdown. | 
| num.replica.fetchers | The number of fetcher threads used to replicate messages from a source broker. If you increase this value, you can increase the degree of I/O parallelism in the follower broker. | 
| offsets.retention.minutes | After a consumer group loses all its consumers (that is, it becomes empty), its offsets are kept for this retention period before getting discarded. For standalone consumers (that is, those that use manual assignment), offsets expire after the time of the last commit plus this retention period. | 
| offsets.topic.replication.factor | The replication factor for the offsets topic. Set this value higher to ensure availability. Internal topic creation fails until the cluster size meets this replication factor requirement. | 
| replica.fetch.max.bytes | Number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is returned to ensure progress. The message.max.bytes (broker config) or max.message.bytes (topic config) defines the maximum record batch size that the broker accepts. | 
| replica.fetch.response.max.bytes | The maximum number of bytes expected for the entire fetch response. Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure progress. This isn't an absolute maximum. The message.max.bytes (broker config) or max.message.bytes (topic config) properties specify the maximum record batch size that the broker accepts. | 
| replica.lag.time.max.ms | If a follower hasn't sent any fetch requests or hasn't consumed up to the leader's log end offset for at least this number of milliseconds, the leader removes the follower from the ISR. MinValue: 10000. MaxValue: 30000. | 
| replica.selector.class | The fully-qualified class name that implements ReplicaSelector. The broker uses this value to find the preferred read replica. If you use Apache Kafka version 2.4.1 or higher, and want to allow consumers to fetch from the closest replica, set this property to org.apache.kafka.common.replica.RackAwareReplicaSelector. For more information, see [Apache Kafka version 2.4.1 (use 2.4.1.1 instead)](supported-kafka-versions.md#2.4.1). | 
| replica.socket.receive.buffer.bytes | The socket receive buffer for network requests. | 
| socket.receive.buffer.bytes | The SO_RCVBUF buffer of the socket server sockets. The minimum value that you can set for this property is -1. If the value is -1, Amazon MSK uses the OS default. | 
| socket.request.max.bytes | The maximum number of bytes in a socket request. | 
| socket.send.buffer.bytes | The SO_SNDBUF buffer of the socket server sockets. The minimum value that you can set for this property is -1. If the value is -1, Amazon MSK uses the OS default. | 
| transaction.max.timeout.ms | Maximum timeout for transactions. If the requested transaction time of a client exceeds this value, the broker returns an error in InitProducerIdRequest. This prevents a client from using too large a timeout, which can stall consumers that read from topics included in the transaction. | 
| transaction.state.log.min.isr | Overridden min.insync.replicas configuration for the transaction topic. | 
| transaction.state.log.replication.factor | The replication factor for the transaction topic. Set this property to a higher value to increase availability. Internal topic creation fails until the cluster size meets this replication factor requirement. | 
| transactional.id.expiration.ms | The time in milliseconds that the transaction coordinator waits to receive any transaction status updates for the current transaction before the coordinator expires its transactional ID. This setting also influences producer ID expiration because it causes producer IDs to expire when this time elapses after the last write with the given producer ID. Producer IDs might expire sooner if the last write from the producer ID is deleted because of the retention settings for the topic. The minimum value for this property is 1 millisecond. | 
| unclean.leader.election.enable | Indicates if replicas not in the ISR set should serve as leader as a last resort, even though this might result in data loss. | 
| zookeeper.connection.timeout.ms | ZooKeeper mode clusters. Maximum time that the client waits to establish a connection to ZooKeeper. If you don't set this value, the value in zookeeper.session.timeout.ms is used. MinValue: 6000. MaxValue (inclusive): 18000. We recommend that you set this value to 10,000 on T3.small brokers to avoid cluster downtime. | 
| zookeeper.session.timeout.ms | ZooKeeper mode clusters. The Apache ZooKeeper session timeout in milliseconds. MinValue: 6000. MaxValue (inclusive): 18000. | 

To learn how you can create a custom MSK configuration, list all configurations, or describe them, see [Broker configuration operations](msk-configuration-operations.md). To create an MSK cluster with a custom MSK configuration, or to update a cluster with a new custom configuration, see [Amazon MSK key features and concepts](operations.md).

When you update your existing MSK cluster with a custom MSK configuration, Amazon MSK does rolling restarts when necessary, and uses best practices to minimize customer downtime. For example, after Amazon MSK restarts each broker, Amazon MSK tries to let the broker catch up on data that the broker might have missed during the configuration update before it moves to the next broker.

## Dynamic Amazon MSK configuration
<a name="msk-dynamic-confinguration"></a>

In addition to the configuration properties that Amazon MSK provides, you can dynamically set cluster-level and broker-level configuration properties that don't require a broker restart. You can dynamically set any configuration property that isn't marked as read-only in the table under [Broker Configs](https://kafka.apache.org/documentation/#brokerconfigs) in the Apache Kafka documentation. For information on dynamic configuration and example commands, see [Updating Broker Configs](https://kafka.apache.org/documentation/#dynamicbrokerconfigs) in the Apache Kafka documentation.

**Note**  
You can set the `advertised.listeners` property, but not the `listeners` property.
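
For example, the following command uses the kafka-configs.sh tool that ships with Apache Kafka to set a cluster-wide dynamic default without a broker restart. This is a minimal sketch, not MSK-specific guidance: the `$BOOTSTRAP` variable and the `client.properties` authentication file are assumptions about your client setup.

```
# A minimal sketch: set a cluster-wide dynamic default without a broker restart.
# Assumes $BOOTSTRAP holds your bootstrap broker string and client.properties
# holds your client authentication settings.
bin/kafka-configs.sh --bootstrap-server $BOOTSTRAP \
  --command-config client.properties \
  --alter --entity-type brokers --entity-default \
  --add-config message.max.bytes=2097152
```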

## Topic-level Amazon MSK configuration
<a name="msk-topic-confinguration"></a>

You can use Apache Kafka commands to set or modify topic-level configuration properties for new and existing topics. For more information on topic-level configuration properties and examples on how to set them, see [Topic-Level Configs](https://kafka.apache.org/documentation/#topicconfigs) in the Apache Kafka documentation.
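
For example, the following sketch updates the retention period of an existing topic. The topic name, the `$BOOTSTRAP` variable, and the `client.properties` file are illustrative assumptions.

```
# A minimal sketch: set retention on an existing topic (hypothetical name).
bin/kafka-configs.sh --bootstrap-server $BOOTSTRAP \
  --command-config client.properties \
  --alter --entity-type topics --entity-name ExampleTopic \
  --add-config retention.ms=604800000
```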

# Default Amazon MSK configuration
<a name="msk-default-configuration"></a>

When you create an MSK cluster and don't specify a custom MSK configuration, Amazon MSK creates and uses a default configuration with the values shown in the following table. For properties that aren't in this table, Amazon MSK uses the defaults associated with your version of Apache Kafka. For a list of these default values, see [Apache Kafka Configuration](https://kafka.apache.org/documentation/#configuration). 


| Name | Description | Default value for non-tiered storage cluster | Default value for tiered storage-enabled cluster | 
| --- | --- | --- | --- | 
| allow.everyone.if.no.acl.found | If no resource patterns match a specific resource, the resource has no associated ACLs. In this case, if you set this property to true, all users can access the resource, not just the super users. | true | true | 
| auto.create.topics.enable | Enables autocreation of a topic on the server. | false | false | 
| auto.leader.rebalance.enable | Enables auto leader balancing. A background thread checks and initiates leader balance at regular intervals, if necessary. | true | true | 
| default.replication.factor | Default replication factors for automatically created topics. | 3 for clusters in 3 Availability Zones, and 2 for clusters in 2 Availability Zones. | 3 for clusters in 3 Availability Zones, and 2 for clusters in 2 Availability Zones. | 
|  local.retention.bytes  |  The maximum size of local log segments for a partition before it deletes the old segments. If you don't set this value, the value in log.retention.bytes is used. The effective value should always be less than or equal to the log.retention.bytes value. The default value of -2 indicates that there is no limit on local retention. This corresponds to the retention.ms/bytes setting of -1. The properties local.retention.ms and local.retention.bytes are similar to log.retention as they are used to determine how long the log segments should remain in local storage. Existing log.retention.\* configurations are retention configurations for the topic partition. This includes both local and remote storage. Valid values: integers in [-2; +Inf]  | -2 for unlimited | -2 for unlimited | 
|  local.retention.ms  | The number of milliseconds to retain the local log segment before deletion. If you don't set this value, Amazon MSK uses the value in log.retention.ms. The effective value should always be less than or equal to the log.retention.ms value. The default value of -2 indicates that there is no limit on local retention. This corresponds to the retention.ms/bytes setting of -1. The values local.retention.ms and local.retention.bytes are similar to log.retention. MSK uses this configuration to determine how long the log segments should remain in local storage. Existing log.retention.\* configurations are retention configurations for the topic partition. This includes both local and remote storage. Valid values are integers greater than 0. | -2 for unlimited | -2 for unlimited | 
|  log.message.timestamp.difference.max.ms  | This configuration is deprecated in Kafka 3.6.0. Two configurations, log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms, have been added. The maximum difference allowed between the timestamp when a broker receives a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime. The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling. | 9223372036854775807 | 86400000 for Kafka 2.8.2.tiered and Kafka 3.7.x tiered. | 
| log.segment.bytes | The maximum size of a single log file. | 1073741824 | 134217728 | 
| min.insync.replicas |  When a producer sets the value of acks (the acknowledgement the producer gets from the Kafka broker) to `"all"` (or `"-1"`), the value in min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, the producer raises an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When you use the values in min.insync.replicas and acks together, you can enforce greater durability guarantees. For example, you might create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of `"all"`. This ensures that the producer raises an exception if a majority of replicas don't receive a write.  | 2 for clusters in 3 Availability Zones, and 1 for clusters in 2 Availability Zones. | 2 for clusters in 3 Availability Zones, and 1 for clusters in 2 Availability Zones. | 
| num.io.threads | Number of threads that the server uses for processing requests, which may include disk I/O. | 8 | max(8, vCPUs) where vCPUs depends on the instance size of broker | 
| num.network.threads | Number of threads that the server uses to receive requests from the network and send responses to the network. | 5 | max(5, vCPUs / 2) where vCPUs depends on the instance size of broker | 
| num.partitions | Default number of log partitions per topic. | 1 | 1 | 
| num.replica.fetchers | Number of fetcher threads used to replicate messages from a source broker. If you increase this value, you can increase the degree of I/O parallelism in the follower broker. | 2 | max(2, vCPUs / 4) where vCPUs depends on the instance size of broker | 
|  remote.log.msk.disable.policy  |  Used with remote.storage.enable to disable tiered storage. Set this policy to Delete to indicate that data in tiered storage is deleted when you set remote.storage.enable to false.  | N/A | None | 
| remote.log.reader.threads | Remote log reader thread pool size, which is used in scheduling tasks to fetch data from remote storage. | N/A | max(10, vCPUs \* 0.67) where vCPUs depends on the instance size of broker | 
|  remote.storage.enable  | Enables tiered (remote) storage for a topic if set to true. Disables topic level tiered storage if set to false and remote.log.msk.disable.policy is set to Delete. When you disable tiered storage, you delete data from remote storage. When you disable tiered storage for a topic, you can't enable it again. | false | false | 
| replica.lag.time.max.ms | If a follower hasn't sent any fetch requests or hasn't consumed up to the leader's log end offset for at least this number of milliseconds, the leader removes the follower from the ISR. | 30000 | 30000 | 
|  retention.ms  |  Mandatory field. Minimum time is 3 days. There is no default because the setting is mandatory. Amazon MSK uses the retention.ms value with local.retention.ms to determine when data moves from local to tiered storage. The local.retention.ms value specifies when to move data from local to tiered storage. The retention.ms value specifies when to remove data from tiered storage (that is, removed from the cluster). Valid values: integers in [-1; +Inf]  | Minimum 259,200,000 milliseconds (3 days). -1 for infinite retention. | Minimum 259,200,000 milliseconds (3 days). -1 for infinite retention. | 
| socket.receive.buffer.bytes | The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default is used. | 102400 | 102400 | 
| socket.request.max.bytes | Maximum number of bytes in a socket request. | 104857600 | 104857600 | 
| socket.send.buffer.bytes | The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default is used. | 102400 | 102400 | 
| unclean.leader.election.enable | Indicates if you want replicas not in the ISR set to serve as leader as a last resort, even though this might result in data loss. | true | false | 
| zookeeper.session.timeout.ms |  The Apache ZooKeeper session timeout in milliseconds.  | 18000 | 18000 | 
| zookeeper.set.acl | Set the client to use secure ACLs. | false | false | 

For information about how to specify custom configuration values, see [Custom Amazon MSK configurations](msk-configuration-properties.md).

# Guidelines for Amazon MSK tiered storage topic-level configuration
<a name="msk-guidelines-tiered-storage-topic-level-config"></a>

The following are default settings and limitations when you configure tiered storage at the topic level.
+ Amazon MSK doesn't support small log segment sizes for topics with tiered storage activated. The minimum log segment size is 48 MiB and the minimum segment roll time is 10 minutes. These values map to the segment.bytes and segment.ms properties.
+ The value of local.retention.ms/bytes can't equal or exceed retention.ms/bytes, which is the tiered storage retention setting.
+ The default value for local.retention.ms/bytes is -2. This means that the retention.ms value is used for local.retention.ms/bytes. In this case, data remains in both local storage and tiered storage (one copy in each), and the two copies expire together. A copy of the local data is persisted to remote storage, and consume traffic is read from local storage.
+ The default value for retention.ms is 7 days. There is no default size limit for retention.bytes.
+ The minimum value for retention.ms/bytes is -1. This means infinite retention.
+ The minimum value for local.retention.ms/bytes is -2. This means infinite retention for local storage. This corresponds to the retention.ms/bytes setting of -1.
+ The topic-level configuration retention.ms is mandatory for topics with tiered storage activated. The minimum retention.ms is 3 days.

For more information about tiered storage constraints, see [Tiered storage constraints and limitations for Amazon MSK clusters](msk-tiered-storage.md#msk-tiered-storage-constraints).
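
As an illustration of these guidelines, the following sketch creates a topic with tiered storage enabled. The topic name, partition count, `$BOOTSTRAP` variable, and `client.properties` file are assumptions; the retention values are chosen to satisfy the 3-day minimum for retention.ms.

```
# A minimal sketch: create a topic with tiered storage enabled.
# retention.ms is 7 days (above the 3-day minimum) and
# local.retention.ms is 1 day, which is less than retention.ms.
bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --command-config client.properties \
  --create --topic ExampleTieredTopic \
  --partitions 6 --replication-factor 3 \
  --config remote.storage.enable=true \
  --config retention.ms=604800000 \
  --config local.retention.ms=86400000
```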

# Express broker configurations
<a name="msk-configuration-express"></a>

Apache Kafka has hundreds of broker configurations that you can use to tune the performance of your MSK Provisioned cluster. Setting erroneous or sub-optimal values can affect cluster reliability and performance. Express brokers improve the availability and durability of your MSK Provisioned clusters by setting optimal values for critical configurations and protecting them from common misconfiguration. There are three categories of configurations based on read and write access: [read/write (editable)](msk-configuration-express-read-write.md), [read only](msk-configuration-express-read-only.md), and non-read/write configurations. Some configurations still use Apache Kafka’s default value for the Apache Kafka version the cluster is running. We mark those as Apache Kafka Default.

**Topics**
+ [Custom MSK Express broker configurations (Read/Write access)](msk-configuration-express-read-write.md)
+ [Express brokers read-only configurations](msk-configuration-express-read-only.md)

# Custom MSK Express broker configurations (Read/Write access)
<a name="msk-configuration-express-read-write"></a>

You can update read/write broker configurations either by using Amazon MSK’s [update configuration feature](msk-update-cluster-config.md) or using Apache Kafka’s AlterConfigs API. Apache Kafka broker configurations are either static or dynamic. Static configurations require a broker restart for the configuration to be applied, while dynamic configurations do not need a broker restart. For more information about configuration properties and update modes, see [Updating broker configs](https://kafka.apache.org/documentation/#dynamicbrokerconfigs).
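
For example, the following AWS CLI sketch applies a new revision of a custom configuration to a cluster. The cluster ARN, configuration ARN, revision number, and cluster version are placeholders.

```
aws kafka update-cluster-configuration \
    --cluster-arn cluster-arn \
    --configuration-info '{"Arn": "configuration-arn", "Revision": 2}' \
    --current-version cluster-version
```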

**Topics**
+ [Static configurations on MSK Express brokers](#msk-configuration-express-static-configuration)
+ [Dynamic configurations on Express Brokers](#msk-configuration-express-dynamic-configuration)
+ [Topic-level configurations on Express Brokers](#msk-configuration-express-topic-configuration)

## Static configurations on MSK Express brokers
<a name="msk-configuration-express-static-configuration"></a>

You can create a custom MSK configuration to set the following static properties. Amazon MSK sets and manages all other properties. You can create and update static configuration files from the MSK console or using the [configurations command](msk-configuration-operations-create.md).
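
For example, a configuration file for Express brokers might set a few of the static properties from the following table. The values shown are illustrative, not recommendations.

```
auto.create.topics.enable = false
num.partitions = 6
message.max.bytes = 2097152
```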


| Property | Description | Default Value | 
| --- | --- | --- | 
|  allow.everyone.if.no.acl.found  |  If you want to set this property to false, first make sure you define Apache Kafka ACLs for your cluster. If you set this property to false and you don't first define Apache Kafka ACLs, you lose access to the cluster. If that happens, you can update the configuration again and set this property to true to regain access to the cluster.  |  true  | 
|  auto.create.topics.enable  |  Enables autocreation of a topic on the server.  |  false  | 
| compression.type |  Specify the final compression type for a given topic. This configuration accepts the standard compression codecs: gzip, snappy, lz4, zstd. This configuration additionally accepts `uncompressed`, which is equivalent to no compression; and `producer`, which means retain the original compression codec set by the producer. | Apache Kafka Default | 
|  connections.max.idle.ms  |  Idle connections timeout in milliseconds. The server socket processor threads close the connections that are idle for more than the value that you set for this property.  |  Apache Kafka Default  | 
|  delete.topic.enable  |  Enables the delete topic operation. If you turn off this setting, you can't delete a topic through the admin tool.  |  Apache Kafka Default  | 
|  group.initial.rebalance.delay.ms  |   Amount of time the group coordinator waits for more data consumers to join a new group before the group coordinator performs the first rebalance. A longer delay means potentially fewer rebalances, but this increases the time until processing begins.  |  Apache Kafka Default  | 
|  group.max.session.timeout.ms  |  Maximum session timeout for registered consumers. Longer timeouts give consumers more time to process messages between heartbeats at the cost of a longer time to detect failures.  |  Apache Kafka Default  | 
|  leader.imbalance.per.broker.percentage  |  The ratio of leader imbalance allowed per broker. The controller triggers a leader balance if it exceeds this value per broker. This value is specified in percentage.  |  Apache Kafka Default  | 
| log.cleanup.policy | The default cleanup policy for segments beyond the retention window. A comma-separated list of valid policies. Valid policies are delete and compact. For tiered storage-enabled clusters, valid policy is delete only. | Apache Kafka Default | 
| log.message.timestamp.after.max.ms |  The allowable timestamp difference between the message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. If `log.message.timestamp.type=CreateTime`, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if `log.message.timestamp.type=LogAppendTime`.  | 86400000 (24 \* 60 \* 60 \* 1000 ms, that is, 1 day) | 
| log.message.timestamp.before.max.ms |  The allowable timestamp difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. If `log.message.timestamp.type=CreateTime`, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if `log.message.timestamp.type=LogAppendTime`.  | 86400000 (24 \* 60 \* 60 \* 1000 ms, that is, 1 day) | 
| log.message.timestamp.type | Specifies if the timestamp in the message is the message creation time or the log append time. The allowed values are CreateTime and LogAppendTime. | Apache Kafka Default | 
| log.retention.bytes | Maximum size of the log before deleting it. | Apache Kafka Default | 
| log.retention.ms | Number of milliseconds to keep a log file before deleting it. | Apache Kafka Default | 
| max.connections.per.ip | The maximum number of connections allowed from each IP address. This can be set to 0 if there are overrides configured using the max.connections.per.ip.overrides property. New connections from the IP address are dropped if the limit is reached. | Apache Kafka Default | 
|  max.incremental.fetch.session.cache.slots  |  Maximum number of incremental fetch sessions that are maintained.  |  Apache Kafka Default  | 
| message.max.bytes |  Largest record batch size that Kafka allows. If you increase this value and there are consumers older than 0.10.2, you must also increase the fetch size of the consumers so that they can fetch record batches this large. The latest message format version always groups messages into batches for efficiency. Previous message format versions don't group uncompressed records into batches, and in such a case, this limit only applies to a single record. You can set this value per topic with the topic level `max.message.bytes` config.  | Apache Kafka Default | 
|  num.partitions  |  Default number of partitions per topic.  |  1  | 
|  offsets.retention.minutes  |  After a consumer group loses all its consumers (that is, it becomes empty) its offsets are kept for this retention period before getting discarded. For standalone consumers (that is, those that use manual assignment), offsets expire after the time of the last commit plus this retention period.  |  Apache Kafka Default  | 
|  replica.fetch.max.bytes  |  Number of bytes of messages to attempt to fetch for each partition. This is not an absolute maximum. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is returned to ensure progress. The message.max.bytes (broker config) or max.message.bytes (topic config) defines the maximum record batch size that the broker accepts.  |  Apache Kafka Default  | 
|  replica.selector.class  |  The fully-qualified class name that implements ReplicaSelector. The broker uses this value to find the preferred read replica. If you want to allow consumers to fetch from the closest replica, set this property to `org.apache.kafka.common.replica.RackAwareReplicaSelector`.  |  Apache Kafka Default  | 
|  socket.receive.buffer.bytes  |  The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default is used.  |  102400  | 
|  socket.request.max.bytes  |  Maximum number of bytes in a socket request.  |  104857600  | 
|  socket.send.buffer.bytes  |  The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default is used.  |  102400  | 
|  transaction.max.timeout.ms  |  Maximum timeout for transactions. If the requested transaction time of a client exceeds this value, the broker returns an error in InitProducerIdRequest. This prevents a client from using too large a timeout, which can stall consumers that read from topics included in the transaction.  |  Apache Kafka Default  | 
|  transactional.id.expiration.ms  |  The time in milliseconds that the transaction coordinator waits to receive any transaction status updates for the current transaction before the coordinator expires its transactional ID. This setting also influences producer ID expiration because it causes producer IDs to expire when this time elapses after the last write with the given producer ID. Producer IDs might expire sooner if the last write from the producer ID is deleted because of the retention settings for the topic. The minimum value for this property is 1 millisecond.  |  Apache Kafka Default  | 

## Dynamic configurations on Express Brokers
<a name="msk-configuration-express-dynamic-configuration"></a>

You can use the Apache Kafka AlterConfigs API or the kafka-configs.sh tool to edit the following dynamic configurations. Amazon MSK sets and manages all other properties. You can dynamically set cluster-level and broker-level configuration properties that don't require a broker restart.
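
As a sketch, the following command dynamically raises the cluster-wide connection limit from the table below. The `$BOOTSTRAP` variable and the `client.properties` file are assumptions about your client setup.

```
# A minimal sketch: dynamically update max.connections cluster-wide.
bin/kafka-configs.sh --bootstrap-server $BOOTSTRAP \
  --command-config client.properties \
  --alter --entity-type brokers --entity-default \
  --add-config max.connections=2000
```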


| Property | Description | Default value | 
| --- | --- | --- | 
|  advertised.listeners  |  Listeners to publish for clients to use, if different than the `listeners` config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for listeners will be used. Unlike listeners, it is not valid to advertise the 0.0.0.0 meta-address. Also unlike `listeners`, there can be duplicated ports in this property, so that one listener can be configured to advertise another listener's address. This can be useful in some cases where external load balancers are used. This property is set at a per-broker level.  |  null  | 
|  compression.type  |  The final compression type for a given topic. You can set this property to the standard compression codecs (`gzip`, `snappy`, `lz4`, and `zstd`). It additionally accepts `uncompressed`. This value is equivalent to no compression. If you set the value to `producer`, it means retain the original compression codec that the producer sets.  | Apache Kafka Default | 
| log.cleaner.delete.retention.ms | The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage. Else, delete tombstones might be collected before they complete their scan. | 86400000 (24 \* 60 \* 60 \* 1000 ms, that is, 1 day), Apache Kafka Default | 
| log.cleaner.min.compaction.lag.ms | The minimum time a message will remain uncompacted in the log. This setting is only applicable for logs that are being compacted. | 0, Apache Kafka Default | 
| log.cleaner.max.compaction.lag.ms | The maximum time a message will remain ineligible for compaction in the log. This setting is only applicable for logs that are being compacted. This configuration would be bounded in the range of [7 days, Long.Max]. | 9223372036854775807, Apache Kafka Default | 
|  log.cleanup.policy  |  The default cleanup policy for segments beyond the retention window. A comma-separated list of valid policies. Valid policies are `delete` and `compact`. For tiered storage-enabled clusters, valid policy is `delete` only.  | Apache Kafka Default | 
|  log.message.timestamp.after.max.ms  |  The allowable timestamp difference between the message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. If `log.message.timestamp.type=CreateTime`, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if `log.message.timestamp.type=LogAppendTime`.  | 86400000 (24 \* 60 \* 60 \* 1000 ms, that is, 1 day) | 
|  log.message.timestamp.before.max.ms  |  The allowable timestamp difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. If `log.message.timestamp.type=CreateTime`, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if `log.message.timestamp.type=LogAppendTime`.  | 86400000 (24 \* 60 \* 60 \* 1000 ms, that is, 1 day) | 
|  log.message.timestamp.type  |  Specifies if the timestamp in the message is the message creation time or the log append time. The allowed values are `CreateTime` and `LogAppendTime`.  | Apache Kafka Default | 
|  log.retention.bytes  |  Maximum size of the log before deleting it.  |  Apache Kafka Default  | 
|  log.retention.ms  |  Number of milliseconds to keep a log file before deleting it.  |  Apache Kafka Default  | 
|  max.connection.creation.rate  |  The maximum connection creation rate allowed in the broker at any time.  |  Apache Kafka Default  | 
|  max.connections  |  The maximum number of connections allowed in the broker at any time. This limit is applied in addition to any per-ip limits configured using `max.connections.per.ip`.  |  Apache Kafka Default  | 
|  max.connections.per.ip  |  The maximum number of connections allowed from each ip address. This can be set to `0` if there are overrides configured using max.connections.per.ip.overrides property. New connections from the ip address are dropped if the limit is reached.  |  Apache Kafka Default  | 
|  max.connections.per.ip.overrides  |  A comma-separated list of per-ip or hostname overrides to the default maximum number of connections. An example value is `hostName:100,127.0.0.1:200`  | Apache Kafka Default | 
|  message.max.bytes  |  Largest record batch size that Kafka allows. If you increase this value and there are consumers older than 0.10.2, you must also increase the fetch size of the consumers so that they can fetch record batches this large. The latest message format version always groups messages into batches for efficiency. Previous message format versions don't group uncompressed records into batches, and in such a case, this limit only applies to a single record. You can set this value per topic with the topic level `max.message.bytes` config.  | Apache Kafka Default | 
|  producer.id.expiration.ms  |  The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than `delivery.timeout.ms` can help prevent expiration during retries and protect against message duplication, but the default should be reasonable for most use cases.  | Apache Kafka Default | 

## Topic-level configurations on Express Brokers
<a name="msk-configuration-express-topic-configuration"></a>

You can use Apache Kafka commands to set or modify topic-level configuration properties for new and existing topics. If you don't set a topic-level configuration, Amazon MSK uses the broker default. As with broker-level configurations, Amazon MSK protects some of the topic-level configuration properties from change. Examples include replication factor, `min.insync.replicas`, and `unclean.leader.election.enable`. If you try to create a topic with a replication factor value other than `3`, Amazon MSK will create the topic with a replication factor of `3` by default. For more information on topic-level configuration properties and examples on how to set them, see [Topic-Level Configs](https://kafka.apache.org/documentation/#topicconfigs) in the Apache Kafka documentation.


| Property | Description | 
| --- | --- | 
|  cleanup.policy  |  This config designates the retention policy to use on log segments. The "delete" policy (which is the default) will discard old segments when their retention time or size limit has been reached. The "compact" policy will enable log compaction, which retains the latest value for each key. It is also possible to specify both policies in a comma-separated list (for example, "delete,compact"). In this case, old segments will be discarded per the retention time and size configuration, while retained segments will be compacted. Compaction on Express brokers is triggered after the data in a partition reaches 256 MB.  | 
|  compression.type  |  Specify the final compression type for a given topic. This configuration accepts the standard compression codecs (`gzip`, `snappy`, `lz4`, `zstd`). It additionally accepts `uncompressed` which is equivalent to no compression; and `producer` which means retain the original compression codec set by the producer.  | 
| delete.retention.ms |  The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage. Else, delete tombstones might be collected before they complete their scan. The default value for this setting is 86400000 (24 \* 60 \* 60 \* 1000 ms, that is, 1 day), Apache Kafka Default  | 
|  max.message.bytes  |  The largest record batch size allowed by Kafka (after compression, if compression is enabled). If this is increased and there are consumers older than `0.10.2`, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level `max.message.bytes` config.  | 
|  message.timestamp.after.max.ms  |  This configuration sets the allowable timestamp difference between the message timestamp and the broker's timestamp. The message timestamp can be later than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. If `message.timestamp.type=CreateTime`, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if `message.timestamp.type=LogAppendTime`.  | 
|  message.timestamp.before.max.ms  |  This configuration sets the allowable timestamp difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this configuration. If `message.timestamp.type=CreateTime`, the message will be rejected if the difference in timestamps exceeds this specified threshold. This configuration is ignored if `message.timestamp.type=LogAppendTime`.  | 
|  message.timestamp.type  |  Define whether the timestamp in the message is message create time or log append time. The value should be either `CreateTime` or `LogAppendTime`  | 
| min.compaction.lag.ms |  The minimum time a message will remain uncompacted in the log. This setting is only applicable for logs that are being compacted. The default value for this setting is 0, Apache Kafka Default  | 
| max.compaction.lag.ms |  The maximum time a message will remain ineligible for compaction in the log. This setting is only applicable for logs that are being compacted. This configuration would be bounded in the range of [7 days, Long.Max]. The default value for this setting is 9223372036854775807, Apache Kafka Default.  | 
|  retention.bytes  |  This configuration controls the maximum size a partition (which consists of log segments) can grow to before we will discard old log segments to free up space if we are using the "delete" retention policy. By default there is no size limit, only a time limit. Since this limit is enforced at the partition level, multiply it by the number of partitions to compute the topic retention in bytes. Additionally, the `retention.bytes` configuration operates independently of `segment.ms` and `segment.bytes` configurations. Moreover, it triggers the rolling of a new segment if `retention.bytes` is configured to zero.  | 
|  retention.ms  |  This configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the "delete" retention policy. This represents an SLA on how soon consumers must read their data. If set to `-1`, no time limit is applied. Additionally, the `retention.ms` configuration operates independently of `segment.ms` and `segment.bytes` configurations. Moreover, it triggers the rolling of a new segment if the `retention.ms` condition is satisfied.  | 

# Express brokers read-only configurations
<a name="msk-configuration-express-read-only"></a>

Amazon MSK sets the values for these configurations and protects them from changes that may affect the availability of your cluster. These values may change depending on the Apache Kafka version running on the cluster, so remember to check the values on your specific cluster.
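
One way to check the effective values is the kafka-configs.sh describe command, sketched below. The broker ID, the `$BOOTSTRAP` variable, and the `client.properties` file are assumptions about your cluster and client setup.

```
# A minimal sketch: list the effective configuration values for broker 1.
# --all includes static values and defaults, not just dynamic overrides.
bin/kafka-configs.sh --bootstrap-server $BOOTSTRAP \
  --command-config client.properties \
  --describe --entity-type brokers --entity-name 1 --all
```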

The following table lists the read-only configurations for Express brokers.


| Property | Description | Express Broker Value | 
| --- | --- | --- | 
| broker.id | The broker id for this server. | 1,2,3... | 
| broker.rack | Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. Examples: `RACK1`, `us-east-1d` | AZ ID or Subnet ID | 
|  default.replication.factor  |  Default replication factors for all topics.  |  3  | 
| fetch.max.bytes | The maximum number of bytes we will return for a fetch request. | Apache Kafka Default | 
| group.max.size | The maximum number of consumers that a single consumer group can accommodate. | Apache Kafka Default | 
| inter.broker.listener.name | Name of listener used for communication between brokers. | REPLICATION_SECURE or REPLICATION | 
| inter.broker.protocol.version | Specifies which version of the inter-broker protocol is used. | Apache Kafka Default | 
| listeners | Listener List - Comma-separated list of URIs we will listen on and the listener names. You can set the advertised.listeners property, but not the listeners property. | MSK-generated | 
| log.message.format.version | Specify the message format version the broker will use to append messages to the logs. | Apache Kafka Default | 
| min.insync.replicas | When a producer sets acks to `all` (or `-1`), the value in `min.insync.replicas` specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, the producer raises an exception (either `NotEnoughReplicas` or `NotEnoughReplicasAfterAppend`). Setting acks to `all` in your producer enforces greater durability guarantees because it ensures that the producer raises an exception if a majority of replicas don't receive a write. | 2 | 
| num.io.threads | Number of threads that the server uses for processing requests, which may include disk I/O. (m7g.large, 8), (m7g.xlarge, 8), (m7g.2xlarge, 16), (m7g.4xlarge, 32), (m7g.8xlarge, 64), (m7g.12xlarge, 96), (m7g.16xlarge, 128) | Based on instance size: max(8, 2 \* vCPUs) | 
| num.network.threads | Number of threads that the server uses to receive requests from the network and send responses to the network. (m7g.large, 8), (m7g.xlarge, 8), (m7g.2xlarge, 8), (m7g.4xlarge, 16), (m7g.8xlarge, 32), (m7g.12xlarge, 48), (m7g.16xlarge, 64) | Based on instance size: max(8, vCPUs) | 
| replica.fetch.response.max.bytes | The maximum number of bytes expected for the entire fetch response. Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure progress. This isn't an absolute maximum. The message.max.bytes (broker config) or max.message.bytes (topic config) properties specify the maximum record batch size that the broker accepts. | Apache Kafka Default | 
| request.timeout.ms | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted. | Apache Kafka Default | 
| transaction.state.log.min.isr | Overridden min.insync.replicas configuration for the transaction topic. | 2 | 
| transaction.state.log.replication.factor | The replication factor for the transaction topic. | Apache Kafka Default | 
| unclean.leader.election.enable | Allows replicas not in the ISR set to serve as leader as a last resort, even though this might result in data loss. | FALSE | 

# Broker configuration operations
<a name="msk-configuration-operations"></a>

Apache Kafka broker configurations are either static or dynamic. Static configurations require a broker restart for the configuration to be applied. Dynamic configurations do not need a broker restart for the configuration to be updated. For more information about configuration properties and update modes, see [Apache Kafka Configuration](https://kafka.apache.org/documentation/#configuration).

This topic describes how to create custom MSK configurations and how to perform operations on them. For information about how to use MSK configurations to create or update clusters, see [Amazon MSK key features and concepts](operations.md).

**Topics**
+ [Create a configuration](msk-configuration-operations-create.md)
+ [Update configuration](msk-configuration-operations-update.md)
+ [Delete configuration](msk-configuration-operations-delete.md)
+ [Get configuration metadata](msk-configuration-operations-describe.md)
+ [Get details about configuration revision](msk-configuration-operations-describe-revision.md)
+ [List configurations in your account for the current Region](msk-configuration-operations-list.md)
+ [Amazon MSK configuration states](msk-configuration-states.md)

# Create a configuration
<a name="msk-configuration-operations-create"></a>

This process describes how to create a custom Amazon MSK configuration.

1. Create a file where you specify the configuration properties that you want to set and the values that you want to assign to them. The following are the contents of an example configuration file.

   ```
   auto.create.topics.enable = true
   
   log.roll.ms = 604800000
   ```

1. Run the following AWS CLI command, and replace *config-file-path* with the path to the file where you saved your configuration in the previous step.
**Note**  
The name that you choose for your configuration must match the following regex: "^[0-9A-Za-z][0-9A-Za-z-]{0,}$".

   ```
   aws kafka create-configuration --name "ExampleConfigurationName" --description "Example configuration description." --kafka-versions "1.1.1" --server-properties fileb://config-file-path
   ```

   The following is an example of a successful response after you run this command.

   ```
   {
       "Arn": "arn:aws:kafka:us-east-1:123456789012:configuration/SomeTest/abcdabcd-1234-abcd-1234-abcd123e8e8e-1",
       "CreationTime": "2019-05-21T19:37:40.626Z",
       "LatestRevision": {
           "CreationTime": "2019-05-21T19:37:40.626Z",
           "Description": "Example configuration description.",
           "Revision": 1
       },
       "Name": "ExampleConfigurationName"
   }
   ```

1. The previous command returns an Amazon Resource Name (ARN) for your new configuration. Save this ARN because you need it to refer to this configuration in other commands. If you lose your configuration ARN, you can list all the configurations in your account to find it again.

# Update configuration
<a name="msk-configuration-operations-update"></a>

This process describes how to update a custom Amazon MSK configuration.

1. Create a file where you specify the configuration properties that you want to update and the values that you want to assign to them. The following are the contents of an example configuration file.

   ```
   auto.create.topics.enable = true
   
   min.insync.replicas = 2
   ```

1. Run the following AWS CLI command, and replace *config-file-path* with the path to the file where you saved your configuration in the previous step.

   Replace *configuration-arn* with the ARN that you obtained when you created the configuration. If you didn't save the ARN when you created the configuration, you can use the `list-configurations` command to list all configurations in your account. The configuration that you want appears in the list in the response, along with its ARN.

   ```
   aws kafka update-configuration --arn configuration-arn --description "Example configuration revision description." --server-properties fileb://config-file-path
   ```

1. The following is an example of a successful response after you run this command.

   ```
   {
       "Arn": "arn:aws:kafka:us-east-1:123456789012:configuration/SomeTest/abcdabcd-1234-abcd-1234-abcd123e8e8e-1",
       "LatestRevision": {
           "CreationTime": "2020-08-27T19:37:40.626Z",
           "Description": "Example configuration revision description.",
           "Revision": 2
       }
   }
   ```

# Delete configuration
<a name="msk-configuration-operations-delete"></a>

The following procedure shows how to delete a configuration that isn't attached to a cluster. You can't delete a configuration that's attached to a cluster.

1. To run this example, replace *configuration-arn* with the ARN that you obtained when you created the configuration. If you didn't save the ARN when you created the configuration, you can use the `list-configurations` command to list all configurations in your account. The configuration that you want appears in the list in the response, along with its ARN.

   ```
   aws kafka delete-configuration --arn configuration-arn
   ```

1. The following is an example of a successful response after you run this command.

   ```
   {
       "arn": " arn:aws:kafka:us-east-1:123456789012:configuration/SomeTest/abcdabcd-1234-abcd-1234-abcd123e8e8e-1",
       "state": "DELETING"
   }
   ```

# Get configuration metadata
<a name="msk-configuration-operations-describe"></a>

The following procedure shows how to describe an Amazon MSK configuration to get metadata about the configuration.

1. The following command returns metadata about the configuration. To get a detailed description of the configuration, run the `describe-configuration-revision` command.

   To run this example, replace *configuration-arn* with the ARN that you obtained when you created the configuration. If you didn't save the ARN when you created the configuration, you can use the `list-configurations` command to list all configurations in your account. The configuration that you want appears in the list in the response, along with its ARN.

   ```
   aws kafka describe-configuration --arn configuration-arn
   ```

1. The following is an example of a successful response after you run this command.

   ```
   {
       "Arn": "arn:aws:kafka:us-east-1:123456789012:configuration/SomeTest/abcdabcd-abcd-1234-abcd-abcd123e8e8e-1",
       "CreationTime": "2019-05-21T00:54:23.591Z",
       "Description": "Example configuration description.",
       "KafkaVersions": [
           "1.1.1"
       ],
       "LatestRevision": {
           "CreationTime": "2019-05-21T00:54:23.591Z",
           "Description": "Example configuration description.",
           "Revision": 1
       },
       "Name": "SomeTest"
   }
   ```

# Get details about configuration revision
<a name="msk-configuration-operations-describe-revision"></a>

This process shows how to get a detailed description of an Amazon MSK configuration revision.

If you use the `describe-configuration` command to describe an MSK configuration, you see the metadata of the configuration. To get a detailed description of the configuration, use the `describe-configuration-revision` command.
+ Run the following command and replace *configuration-arn* with the ARN that you obtained when you created the configuration. If you didn't save the ARN when you created the configuration, you can use the `list-configurations` command to list all configurations in your account. The configuration that you want appears in the list in the response, along with its ARN.

  ```
  aws kafka describe-configuration-revision --arn configuration-arn --revision 1
  ```

  The following is an example of a successful response after you run this command.

  ```
  {
      "Arn": "arn:aws:kafka:us-east-1:123456789012:configuration/SomeTest/abcdabcd-abcd-1234-abcd-abcd123e8e8e-1",
      "CreationTime": "2019-05-21T00:54:23.591Z",
      "Description": "Example configuration description.",
      "Revision": 1,
      "ServerProperties": "YXV0by5jcmVhdGUudG9waWNzLmVuYWJsZSA9IHRydWUKCgp6b29rZWVwZXIuY29ubmVjdGlvbi50aW1lb3V0Lm1zID0gMTAwMAoKCmxvZy5yb2xsLm1zID0gNjA0ODAwMDAw"
  }
  ```

  The value of `ServerProperties` is encoded with base64. If you use a base64 decoder (for example, https://www.base64decode.org/) to decode it manually, you get the contents of the original configuration file that you used to create the custom configuration. In this case, you get the following:

  ```
  auto.create.topics.enable = true
  
  log.roll.ms = 604800000
  ```
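
  You can also decode the value in one step from the command line. This sketch pipes the AWS CLI output through the base64 utility; it assumes a shell where `base64 --decode` is available and uses the same *configuration-arn* placeholder as the previous command.

  ```
  aws kafka describe-configuration-revision --arn configuration-arn --revision 1 \
      --query ServerProperties --output text | base64 --decode
  ```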

# List configurations in your account for the current Region
<a name="msk-configuration-operations-list"></a>

This process describes how to list all Amazon MSK configurations in your account for the current AWS Region.
+ Run the following command.

  ```
  aws kafka list-configurations
  ```

  The following is an example of a successful response after you run this command.

  ```
  {
      "Configurations": [
          {
              "Arn": "arn:aws:kafka:us-east-1:123456789012:configuration/SomeTest/abcdabcd-abcd-1234-abcd-abcd123e8e8e-1",
              "CreationTime": "2019-05-21T00:54:23.591Z",
              "Description": "Example configuration description.",
              "KafkaVersions": [
                  "1.1.1"
              ],
              "LatestRevision": {
                  "CreationTime": "2019-05-21T00:54:23.591Z",
                  "Description": "Example configuration description.",
                  "Revision": 1
              },
              "Name": "SomeTest"
          },
          {
              "Arn": "arn:aws:kafka:us-east-1:123456789012:configuration/SomeTest/abcdabcd-1234-abcd-1234-abcd123e8e8e-1",
              "CreationTime": "2019-05-03T23:08:29.446Z",
              "Description": "Example configuration description.",
              "KafkaVersions": [
                  "1.1.1"
              ],
              "LatestRevision": {
                  "CreationTime": "2019-05-03T23:08:29.446Z",
                  "Description": "Example configuration description.",
                  "Revision": 1
              },
              "Name": "ExampleConfigurationName"
          }
      ]
  }
  ```

# Amazon MSK configuration states
<a name="msk-configuration-states"></a>

An Amazon MSK configuration can be in one of the following states. To perform an operation on a configuration, the configuration must be in the `ACTIVE` or `DELETE_FAILED` state:
+ `ACTIVE`
+ `DELETING`
+ `DELETE_FAILED`

# Intelligent rebalancing for clusters
<a name="intelligent-rebalancing"></a>

Amazon MSK provides intelligent rebalancing for all new MSK Provisioned clusters with Express brokers. This feature automatically manages partition distribution and cluster scaling operations, eliminating the need for third-party tools. Intelligent rebalancing automatically rebalances partitions when you scale clusters up or down. It also continuously monitors your cluster’s health for resource imbalance or overload and redistributes workload.

Intelligent rebalancing provides fast scaling operations that complete within 30 minutes and doesn’t impact cluster availability during scaling. It’s turned on by default for all new MSK Express-based Provisioned clusters and works with the recommended maximum partition limit of 20,000 partitions per broker. Additionally, this feature is available at no additional cost and doesn’t require any configuration.

As of November 20, 2025, intelligent rebalancing is available in all AWS Regions where Amazon MSK Express brokers are supported.

**Topics**
+ [How intelligent rebalancing works](#how-intelligent-rebalancing-works)
+ [Monitoring intelligent rebalancing metrics](#intelligent-rebalancing-metrics)
+ [Considerations for using intelligent rebalancing](#intelligent-rebalancing-considerations)
+ [Scaling Amazon MSK clusters up and down with a single operation](intelligent-rebalancing-scaling-clusters.md)
+ [Steady state rebalancing for Amazon MSK clusters](intelligent-rebalancing-self-balancing-paritions.md)

## How intelligent rebalancing works
<a name="how-intelligent-rebalancing-works"></a>

Intelligent rebalancing is turned on by default for all new MSK Provisioned clusters with Express brokers. It includes support for the following situations:
+ **Scaling up and down**: Lets you add or remove brokers in your MSK Express-based clusters with a single action. After you specify the brokers to add or remove, intelligent rebalancing automatically redistributes partitions across the new cluster setup based on internal AWS best practices.
+ **Steady state rebalancing**: At steady state, this feature monitors your cluster’s health continuously and automatically rebalances partitions when:
  + Resource utilization becomes skewed across brokers.
  + Brokers become over-provisioned or under-utilized.
  + New brokers are added or existing brokers are removed.

**Note**  
If intelligent rebalancing is turned on, you won’t be able to use third-party tools, such as Cruise Control, for partition rebalancing. You must first pause intelligent rebalancing to use the partition reassignment API provided by these third-party tools.

You can use this feature in the Amazon MSK console, or through the AWS CLI, the Amazon MSK API, AWS SDKs, and AWS CloudFormation. For more information, see [Scaling Amazon MSK clusters](intelligent-rebalancing-scaling-clusters.md) and [Steady state rebalancing](intelligent-rebalancing-self-balancing-paritions.md).

## Monitoring intelligent rebalancing metrics
<a name="intelligent-rebalancing-metrics"></a>

You can monitor the status of ongoing and historical intelligent rebalancing operations using the following Amazon CloudWatch metrics:
+ `RebalanceInProgress`: This metric is published every minute with a value of 1 when rebalancing is ongoing and 0 otherwise.
+ `UnderProvisioned`: Indicates that the cluster is currently underprovisioned and partition rebalancing can’t be performed. To resolve this, either add more brokers or scale up your cluster’s instance type.

For information about monitoring an MSK Provisioned cluster, see [Monitoring](monitoring.md) and [CloudWatch metrics](cloudwatch-metrics.md).
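
As an illustrative sketch of reading these metrics with Boto3, the following call pulls the last hour of `RebalanceInProgress` data points. The `AWS/Kafka` namespace is the standard namespace for MSK metrics; the `Cluster Name` dimension is an assumption carried over from other MSK cluster-level metrics.

```
from datetime import datetime, timedelta, timezone

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")
now = datetime.now(timezone.utc)

# Fetch one-minute maximums for the last hour; a value of 1 means a
# rebalance was in progress during that minute.
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/Kafka",
    MetricName="RebalanceInProgress",
    Dimensions=[{"Name": "Cluster Name", "Value": "myCluster"}],
    StartTime=now - timedelta(hours=1),
    EndTime=now,
    Period=60,
    Statistics=["Maximum"],
)
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Maximum"])
```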

## Considerations for using intelligent rebalancing
<a name="intelligent-rebalancing-considerations"></a>
+ Support for intelligent rebalancing is only available for new MSK Provisioned clusters with Express brokers.
+ Automatic partition reassignment supports up to 20,000 partitions per broker.
+ You can’t use partition reassignment APIs or third-party rebalancing tools when intelligent rebalancing is enabled. To use such APIs or third-party tools, you must first pause intelligent rebalancing for your MSK Express-based cluster.

# Scaling Amazon MSK clusters up and down with a single operation
<a name="intelligent-rebalancing-scaling-clusters"></a>

With intelligent rebalancing, you can scale your clusters up or down by editing the broker count in your clusters in a single action. You can do this in the Amazon MSK console, or by using the AWS CLI, Amazon MSK APIs or AWS SDK, and AWS CloudFormation. When you change the broker count, Amazon MSK does the following:
+ Automatically distributes partitions to new brokers.
+ Moves partitions from brokers being removed.

As you scale your clusters up and down, cluster availability for clients to produce and consume data remains unaffected.

------
#### [ Scaling clusters using AWS Management Console ]

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. On the **Clusters** page, choose a newly created Express-based cluster. For information about creating a provisioned Express-based cluster, see [Step 1: Create an MSK Provisioned cluster](create-cluster.md).

1. On the **Actions** dropdown list, choose **Edit number of brokers**.

1. On the **Edit number of brokers per zone** page, do one of the following:
   + To add more brokers in your cluster, choose **Add brokers to each Availability Zone**, and then enter the number of brokers you want to add.
   + To remove brokers from your cluster, choose **Remove one broker from each Availability Zone**.

1. Choose **Save changes**.

------
#### [ Scaling clusters using AWS CLI ]

You can scale your clusters up or down by editing their broker count. To do this in the AWS CLI, use the [update-broker-count](https://docs.aws.amazon.com/cli/latest/reference/kafka/update-broker-count.html) command, as shown in the following example. In this command, specify the number of brokers you want in your cluster in the `target-broker-count` parameter.

```
aws kafka update-broker-count --cluster-arn arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1 --current-version ABCDEF1GHIJK0L --target-broker-count 6
```

------
#### [ Scaling clusters using AWS SDK ]

You can scale your clusters up or down by programmatically editing the broker count. To do this using the AWS SDK, use the [UpdateBrokerCount](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-nodes-count.html#UpdateBrokerCount) API, as shown in the following example. For the `TargetNumberOfBrokerNodes` parameter, specify the number of brokers you want in your cluster.

```
import boto3

client = boto3.client('kafka')

update_broker_count_response = client.update_broker_count(
    ClusterArn='arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1',
    CurrentVersion='ABCDEF1GHIJK0L',
    TargetNumberOfBrokerNodes=6
)
```

------

# Steady state rebalancing for Amazon MSK clusters
<a name="intelligent-rebalancing-self-balancing-paritions"></a>

Steady state rebalancing is a part of the intelligent rebalancing feature, which is turned on by default for all new MSK Provisioned clusters with Express brokers. As you scale your clusters up or down, Amazon MSK automatically handles partition management by distributing partitions to new brokers and moving partitions from brokers due for removal. To ensure optimal distribution of workload across brokers, intelligent rebalancing uses Amazon MSK best practices to determine thresholds for automatically initiating rebalancing for your brokers.

You can pause and resume steady state rebalancing when needed. Steady state rebalancing continuously monitors your cluster and does the following:
+ Tracks broker resource usage (CPU, network, storage).
+ Adjusts partition placement automatically without any impact on data availability.
+ Completes rebalancing operations up to 180 times faster on Express brokers than on Standard brokers.
+ Maintains cluster performance.

------
#### [ Pause and resume steady state rebalancing in AWS Management Console ]

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. On the **Clusters** page, choose an Express-based cluster. For information about creating a provisioned Express-based cluster, see [Step 1: Create an MSK Provisioned cluster](create-cluster.md).

1. On the Cluster detail page, verify that the **Intelligent rebalancing** status is **Active**. If Intelligent rebalancing isn’t available or the status is **Paused**, create a new Express-based cluster.

1. On the **Actions** dropdown list, choose **Edit intelligent rebalancing**.

1. On the **Edit intelligent rebalancing** page, do the following:

   1. Choose **Paused**.

   1. Choose **Save changes**.

------
#### [ Pause and resume steady state rebalancing using AWS CLI ]

To set the rebalancing status of a cluster to **ACTIVE** using the AWS CLI, use the [update-rebalancing](https://docs.aws.amazon.com/cli/latest/reference/kafka/update-rebalancing.html) command, as shown in the following example. In this command, specify the status with the `rebalancing` parameter.

```
aws kafka update-rebalancing --cluster-arn arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1 --current-version ABCDEF1GHIJK0L --rebalancing "{\"Rebalancing\":{\"Status\":\"ACTIVE\"}}"
```

------
#### [ Pause and resume steady state rebalancing using AWS SDK ]

You can also set the rebalancing status of a cluster programmatically by using the [UpdateRebalancingRequest](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-rebalancing.html#UpdateRebalancing) API. The following examples show how to set the rebalancing status to **ACTIVE** and **PAUSED**.

```
final UpdateRebalancingRequest updateRebalancingRequest = new UpdateRebalancingRequest()
    .withClusterArn("arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1")
    .withCurrentVersion("ABCDEF1GHIJK0L")
    .withRebalancing(new Rebalancing().withStatus("ACTIVE"));
```

```
final UpdateRebalancingRequest updateRebalancingRequest = new UpdateRebalancingRequest()
    .withClusterArn("arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1")
    .withCurrentVersion("ABCDEF1GHIJK0L")
    .withRebalancing(new Rebalancing().withStatus("PAUSED"));
```

------

# Patching on MSK Provisioned clusters
<a name="patching-impact"></a>

Periodically, Amazon MSK updates software on the brokers in your cluster. Maintenance includes planned updates or unplanned repairs. Planned maintenance includes operating system updates, security updates, and other software updates required to maintain the health, security, and performance of your cluster. We perform unplanned maintenance to resolve sudden infrastructure degradation. We perform maintenance on Standard and Express brokers, but the experiences are different.

## Patching for Standard brokers
<a name="patching-standard-brokers"></a>

Updates to your Standard brokers have no impact on your applications' writes and reads if you follow [best practices](bestpractices.md).

Amazon MSK uses rolling updates for software to maintain high availability of your clusters. During this process, brokers are rebooted one at a time, and Kafka automatically moves leadership to another online broker. When you view cluster operations in the AWS Management Console or through the `DescribeClusterOperation` and `ListClusterOperations` APIs, these maintenance operations appear with an operation type of `SECURITY_PATCHING`. Kafka clients have built-in mechanisms to automatically detect the change in leadership for the partitions and continue to write and read data into an MSK cluster. Follow [Best practices for Apache Kafka clients](bestpractices-kafka-client.md) for smooth operation of your cluster at all times, including during patching.
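
As a hedged sketch of spotting these operations programmatically, the following Boto3 snippet lists a cluster's operations and filters for patching; the cluster ARN is a placeholder.

```
import boto3

client = boto3.client("kafka")

# ListClusterOperations returns past and ongoing operations for a cluster.
operations = client.list_cluster_operations(
    ClusterArn="arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1"
)["ClusterOperationInfoList"]

for operation in operations:
    if operation["OperationType"] == "SECURITY_PATCHING":
        print(operation["OperationType"], operation["OperationState"], operation.get("StartTime"))
```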

After a broker goes offline, it is normal to see transient disconnect errors on your clients. You will also observe, for a brief window (up to 2 minutes, typically less), some spikes in p99 read and write latency (typically high milliseconds, up to 2 seconds). These spikes are expected and are caused by clients reconnecting to a new leader broker; they don't impact your ability to produce or consume, and they resolve once the clients reconnect. For more information, see [Broker offline and client failover](https://docs.aws.amazon.com/msk/latest/developerguide/troubleshooting-offlinebroker-clientfailover.html).

You will also observe an increase in the `UnderReplicatedPartitions` metric, which is expected because the partitions on the broker that was shut down are no longer replicating data. This has no impact on applications' writes and reads, because replicas of these partitions hosted on other brokers now serve the requests.

After the software update, when the broker comes back online, it needs to "catch up" on the messages produced while it was offline. During catch-up, you might also observe increased volume throughput and CPU usage. This should have no impact on writes and reads into the cluster if you have enough CPU, memory, network, and volume resources on your brokers.

## Patching for Express brokers
<a name="patching-express-brokers"></a>

There are no maintenance windows for Express brokers. Amazon MSK automatically updates your cluster on an ongoing basis in a time-distributed manner, meaning you can expect occasional, singular broker reboots across the month. This means you don't need to plan around one-time, cluster-wide maintenance windows. As always, traffic remains uninterrupted during a broker reboot because leadership changes to other brokers that continue serving requests. When you view cluster operations in the AWS Management Console or through the `DescribeClusterOperation` and `ListClusterOperations` APIs, these maintenance operations appear with an operation type of `BROKER_UPDATE`.

Express brokers come configured with best-practice settings and guardrails that make your cluster resilient to load changes that can occur during maintenance. Amazon MSK sets throughput quotas on your Express brokers to mitigate the impact of overloading your cluster, which can lead to issues during broker restarts. These improvements eliminate the need for advance notifications, planning, and maintenance windows when you use Express brokers.

Express brokers always replicate your data three ways, so your clients automatically fail over during reboots. You don't need to worry about topics becoming unavailable because of a replication factor of 1 or 2. Also, catch-up for a restarted Express broker is faster than on Standard brokers. The faster patching speed on Express brokers means minimal disruption to any control plane activities you may have scheduled for your cluster.

As with all Apache Kafka applications, there is still a shared client-server contract for clients connecting to Express brokers. It's still critical to configure your clients to handle leadership failover between brokers. Follow the [Best practices for Apache Kafka clients](bestpractices-kafka-client.md) for smooth operation of your cluster at all times, including during patching. Following a broker restart, it is normal to see transient [disconnect errors on your clients](troubleshooting-offlinebroker-clientfailover.md). This will not affect your produces and consumes, because follower brokers take over partition leadership. Your Apache Kafka clients automatically fail over and start sending requests to the new leader brokers.

# Broker offline and client failover
<a name="troubleshooting-offlinebroker-clientfailover"></a>

Kafka tolerates an offline broker: a single offline broker in a healthy, balanced cluster that follows best practices will not cause impact or failures to produce or consume. This is because another broker takes over partition leadership, and because the Kafka client library automatically fails over and starts sending requests to the new leader brokers.

**Client-server contract**  
This results in a shared contract between the client library and the server: the server must successfully assign one or more new leaders, and the client must switch brokers to send requests to the new leaders in a timely manner.

Kafka uses exceptions to control this flow:

**An example procedure**

1. Broker A enters an offline state.

1. The Kafka client receives an exception (typically a network disconnect or not-leader-for-partition).

1. These exceptions trigger the Kafka client to update its metadata so that it knows about the latest leaders. 

1. Kafka client resumes sending requests to the new partition leaders on other brokers.

This process typically takes less than 2 seconds with the vended Java client and default configurations. The client-side errors are verbose and repetitive, but they are not cause for concern, as denoted by the `WARN` level.

**Example: Exception 1**  
`10:05:25.306 [kafka-producer-network-thread | producer-1] WARN o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 864845 on topic-partition msk-test-topic-1-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION. Error Message: Disconnected from node 2`

**Example: Exception 2**  
`10:05:25.306 [kafka-producer-network-thread | producer-1] WARN o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition msk-test-topic-1-41 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now`

Kafka clients automatically resolve these errors, typically within 1 second and at most 3 seconds. This surfaces as p99 produce/consume latency in client-side metrics (typically high milliseconds, in the hundreds). Anything longer typically indicates an issue with client configuration or server-side controller load; see the troubleshooting guidance below.

You can verify a successful failover by checking that the `BytesInPerSec` and `LeaderCount` metrics increase on other brokers, which confirms that traffic and leadership moved as expected. You will also observe an increase in the `UnderReplicatedPartitions` metric, which is expected while the shut-down broker's replicas are offline.

**Troubleshooting**  
The above flow can be disrupted by breaking the client-server contract. The most common causes of issues include:
+ Misconfiguration or incorrect usage of Kafka client libraries.
+ Unexpected default behaviors and bugs in third-party client libraries.
+ An overloaded controller, resulting in slower partition leader assignment.
+ A new controller being elected, resulting in slower partition leader assignment.

To ensure correct handling of leadership failover, we recommend the following (a sample client configuration follows this list):
+ Follow the server-side [best practices](https://docs.aws.amazon.com/msk/latest/developerguide/bestpractices.html) to ensure that the controller broker is scaled appropriately and leadership assignment stays fast.
+ Enable retries in your client libraries to ensure that clients handle the failover.
+ Configure `retry.backoff.ms` (default 100) to avoid connection and request storms.
+ Set `request.timeout.ms` and `delivery.timeout.ms` to values in line with your applications' SLA. Higher values result in slower failover for certain failure types.
+ Make sure that `bootstrap.servers` contains at least 3 random brokers to avoid an availability impact during initial discovery.
+ Some client libraries are lower level than others and expect the application developer to implement retry logic and exception handling. Refer to the client library's documentation for example usage, and make sure that correct reconnect and retry logic is in place.
+ Monitor client-side produce latency, successful request counts, and error counts for non-retryable errors.
+ We have observed that older third-party Golang and Ruby libraries remain verbose during the entire time a broker is offline, even though produce and consume requests are unaffected. Always monitor your business-level metrics, in addition to request success and error metrics, to determine whether there is real impact versus noise in your logs.
+ Don't alarm on transient network or not-leader exceptions; they are normal, non-impacting, and expected as part of the Kafka protocol.
+ Don't alarm on increases in `UnderReplicatedPartitions`; they are normal, non-impacting, and expected while a single broker is offline.
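
The following is a minimal sketch of these settings for the Apache Kafka Java producer. The broker endpoints are placeholders, and the timeout values are illustrative rather than prescriptive; align them with your own SLA.

```
# producer.properties - illustrative values only
bootstrap.servers=b-1.example.kafka.us-east-1.amazonaws.com:9098,b-2.example.kafka.us-east-1.amazonaws.com:9098,b-3.example.kafka.us-east-1.amazonaws.com:9098

# Retries let the client survive leadership failover.
retries=2147483647
retry.backoff.ms=100

# Keep timeouts in line with your application's SLA.
request.timeout.ms=30000
delivery.timeout.ms=120000
```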

# Security in Amazon MSK
<a name="security"></a>

Cloud security at AWS is the highest priority. As an AWS customer, you benefit from a data center and network architecture that is built to meet the requirements of the most security-sensitive organizations.

Security is a shared responsibility between AWS and you. The [shared responsibility model](https://aws.amazon.com/compliance/shared-responsibility-model/) describes this as security *of* the cloud and security *in* the cloud:
+ **Security of the cloud** – AWS is responsible for protecting the infrastructure that runs AWS services in the AWS Cloud. AWS also provides you with services that you can use securely. Third-party auditors regularly test and verify the effectiveness of our security as part of the [AWS Compliance Programs](https://aws.amazon.com/compliance/programs/). To learn about the compliance programs that apply to Amazon Managed Streaming for Apache Kafka, see [Amazon Web Services in Scope by Compliance Program](https://aws.amazon.com/compliance/services-in-scope/).
+ **Security in the cloud** – Your responsibility is determined by the AWS service that you use. You are also responsible for other factors including the sensitivity of your data, your company's requirements, and applicable laws and regulations. 

This documentation helps you understand how to apply the shared responsibility model when using Amazon MSK. The following topics show you how to configure Amazon MSK to meet your security and compliance objectives. You also learn how to use other Amazon Web Services that help you to monitor and secure your Amazon MSK resources. 

**Topics**
+ [Data protection in Amazon Managed Streaming for Apache Kafka](data-protection.md)
+ [Authentication and authorization for Amazon MSK APIs](security-iam.md)
+ [Authentication and authorization for Apache Kafka APIs](kafka_apis_iam.md)
+ [Changing an Amazon MSK cluster's security group](change-security-group.md)
+ [Control access to Apache ZooKeeper nodes in your Amazon MSK cluster](zookeeper-security.md)
+ [Compliance validation for Amazon Managed Streaming for Apache Kafka](MSK-compliance.md)
+ [Resilience in Amazon Managed Streaming for Apache Kafka](disaster-recovery-resiliency.md)
+ [Infrastructure security in Amazon Managed Streaming for Apache Kafka](infrastructure-security.md)

# Data protection in Amazon Managed Streaming for Apache Kafka
<a name="data-protection"></a>

The AWS [shared responsibility model](https://aws.amazon.com/compliance/shared-responsibility-model/) applies to data protection in Amazon Managed Streaming for Apache Kafka. As described in this model, AWS is responsible for protecting the global infrastructure that runs all of the AWS Cloud. You are responsible for maintaining control over your content that is hosted on this infrastructure. You are also responsible for the security configuration and management tasks for the AWS services that you use. For more information about data privacy, see the [Data Privacy FAQ](https://aws.amazon.com/compliance/data-privacy-faq/). For information about data protection in Europe, see the [AWS Shared Responsibility Model and GDPR](https://aws.amazon.com/blogs/security/the-aws-shared-responsibility-model-and-gdpr/) blog post on the *AWS Security Blog*.

For data protection purposes, we recommend that you protect AWS account credentials and set up individual users with AWS IAM Identity Center or AWS Identity and Access Management (IAM). That way, each user is given only the permissions necessary to fulfill their job duties. We also recommend that you secure your data in the following ways:
+ Use multi-factor authentication (MFA) with each account.
+ Use SSL/TLS to communicate with AWS resources. We require TLS 1.2 and recommend TLS 1.3.
+ Set up API and user activity logging with AWS CloudTrail. For information about using CloudTrail trails to capture AWS activities, see [Working with CloudTrail trails](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-trails.html) in the *AWS CloudTrail User Guide*.
+ Use AWS encryption solutions, along with all default security controls within AWS services.
+ Use advanced managed security services such as Amazon Macie, which assists in discovering and securing sensitive data that is stored in Amazon S3.
+ If you require FIPS 140-3 validated cryptographic modules when accessing AWS through a command line interface or an API, use a FIPS endpoint. For more information about the available FIPS endpoints, see [Federal Information Processing Standard (FIPS) 140-3](https://aws.amazon.com/compliance/fips/).

We strongly recommend that you never put confidential or sensitive information, such as your customers' email addresses, into tags or free-form text fields such as a **Name** field. This includes when you work with Amazon MSK or other AWS services using the console, API, AWS CLI, or AWS SDKs. Any data that you enter into tags or free-form text fields used for names may be used for billing or diagnostic logs. If you provide a URL to an external server, we strongly recommend that you do not include credentials information in the URL to validate your request to that server.

**Topics**
+ [Amazon MSK encryption](msk-encryption.md)
+ [Get started with Amazon MSK encryption](msk-working-with-encryption.md)
+ [Use Amazon MSK APIs with Interface VPC Endpoints](privatelink-vpc-endpoints.md)

# Amazon MSK encryption
<a name="msk-encryption"></a>

Amazon MSK provides data encryption options that you can use to meet strict data management requirements. The certificates that Amazon MSK uses for encryption must be renewed every 13 months. Amazon MSK automatically renews these certificates for all clusters. Express broker clusters remain in the `ACTIVE` state when Amazon MSK starts the certificate-update operation. For Standard broker clusters, Amazon MSK sets the state of the cluster to `MAINTENANCE` when it starts the certificate-update operation, and sets it back to `ACTIVE` when the update is done. While a certificate update is in progress, you can continue to produce and consume data, but you can't perform any update operations on the cluster.

## Amazon MSK encryption at rest
<a name="msk-encryption-at-rest"></a>

Amazon MSK integrates with [AWS Key Management Service](https://docs.aws.amazon.com/kms/latest/developerguide/) (KMS) to offer transparent server-side encryption. Amazon MSK always encrypts your data at rest. When you create an MSK cluster, you can specify the AWS KMS key that you want Amazon MSK to use to encrypt your data at rest. If you don't specify a KMS key, Amazon MSK creates an [AWS managed key](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#aws-managed-cmk) for you and uses it on your behalf. For more information about KMS keys, see [AWS KMS keys](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#kms_keys) in the *AWS Key Management Service Developer Guide*.

## Amazon MSK encryption in transit
<a name="msk-encryption-in-transit"></a>

Amazon MSK uses TLS 1.2. By default, it encrypts data in transit between the brokers of your MSK cluster. You can override this default at the time you create the cluster. 

For communication between clients and brokers, you must specify one of the following three settings:
+ Only allow TLS encrypted data. This is the default setting.
+ Allow both plaintext and TLS encrypted data.
+ Only allow plaintext data.

Amazon MSK brokers use public AWS Certificate Manager certificates. Therefore, any truststore that trusts Amazon Trust Services also trusts the certificates of Amazon MSK brokers.

While we highly recommend enabling in-transit encryption, it can add CPU overhead and a few milliseconds of latency. Most use cases aren't sensitive to these differences, however, and the magnitude of impact depends on the configuration of your cluster, clients, and usage profile. 

# Get started with Amazon MSK encryption
<a name="msk-working-with-encryption"></a>

When creating an MSK cluster, you can specify encryption settings in JSON format. The following is an example.

```
{
   "EncryptionAtRest": {
       "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:123456789012:key/abcdabcd-1234-abcd-1234-abcd123e8e8e"
    },
   "EncryptionInTransit": {
        "InCluster": true,
        "ClientBroker": "TLS"
    }
}
```

For `DataVolumeKMSKeyId`, you can specify a [customer managed key](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#customer-cmk) or the AWS managed key for MSK in your account (`alias/aws/kafka`). If you don't specify `EncryptionAtRest`, Amazon MSK still encrypts your data at rest under the AWS managed key. To determine which key your cluster is using, send a `GET` request or invoke the `DescribeCluster` API operation. 
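
For example, the following Boto3 sketch reads the key from the `DescribeCluster` response; the cluster ARN is a placeholder.

```
import boto3

client = boto3.client("kafka")

cluster = client.describe_cluster(
    ClusterArn="arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1"
)["ClusterInfo"]

# This is the AWS managed key unless you supplied your own KMS key
# when you created the cluster.
print(cluster["EncryptionInfo"]["EncryptionAtRest"]["DataVolumeKMSKeyId"])
```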

For `EncryptionInTransit`, the default value of `InCluster` is true, but you can set it to false if you don't want Amazon MSK to encrypt your data as it passes between brokers.

To specify the encryption mode for data in transit between clients and brokers, set `ClientBroker` to one of three values: `TLS`, `TLS_PLAINTEXT`, or `PLAINTEXT`.

**Topics**
+ [Specify encryption settings when creating an Amazon MSK cluster](msk-working-with-encryption-cluster-create.md)
+ [Test Amazon MSK TLS encryption](msk-working-with-encryption-test-tls.md)

# Specify encryption settings when creating an Amazon MSK cluster
<a name="msk-working-with-encryption-cluster-create"></a>

This process describes how to specify encryption settings when creating an Amazon MSK cluster.

**Specify encryption settings when creating a cluster**

1. Save the contents of the previous example in a file and give the file any name that you want. For example, call it `encryption-settings.json`.

1. Run the `create-cluster` command and use the `encryption-info` option to point to the file where you saved your configuration JSON. The following is an example. Replace *{YOUR MSK VERSION}* with a version that matches the Apache Kafka client version. For information about how to find your MSK cluster version, see [Determining your MSK cluster version](create-topic.md#find-msk-cluster-version). Be aware that using an Apache Kafka client version that doesn't match your MSK cluster version can lead to Apache Kafka data corruption, data loss, and downtime.

   ```
   aws kafka create-cluster --cluster-name "ExampleClusterName" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --kafka-version "{YOUR MSK VERSION}" --number-of-broker-nodes 3
   ```

   The following is an example of a successful response after running this command.

   ```
   {
       "ClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/SecondTLSTest/abcdabcd-1234-abcd-1234-abcd123e8e8e",
       "ClusterName": "ExampleClusterName",
       "State": "CREATING"
   }
   ```

# Test Amazon MSK TLS encryption
<a name="msk-working-with-encryption-test-tls"></a>

This process describes how to test TLS encryption on Amazon MSK.

**To test TLS encryption**

1. Create a client machine following the guidance in [Step 3: Create a client machine](create-client-machine.md).

1. Install Apache Kafka on the client machine.

1. In this example, we use the JVM truststore to talk to the MSK cluster. To do this, first create a folder named `/tmp` on the client machine. Then, go to the `bin` folder of the Apache Kafka installation, and run the following command. (Your JVM path might be different.)

   ```
   cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
   ```

1. While still in the `bin` folder of the Apache Kafka installation on the client machine, create a text file named `client.properties` with the following contents.

   ```
   security.protocol=SSL
   ssl.truststore.location=/tmp/kafka.client.truststore.jks
   ```

1. Run the following command on a machine that has the AWS CLI installed, replacing *clusterARN* with the ARN of your cluster.

   ```
   aws kafka get-bootstrap-brokers --cluster-arn clusterARN
   ```

   A successful result looks like the following. Save this result because you need it for the next step.

   ```
   {
       "BootstrapBrokerStringTls": "a-1.example.g7oein.c2.kafka.us-east-1.amazonaws.com:0123,a-3.example.g7oein.c2.kafka.us-east-1.amazonaws.com:0123,a-2.example.g7oein.c2.kafka.us-east-1.amazonaws.com:0123"
   }
   ```

1. Run the following command, replacing *BootstrapBrokerStringTls* with one of the broker endpoints that you obtained in the previous step.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerStringTls --producer.config client.properties --topic TLSTestTopic
   ```

1. Open a new command window and connect to the same client machine. Then, run the following command to create a console consumer.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringTls --consumer.config client.properties --topic TLSTestTopic
   ```

1. In the producer window, type a text message followed by a return, and look for the same message in the consumer window. Amazon MSK encrypted this message in transit.

For more information about configuring Apache Kafka clients to work with encrypted data, see [Configuring Kafka Clients](https://kafka.apache.org/documentation/#security_configclients).

# Use Amazon MSK APIs with Interface VPC Endpoints
<a name="privatelink-vpc-endpoints"></a>

You can use an Interface VPC Endpoint, powered by AWS PrivateLink, to prevent traffic between your Amazon VPC and Amazon MSK APIs from leaving the Amazon network. Interface VPC Endpoints don't require an internet gateway, NAT device, VPN connection, or AWS Direct Connect connection. [AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/what-is-privatelink.html) is an AWS technology that enables private communication between AWS services using an elastic network interface with private IPs in your Amazon VPC. For more information, see [Amazon Virtual Private Cloud](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html) and [Interface VPC Endpoints (AWS PrivateLink)](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint).

Your applications can connect with Amazon MSK Provisioned and MSK Connect APIs using AWS PrivateLink. To get started, create an Interface VPC Endpoint for your Amazon MSK API to start traffic flowing from and to your Amazon VPC resources through the Interface VPC Endpoint. FIPS-enabled Interface VPC endpoints are available for US Regions. For more information, see [Create an Interface Endpoint](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint).

With this feature, your Apache Kafka clients can dynamically fetch the connection strings for MSK Provisioned or MSK Connect resources without traversing the internet.
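
For example, with private DNS enabled on the interface endpoint, the standard SDK call resolves to the private endpoint. The following is a minimal sketch; the cluster ARN is a placeholder.

```
import boto3

client = boto3.client("kafka", region_name="us-east-1")

# With private DNS enabled on the interface endpoint, this request stays
# inside your VPC; no endpoint_url override is required.
brokers = client.get_bootstrap_brokers(
    ClusterArn="arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1"
)
print(brokers["BootstrapBrokerStringTls"])
```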

When creating an Interface VPC Endpoint, choose one of the following service name endpoints:

**For MSK Provisioned:**
+ The following service name endpoints are no longer supported for new connections:
  + com.amazonaws.region.kafka
  + com.amazonaws.region.kafka-fips (FIPS-enabled)
+ The following dual-stack endpoints support both IPv4 and IPv6 traffic:
  + aws.api.region.kafka-api
  + aws.api.region.kafka-api-fips (FIPS-enabled)

To set up the dual-stack endpoints, follow the [Dual-stack and FIPS endpoints](https://docs.aws.amazon.com/sdkref/latest/guide/feature-endpoints.html) guidelines.

In these names, *region* is your Region name. Choose the `kafka-api` service name to work with MSK Provisioned-compatible APIs. For more information, see [Operations](https://docs.aws.amazon.com/msk/1.0/apireference/operations.html) in the *Amazon MSK API Reference*.

**For MSK Connect:**
+ com.amazonaws.region.kafkaconnect

Here, *region* is your Region name. Choose this service name to work with MSK Connect-compatible APIs. For more information, see [Actions](https://docs.aws.amazon.com/MSKC/latest/mskc/API_Operations.html) in the *Amazon MSK Connect API Reference*.

For more information, including step-by-step instructions to create an interface VPC endpoint, see [Creating an interface endpoint](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint) in the *AWS PrivateLink Guide*.
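
As a hedged sketch, the following Boto3 call creates an interface endpoint for the MSK Provisioned dual-stack API service. The VPC, subnet, and security group IDs are placeholders, and whether to enable private DNS depends on your setup.

```
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Create an interface endpoint so that MSK API traffic stays inside the VPC.
response = ec2.create_vpc_endpoint(
    VpcEndpointType="Interface",
    VpcId="vpc-0abc123def456789a",
    ServiceName="aws.api.us-east-1.kafka-api",
    SubnetIds=["subnet-0abc123def456789a"],
    SecurityGroupIds=["sg-0abc123def456789a"],
    PrivateDnsEnabled=True,
)
print(response["VpcEndpoint"]["VpcEndpointId"])
```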

## Control access to VPC endpoints for Amazon MSK Provisioned or MSK Connect APIs
<a name="vpc-endpoints-control-access"></a>

VPC endpoint policies let you control access in two ways: by attaching a policy to the VPC endpoint itself, or by using additional fields in a policy attached to an IAM user, group, or role to restrict access so that it occurs only through the specified VPC endpoint. Use the appropriate example policy to define access permissions for either the MSK Provisioned or the MSK Connect service.

If you don't attach a policy when you create an endpoint, Amazon VPC attaches a default policy for you that allows full access to the service. An endpoint policy doesn't override or replace IAM identity-based policies or service-specific policies. It's a separate policy for controlling access from the endpoint to the specified service.

For more information, see [Controlling Access to Services with VPC Endpoints](https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-access.html) in the *AWS PrivateLink Guide*.

------
#### [ MSK Provisioned — VPC policy example ]

**Read-only access**  
This sample policy can be attached to a VPC endpoint. (For more information, see Controlling Access to Amazon VPC Resources.) It restricts actions to listing and describing operations through the VPC endpoint to which it is attached.

```
{
  "Statement": [
    {
      "Sid": "MSKReadOnly",
      "Principal": "*",
      "Action": [
        "kafka:List*",
        "kafka:Describe*"
      ],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}
```

**MSK Provisioned — VPC endpoint policy example**  
Restrict access to a specific MSK cluster

This sample policy can be attached to a VPC endpoint. It restricts access to a specific Kafka cluster through the VPC endpoint to which it is attached.

```
{
  "Statement": [
    {
      "Sid": "AccessToSpecificCluster",
      "Principal": "*",
      "Action": "kafka:*",
      "Effect": "Allow",
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/MyCluster"
    }
  ]
}
```

------
#### [ MSK Connect — VPC endpoint policy example ]

**List connectors and create a new connector**  
The following is an example of an endpoint policy for MSK Connect. This policy allows the specified role to list connectors and create a new connector.

```
{
    "Version": "2012-10-17", 		 	 	 		 	 	 
    "Statement": [
        {
            "Sid": "MSKConnectPermissions",
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:ListConnectors",
                "kafkaconnect:CreateConnector"
            ],
            "Resource": "*",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::111122223333:role/MyMSKConnectExecutionRole"
                ]
            }
        }
    ]
}
```

**MSK Connect — VPC endpoint policy example**  
Allows only requests from a specific IP address in the specified VPC

The following example shows a policy that only allows requests coming from a specified IP address in the specified VPC to succeed. Requests from other IP addresses will fail.

```
{
    "Statement": [
        {
            "Action": "kafkaconnect:*",
            "Effect": "Allow",
            "Principal": "*",
            "Resource": "*",
            "Condition": {
                "IpAddress": {
                    "aws:VpcSourceIp": "192.0.2.123"
                },
        "StringEquals": {
                    "aws:SourceVpc": "vpc-555555555555"
                }
            }
        }
    ]
}
```

------

# Authentication and authorization for Amazon MSK APIs
<a name="security-iam"></a>

AWS Identity and Access Management (IAM) is an AWS service that helps an administrator securely control access to AWS resources. IAM administrators control who can be *authenticated* (signed in) and *authorized* (have permissions) to use Amazon MSK resources. IAM is an AWS service that you can use with no additional charge.

**Topics**
+ [How Amazon MSK works with IAM](security_iam_service-with-iam.md)
+ [Amazon MSK identity-based policy examples](security_iam_id-based-policy-examples.md)
+ [Service-linked roles for Amazon MSK](using-service-linked-roles.md)
+ [AWS managed policies for Amazon MSK](security-iam-awsmanpol.md)
+ [Troubleshoot Amazon MSK identity and access](security_iam_troubleshoot.md)

# How Amazon MSK works with IAM
<a name="security_iam_service-with-iam"></a>

Before you use IAM to manage access to Amazon MSK, you should understand what IAM features are available to use with Amazon MSK. To get a high-level view of how Amazon MSK and other AWS services work with IAM, see [AWS Services That Work with IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html) in the *IAM User Guide*.

**Topics**
+ [Amazon MSK identity-based policies](security_iam_service-with-iam-id-based-policies.md)
+ [Amazon MSK resource-based policies](security_iam_service-with-iam-resource-based-policies.md)
+ [Authorization based on Amazon MSK tags](security_iam_service-with-iam-tags.md)
+ [Amazon MSK IAM roles](security_iam_service-with-iam-roles.md)

# Amazon MSK identity-based policies
<a name="security_iam_service-with-iam-id-based-policies"></a>

With IAM identity-based policies, you can specify allowed or denied actions and resources as well as the conditions under which actions are allowed or denied. Amazon MSK supports specific actions, resources, and condition keys. To learn about all of the elements that you use in a JSON policy, see [IAM JSON Policy Elements Reference](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html) in the *IAM User Guide*.

## Actions for Amazon MSK identity-based policies
<a name="security_iam_service-with-iam-id-based-policies-actions"></a>

Administrators can use AWS JSON policies to specify who has access to what. That is, which **principal** can perform **actions** on what **resources**, and under what **conditions**.

The `Action` element of a JSON policy describes the actions that you can use to allow or deny access in a policy. Include actions in a policy to grant permissions to perform the associated operation.

Policy actions in Amazon MSK use the following prefix before the action: `kafka:`. For example, to grant someone permission to describe an MSK cluster with the Amazon MSK `DescribeCluster` API operation, you include the `kafka:DescribeCluster` action in their policy. Policy statements must include either an `Action` or `NotAction` element. Amazon MSK defines its own set of actions that describe tasks that you can perform with this service.

Note that policy actions for MSK topic APIs use the `kafka-cluster` prefix before the action. For more information, see [Semantics of IAM authorization policy actions and resources](kafka-actions.md).

To specify multiple actions in a single statement, separate them with commas as follows:

```
"Action": ["kafka:action1", "kafka:action2"]
```

You can specify multiple actions using wildcards (*). For example, to specify all actions that begin with the word `Describe`, include the following action:

```
"Action": "kafka:Describe*"
```



To see a list of Amazon MSK actions, see [Actions, resources, and condition keys for Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonmanagedstreamingforapachekafka.html) in the *IAM User Guide*.

## Resources for Amazon MSK identity-based policies
<a name="security_iam_service-with-iam-id-based-policies-resources"></a>

Administrators can use AWS JSON policies to specify who has access to what. That is, which **principal** can perform **actions** on what **resources**, and under what **conditions**.

The `Resource` JSON policy element specifies the object or objects to which the action applies. As a best practice, specify a resource using its [Amazon Resource Name (ARN)](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html). For actions that don't support resource-level permissions, use a wildcard (*) to indicate that the statement applies to all resources.

```
"Resource": "*"
```



The Amazon MSK cluster resource has the following ARN:

```
arn:${Partition}:kafka:${Region}:${Account}:cluster/${ClusterName}/${UUID}
```

For more information about the format of ARNs, see [Amazon Resource Names (ARNs) and AWS Service Namespaces](https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html).

For example, to specify the `CustomerMessages` cluster in your statement, use the following ARN:

```
"Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/CustomerMessages/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2"
```

To specify all clusters that belong to a specific account, use the wildcard (*):

```
"Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/*"
```

Some Amazon MSK actions, such as those for creating resources, cannot be performed on a specific resource. In those cases, you must use the wildcard (*).

```
"Resource": "*"
```

To specify multiple resources in a single statement, separate the ARNs with commas. 

```
"Resource": ["resource1", "resource2"]
```

To see a list of Amazon MSK resource types and their ARNs, see [Resources Defined by Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/IAM/latest/UserGuide/list_amazonmanagedstreamingforkafka.html#amazonmanagedstreamingforkafka-resources-for-iam-policies) in the *IAM User Guide*. To learn with which actions you can specify the ARN of each resource, see [Actions Defined by Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/IAM/latest/UserGuide/list_amazonmanagedstreamingforkafka.html#amazonmanagedstreamingforkafka-actions-as-permissions).

## Condition keys for Amazon MSK identity-based policies
<a name="security_iam_service-with-iam-id-based-policies-conditionkeys"></a>

Administrators can use AWS JSON policies to specify who has access to what. That is, which **principal** can perform **actions** on what **resources**, and under what **conditions**.

The `Condition` element specifies when statements execute based on defined criteria. You can create conditional expressions that use [condition operators](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_condition_operators.html), such as equals or less than, to match the condition in the policy with values in the request. To see all AWS global condition keys, see [AWS global condition context keys](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html) in the *IAM User Guide*.

Amazon MSK defines its own set of condition keys and also supports using some global condition keys.



To see a list of Amazon MSK condition keys, see [Condition Keys for Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/IAM/latest/UserGuide/list_amazonmanagedstreamingforkafka.html#amazonmanagedstreamingforkafka-policy-keys) in the *IAM User Guide*. To learn with which actions and resources you can use a condition key, see [Actions Defined by Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/IAM/latest/UserGuide/list_amazonmanagedstreamingforkafka.html#amazonmanagedstreamingforkafka-actions-as-permissions).

## Examples for Amazon MSK identity-based policies
<a name="security_iam_service-with-iam-id-based-policies-examples"></a>



To view examples of Amazon MSK identity-based policies, see [Amazon MSK identity-based policy examples](security_iam_id-based-policy-examples.md).

# Amazon MSK resource-based policies
<a name="security_iam_service-with-iam-resource-based-policies"></a>

Amazon MSK supports a cluster policy (also known as a resource-based policy) for use with Amazon MSK clusters. You can use a cluster policy to define which IAM principals have cross-account permissions to set up private connectivity to your Amazon MSK cluster. When used with IAM client authentication, you can also use the cluster policy to granularly define Kafka data plane permissions for the connecting clients.

The maximum size supported for a cluster policy is 20 KB.

To view an example of how to configure a cluster policy, refer to [Step 2: Attach a cluster policy to the MSK cluster](mvpc-cluster-owner-action-policy.md). 
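
As a hedged sketch of attaching a cluster policy programmatically, the following uses Boto3's `put_cluster_policy`. The ARNs are placeholders, and the statement shown is illustrative only; grant only the actions your cross-account clients actually need.

```
import json

import boto3

client = boto3.client("kafka")

cluster_arn = "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ef-ghij-klmnopqrstuv-1"

# An illustrative policy granting another account permission to set up
# private connectivity; keep the policy under the 20 KB limit.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::111122223333:root"},
            "Action": ["kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers"],
            "Resource": cluster_arn,
        }
    ],
}

client.put_cluster_policy(ClusterArn=cluster_arn, Policy=json.dumps(policy))
```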

# Authorization based on Amazon MSK tags
<a name="security_iam_service-with-iam-tags"></a>

You can attach tags to Amazon MSK clusters. To control access based on tags, you provide tag information in the [condition element](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_condition.html) of a policy using the `kafka:ResourceTag/key-name`, `aws:RequestTag/key-name`, or `aws:TagKeys` condition keys. For information about tagging Amazon MSK resources, see [Tag an Amazon MSK cluster](msk-tagging.md).

You can use tags to control access only at the cluster level. Tags aren't supported on topics and consumer groups, so to grant access to them, add a separate statement to your policies that doesn't rely on tags.

To view an example of an identity-based policy for limiting access to a cluster based on the tags on that cluster, see [Accessing Amazon MSK clusters based on tags](security_iam_id-based-policy-examples-view-widget-tags.md).

You can use conditions in your identity-based policy to control access to Amazon MSK resources based on tags. The following example shows a policy that allows a user to describe the cluster, get its bootstrap brokers, list its broker nodes, update it, and delete it. However, this policy grants permission only if the cluster tag `Owner` has the value of that user's `username`. The second statement in the following policy allows access to the topics on the cluster. The first statement in this policy doesn't authorize any topic access.

------
#### [ JSON ]


```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "AccessClusterIfOwner",
      "Effect": "Allow",
      "Action": [
        "kafka:Describe*",
        "kafka:Get*",
        "kafka:List*",
        "kafka:Update*",
        "kafka:Delete*"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/*",
      "Condition": {
        "StringEquals": {
          "aws:ResourceTag/Owner": "${aws:username}"
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:123456789012:topic/*"
      ]
    }
  ]
}
```

------

# Amazon MSK IAM roles
<a name="security_iam_service-with-iam-roles"></a>

An [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) is an entity within your Amazon Web Services account that has specific permissions.

## Using temporary credentials with Amazon MSK
<a name="security_iam_service-with-iam-roles-tempcreds"></a>

You can use temporary credentials to sign in with federation, assume an IAM role, or assume a cross-account role. You obtain temporary security credentials by calling AWS STS API operations such as [AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) or [GetFederationToken](https://docs.aws.amazon.com/STS/latest/APIReference/API_GetFederationToken.html).

Amazon MSK supports using temporary credentials. 
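
As a minimal sketch (the role ARN and session name are placeholders), you can assume a role with AWS STS and pass the returned temporary credentials to the Amazon MSK client.

```
import boto3

sts = boto3.client("sts")

# Assume a role and receive short-lived credentials.
credentials = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/MskAdminRole",
    RoleSessionName="msk-session",
)["Credentials"]

# Use the temporary credentials to call Amazon MSK APIs.
kafka = boto3.client(
    "kafka",
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"],
)
print([cluster["ClusterName"] for cluster in kafka.list_clusters()["ClusterInfoList"]])
```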

## Service-linked roles
<a name="security_iam_service-with-iam-roles-service-linked"></a>

[Service-linked roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role) allow Amazon Web Services to access resources in other services to complete an action on your behalf. Service-linked roles appear in your IAM account and are owned by the service. An administrator can view but not edit the permissions for service-linked roles.

Amazon MSK supports service-linked roles. For details about creating or managing Amazon MSK service-linked roles, see [Service-linked roles for Amazon MSK](using-service-linked-roles.md).

# Amazon MSK identity-based policy examples
<a name="security_iam_id-based-policy-examples"></a>

By default, IAM users and roles don't have permission to execute Amazon MSK API actions. An administrator must create IAM policies that grant users and roles permission to perform specific API operations on the specified resources they need. The administrator must then attach those policies to the IAM users or groups that require those permissions.

To learn how to create an IAM identity-based policy using these example JSON policy documents, see [Creating Policies on the JSON Tab](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_create.html#access_policies_create-json-editor) in the *IAM User Guide*.

**Topics**
+ [Policy best practices](security_iam_service-with-iam-policy-best-practices.md)
+ [Allow users to view their own permissions](security_iam_id-based-policy-examples-view-own-permissions.md)
+ [Accessing one Amazon MSK cluster](security_iam_id-based-policy-examples-access-one-cluster.md)
+ [Accessing Amazon MSK clusters based on tags](security_iam_id-based-policy-examples-view-widget-tags.md)

# Policy best practices
<a name="security_iam_service-with-iam-policy-best-practices"></a>

Identity-based policies determine whether someone can create, access, or delete Amazon MSK resources in your account. These actions can incur costs for your AWS account. When you create or edit identity-based policies, follow these guidelines and recommendations:
+ **Get started with AWS managed policies and move toward least-privilege permissions** – To get started granting permissions to your users and workloads, use the *AWS managed policies* that grant permissions for many common use cases. They are available in your AWS account. We recommend that you reduce permissions further by defining AWS customer managed policies that are specific to your use cases. For more information, see [AWS managed policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies) or [AWS managed policies for job functions](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions.html) in the *IAM User Guide*.
+ **Apply least-privilege permissions** – When you set permissions with IAM policies, grant only the permissions required to perform a task. You do this by defining the actions that can be taken on specific resources under specific conditions, also known as *least-privilege permissions*. For more information about using IAM to apply permissions, see [ Policies and permissions in IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html) in the *IAM User Guide*.
+ **Use conditions in IAM policies to further restrict access** – You can add a condition to your policies to limit access to actions and resources. For example, you can write a policy condition to specify that all requests must be sent using SSL. You can also use conditions to grant access to service actions if they are used through a specific AWS service, such as CloudFormation. For more information, see [IAM JSON policy elements: Condition](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_condition.html) in the *IAM User Guide*. A minimal sketch of an SSL condition appears after this list.
+ **Use IAM Access Analyzer to validate your IAM policies to ensure secure and functional permissions** – IAM Access Analyzer validates new and existing policies so that the policies adhere to the IAM policy language (JSON) and IAM best practices. IAM Access Analyzer provides more than 100 policy checks and actionable recommendations to help you author secure and functional policies. For more information, see [Validate policies with IAM Access Analyzer](https://docs.aws.amazon.com/IAM/latest/UserGuide/access-analyzer-policy-validation.html) in the *IAM User Guide*.
+ **Require multi-factor authentication (MFA)** – If you have a scenario that requires IAM users or a root user in your AWS account, turn on MFA for additional security. To require MFA when API operations are called, add MFA conditions to your policies. For more information, see [ Secure API access with MFA](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_mfa_configure-api-require.html) in the *IAM User Guide*.
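
The following is a minimal sketch of an identity-based policy statement that allows read-only Amazon MSK actions only when requests are sent over SSL; `aws:SecureTransport` is a global AWS condition key, and the statement ID is illustrative:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowMSKReadOnlyOverSslOnly",
            "Effect": "Allow",
            "Action": [
                "kafka:Describe*",
                "kafka:List*"
            ],
            "Resource": "*",
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "true"
                }
            }
        }
    ]
}
```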

For more information about best practices in IAM, see [Security best practices in IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html) in the *IAM User Guide*.

# Allow users to view their own permissions
<a name="security_iam_id-based-policy-examples-view-own-permissions"></a>

This example shows how you might create a policy that allows IAM users to view the inline and managed policies that are attached to their user identity. This policy includes permissions to complete this action on the console or programmatically using the AWS CLI or AWS API.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "ViewOwnUserInfo",
            "Effect": "Allow",
            "Action": [
                "iam:GetUserPolicy",
                "iam:ListGroupsForUser",
                "iam:ListAttachedUserPolicies",
                "iam:ListUserPolicies",
                "iam:GetUser"
            ],
            "Resource": ["arn:aws:iam::*:user/${aws:username}"]
        },
        {
            "Sid": "NavigateInConsole",
            "Effect": "Allow",
            "Action": [
                "iam:GetGroupPolicy",
                "iam:GetPolicyVersion",
                "iam:GetPolicy",
                "iam:ListAttachedGroupPolicies",
                "iam:ListGroupPolicies",
                "iam:ListPolicyVersions",
                "iam:ListPolicies",
                "iam:ListUsers"
            ],
            "Resource": "*"
        }
    ]
}
```

# Accessing one Amazon MSK cluster
<a name="security_iam_id-based-policy-examples-access-one-cluster"></a>

In this example, you want to grant an IAM user in your Amazon Web Services account access to one of your clusters, `purchaseQueriesCluster`. This policy allows the user to describe the cluster, get its bootstrap brokers, list its broker nodes, and update it.

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Sid":"UpdateCluster",
         "Effect":"Allow",
         "Action":[
            "kafka:Describe*",
            "kafka:Get*",
            "kafka:List*",
            "kafka:Update*"
         ],
         "Resource":"arn:aws:kafka:us-east-1:012345678012:cluster/purchaseQueriesCluster/abcdefab-1234-abcd-5678-cdef0123ab01-2"
      }
   ]
}
```

# Accessing Amazon MSK clusters based on tags
<a name="security_iam_id-based-policy-examples-view-widget-tags"></a>

You can use conditions in your identity-based policy to control access to Amazon MSK resources based on tags. This example shows how you might create a policy that allows the user to describe the cluster, get its bootstrap brokers, list its broker nodes, update it, and delete it. However, permission is granted only if the cluster tag `Owner` has the value of that user's user name.


```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "AccessClusterIfOwner",
      "Effect": "Allow",
      "Action": [
        "kafka:Describe*",
        "kafka:Get*",
        "kafka:List*",
        "kafka:Update*",
        "kafka:Delete*"
      ],
      "Resource": "arn:aws:kafka:us-east-1:012345678012:cluster/*",
      "Condition": {
        "StringEquals": {
          "aws:ResourceTag/Owner": "${aws:username}"
        }
      }
    }
  ]
}
```

You can attach this policy to the IAM users in your account. If a user named `richard-roe` attempts to update an MSK cluster, the cluster must be tagged `Owner=richard-roe` or `owner=richard-roe`. Otherwise, he is denied access. The condition tag key `Owner` matches both `Owner` and `owner` because condition key names are not case-sensitive. For more information, see [IAM JSON Policy Elements: Condition](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_condition.html) in the *IAM User Guide*.

# Service-linked roles for Amazon MSK
<a name="using-service-linked-roles"></a>

Amazon MSK uses AWS Identity and Access Management (IAM) [service-linked roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role). A service-linked role is a unique type of IAM role that is linked directly to Amazon MSK. Service-linked roles are predefined by Amazon MSK and include all the permissions that the service requires to call other AWS services on your behalf. 

A service-linked role makes setting up Amazon MSK easier because you do not have to manually add the necessary permissions. Amazon MSK defines the permissions of its service-linked roles. Unless defined otherwise, only Amazon MSK can assume its roles. The defined permissions include the trust policy and the permissions policy, and that permissions policy cannot be attached to any other IAM entity.

For information about other services that support service-linked roles, see [Amazon Web Services That Work with IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html), and look for the services that have **Yes** in the **Service-Linked Role** column. Choose a **Yes** with a link to view the service-linked role documentation for that service.

**Topics**
+ [Service-linked role permissions](slr-permissions.md)
+ [Create a service-linked role](create-slr.md)
+ [Edit a service-linked role](edit-slr.md)
+ [Supported Regions for service-linked roles](slr-regions.md)

# Service-linked role permissions for Amazon MSK
<a name="slr-permissions"></a>

Amazon MSK uses the service-linked role named **AWSServiceRoleForKafka**. Amazon MSK uses this role to access your resources and perform operations such as:
+ `*NetworkInterface` – create and manage network interfaces in the customer account that make cluster brokers accessible to clients in the customer VPC.
+ `*VpcEndpoints` – manage VPC endpoints in the customer account that make cluster brokers accessible to clients in the customer VPC using AWS PrivateLink. Amazon MSK uses permissions to `DescribeVpcEndpoints`, `ModifyVpcEndpoint` and `DeleteVpcEndpoints`.
+ `secretsmanager` – manage client credentials with AWS Secrets Manager.
+ `GetCertificateAuthorityCertificate` – retrieve the certificate for your private certificate authority.
+ `*Ipv6Addresses` – assign and unassign IPv6 addresses to network interfaces in the customer account to enable IPv6 connectivity for MSK clusters.
+ `ModifyNetworkInterfaceAttribute` – modify network interface attributes in the customer account to configure IPv6 settings for MSK cluster connectivity.

This service-linked role is attached to the following managed policy: `KafkaServiceRolePolicy`. For updates to this policy, see [KafkaServiceRolePolicy](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/KafkaServiceRolePolicy.html).

The AWSServiceRoleForKafka service-linked role trusts the following services to assume the role:
+ `kafka.amazonaws.com`

The role permissions policy, `KafkaServiceRolePolicy`, allows Amazon MSK to complete the actions listed above on the applicable resources.

You must configure permissions to allow an IAM entity (such as a user, group, or role) to create, edit, or delete a service-linked role. For more information, see [Service-Linked Role Permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#service-linked-role-permissions) in the *IAM User Guide*.

# Create a service-linked role for Amazon MSK
<a name="create-slr"></a>

You don't need to create a service-linked role manually. When you create an Amazon MSK cluster in the AWS Management Console, the AWS CLI, or the AWS API, Amazon MSK creates the service-linked role for you. 

If you delete this service-linked role, and then need to create it again, you can use the same process to recreate the role in your account. When you create an Amazon MSK cluster, Amazon MSK creates the service-linked role for you again. 

# Edit a service-linked role for Amazon MSK
<a name="edit-slr"></a>

Amazon MSK does not allow you to edit the AWSServiceRoleForKafka service-linked role. After you create a service-linked role, you cannot change the name of the role because various entities might reference the role. However, you can edit the description of the role using IAM. For more information, see [Editing a Service-Linked Role](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#edit-service-linked-role) in the *IAM User Guide*.

# Supported Regions for Amazon MSK service-linked roles
<a name="slr-regions"></a>

Amazon MSK supports using service-linked roles in all of the Regions where the service is available. For more information, see [AWS Regions and Endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html).

# AWS managed policies for Amazon MSK
<a name="security-iam-awsmanpol"></a>

An AWS managed policy is a standalone policy that is created and administered by AWS. AWS managed policies are designed to provide permissions for many common use cases so that you can start assigning permissions to users, groups, and roles.

Keep in mind that AWS managed policies might not grant least-privilege permissions for your specific use cases because they're available for all AWS customers to use. We recommend that you reduce permissions further by defining [ customer managed policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#customer-managed-policies) that are specific to your use cases.

You cannot change the permissions defined in AWS managed policies. If AWS updates the permissions defined in an AWS managed policy, the update affects all principal identities (users, groups, and roles) that the policy is attached to. AWS is most likely to update an AWS managed policy when a new AWS service is launched or new API operations become available for existing services.

For more information, see [AWS managed policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies) in the *IAM User Guide*.

# AWS managed policy: AmazonMSKFullAccess
<a name="security-iam-awsmanpol-AmazonMSKFullAccess"></a>

This policy grants administrative permissions that allow a principal full access to all Amazon MSK actions. The permissions in this policy are grouped as follows:
+ **`Amazon MSK` permissions** – allow all Amazon MSK actions.
+ **`Amazon EC2` permissions** – the describe permissions in this policy validate the resources that are passed in an API request, to make sure that Amazon MSK can use them with a cluster. The rest of the Amazon EC2 permissions in this policy allow Amazon MSK to create the AWS resources that you need to connect to your clusters.
+ **`AWS KMS` permissions** – are used during API calls to validate the passed resources in a request. They are required for Amazon MSK to be able to use the passed key with the Amazon MSK cluster.
+ **`CloudWatch Logs, Amazon S3, and Amazon Data Firehose` permissions** – are required for Amazon MSK to be able to ensure that the log delivery destinations are reachable, and that they are valid for broker log use.
+ **`IAM` permissions** – are required for Amazon MSK to create a service-linked role in your account and to allow you to pass a service execution role to Amazon MSK.

```
    {
    	"Version":"2012-10-17",		 	 	 
    	"Statement": [{
    			"Effect": "Allow",
    			"Action": [
    				"kafka:*",
    				"ec2:DescribeSubnets",
    				"ec2:DescribeVpcs",
    				"ec2:DescribeSecurityGroups",
    				"ec2:DescribeRouteTables",
    				"ec2:DescribeVpcEndpoints",
    				"ec2:DescribeVpcAttribute",
    				"kms:DescribeKey",
    				"kms:CreateGrant",
    				"logs:CreateLogDelivery",
    				"logs:GetLogDelivery",
    				"logs:UpdateLogDelivery",
    				"logs:DeleteLogDelivery",
    				"logs:ListLogDeliveries",
    				"logs:PutResourcePolicy",
    				"logs:DescribeResourcePolicies",
    				"logs:DescribeLogGroups",
    				"S3:GetBucketPolicy",
    				"firehose:TagDeliveryStream"
    			],
    			"Resource": "*"
    		},
    		{
    			"Effect": "Allow",
    			"Action": [
    				"ec2:CreateVpcEndpoint"
    			],
    			"Resource": [
    				"arn:*:ec2:*:*:vpc/*",
    				"arn:*:ec2:*:*:subnet/*",
    				"arn:*:ec2:*:*:security-group/*"
    			]
    		},
    		{
    			"Effect": "Allow",
    			"Action": [
    				"ec2:CreateVpcEndpoint"
    			],
    			"Resource": [
    				"arn:*:ec2:*:*:vpc-endpoint/*"
    			],
    			"Condition": {
    				"StringEquals": {
    					"aws:RequestTag/AWSMSKManaged": "true"
    				},
    				"StringLike": {
    					"aws:RequestTag/ClusterArn": "*"
    				}
    			}
    		},
    		{
    			"Effect": "Allow",
    			"Action": [
    				"ec2:CreateTags"
    			],
    			"Resource": "arn:*:ec2:*:*:vpc-endpoint/*",
    			"Condition": {
    				"StringEquals": {
    					"ec2:CreateAction": "CreateVpcEndpoint"
    				}
    			}
    		},
    		{
    			"Effect": "Allow",
    			"Action": [
    				"ec2:DeleteVpcEndpoints"
    			],
    			"Resource": "arn:*:ec2:*:*:vpc-endpoint/*",
    			"Condition": {
    				"StringEquals": {
    					"ec2:ResourceTag/AWSMSKManaged": "true"
    				},
    				"StringLike": {
    					"ec2:ResourceTag/ClusterArn": "*"
    				}
    			}
    		},
    		{
    			"Effect": "Allow",
    			"Action": "iam:PassRole",
    			"Resource": "*",
    			"Condition": {
    				"StringEquals": {
    					"iam:PassedToService": "kafka.amazonaws.com"
    				}
    			}
    		},
    		{
    			"Effect": "Allow",
    			"Action": "iam:CreateServiceLinkedRole",
    			"Resource": "arn:aws:iam::*:role/aws-service-role/kafka.amazonaws.com/AWSServiceRoleForKafka*",
    			"Condition": {
    				"StringLike": {
    					"iam:AWSServiceName": "kafka.amazonaws.com"
    				}
    			}
    		},
    		{
    			"Effect": "Allow",
    			"Action": [
    				"iam:AttachRolePolicy",
    				"iam:PutRolePolicy"
    			],
    			"Resource": "arn:aws:iam::*:role/aws-service-role/kafka.amazonaws.com/AWSServiceRoleForKafka*"
    		},
    		{
    			"Effect": "Allow",
    			"Action": "iam:CreateServiceLinkedRole",
    			"Resource": "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/AWSServiceRoleForLogDelivery*",
    			"Condition": {
    				"StringLike": {
    					"iam:AWSServiceName": "delivery.logs.amazonaws.com"
    				}
    			}
    		}

    	]
    }
```

# AWS managed policy: AmazonMSKReadOnlyAccess
<a name="security-iam-awsmanpol-AmazonMSKReadOnlyAccess"></a>

This policy grants read-only permissions that allow users to view information in Amazon MSK. Principals with this policy attached can't make any updates or delete existing resources, nor can they create new Amazon MSK resources. For example, principals with these permissions can view the list of clusters and configurations associated with their account, but cannot change the configuration or settings of any clusters. The permissions in this policy are grouped as follows:
+ **`Amazon MSK` permissions** – allow you to list Amazon MSK resources, describe them, and get information about them.
+ **`Amazon EC2` permissions** – are used to describe the Amazon VPC, subnets, security groups, and ENIs that are associated with a cluster.
+ **`AWS KMS` permission** – is used to describe the key that is associated with the cluster.


```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Action": [
                "kafka:Describe*",
                "kafka:List*",
                "kafka:Get*",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "kms:DescribeKey"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}
```

# AWS managed policy: KafkaServiceRolePolicy
<a name="security-iam-awsmanpol-KafkaServiceRolePolicy"></a>

You can't attach KafkaServiceRolePolicy to your IAM entities. This policy is attached to a service-linked role that allows Amazon MSK to perform actions such as managing VPC endpoints (connectors) on MSK clusters, managing network interfaces, and managing cluster credentials with AWS Secrets Manager. For more information, see [Service-linked roles for Amazon MSK](using-service-linked-roles.md).

The following table describes updates to the KafkaServiceRolePolicy managed policy since Amazon MSK started tracking changes.


| Change | Description | Date | 
| --- | --- | --- | 
|  [IPv6 connectivity support added to KafkaServiceRolePolicy](#security-iam-awsmanpol-KafkaServiceRolePolicy) – Update to an existing policy  |  Amazon MSK added permissions to KafkaServiceRolePolicy to enable IPv6 connectivity for MSK clusters. These permissions allow Amazon MSK to assign and unassign IPv6 addresses to network interfaces and modify network interface attributes in the customer account.  | November 17, 2025 | 
|  [KafkaServiceRolePolicy](#security-iam-awsmanpol-KafkaServiceRolePolicy) – Update to an existing policy  |  Amazon MSK added permissions to support multi-VPC private connectivity.  | March 8, 2023 | 
|  Amazon MSK started tracking changes  |  Amazon MSK started tracking changes for KafkaServiceRolePolicy managed policy.  | March 8, 2023 | 

# AWS managed policy: AWSMSKReplicatorExecutionRole
<a name="security-iam-awsmanpol-AWSMSKReplicatorExecutionRole"></a>

The `AWSMSKReplicatorExecutionRole` policy grants permissions to the Amazon MSK replicator to replicate data between MSK clusters. The permissions in this policy are grouped as follows:
+ **`cluster`** – Grants the Amazon MSK Replicator permissions to connect to the cluster using IAM authentication. Also grants permissions to describe and alter the cluster.
+ **`topic`** – Grants the Amazon MSK Replicator permissions to describe, create, and alter a topic, and to alter the topic's dynamic configuration.
+ **`consumer group`** – Grants the Amazon MSK Replicator permissions to describe and alter consumer groups, to read and write data from an MSK cluster, and to delete internal topics created by the replicator.


```
{
	"Version":"2012-10-17",		 	 	 
	"Statement": [
		{
			"Sid": "ClusterPermissions",
			"Effect": "Allow",
			"Action": [
				"kafka-cluster:Connect",
				"kafka-cluster:DescribeCluster",
				"kafka-cluster:AlterCluster",
				"kafka-cluster:DescribeTopic",
				"kafka-cluster:CreateTopic",
				"kafka-cluster:AlterTopic",
				"kafka-cluster:WriteData",
				"kafka-cluster:ReadData",
				"kafka-cluster:AlterGroup",
				"kafka-cluster:DescribeGroup",
				"kafka-cluster:DescribeTopicDynamicConfiguration",
				"kafka-cluster:AlterTopicDynamicConfiguration",
				"kafka-cluster:WriteDataIdempotently"
			],
			"Resource": [
				"arn:aws:kafka:*:*:cluster/*"
			]
		},
		{
			"Sid": "TopicPermissions",
			"Effect": "Allow",
			"Action": [
				"kafka-cluster:DescribeTopic",
				"kafka-cluster:CreateTopic",
				"kafka-cluster:AlterTopic",
				"kafka-cluster:WriteData",
				"kafka-cluster:ReadData",
				"kafka-cluster:DescribeTopicDynamicConfiguration",
				"kafka-cluster:AlterTopicDynamicConfiguration",
				"kafka-cluster:AlterCluster"
			],
			"Resource": [
				"arn:aws:kafka:*:*:topic/*/*"
			]
		},
		{
			"Sid": "GroupPermissions",
			"Effect": "Allow",
			"Action": [
				"kafka-cluster:AlterGroup",
				"kafka-cluster:DescribeGroup"
			],
			"Resource": [
				"arn:aws:kafka:*:*:group/*/*"
			]
		}
	]
}
```

# Amazon MSK updates to AWS managed policies
<a name="security-iam-awsmanpol-updates"></a>

View details about updates to AWS managed policies for Amazon MSK since this service began tracking these changes.


| Change | Description | Date | 
| --- | --- | --- | 
|  [WriteDataIdempotently permission added to AWSMSKReplicatorExecutionRole](security-iam-awsmanpol-AWSMSKReplicatorExecutionRole.md) – Update to an existing policy  |  Amazon MSK added WriteDataIdempotently permission to AWSMSKReplicatorExecutionRole policy to support data replication between MSK clusters.  | March 12, 2024 | 
|  [AWSMSKReplicatorExecutionRole](security-iam-awsmanpol-AWSMSKReplicatorExecutionRole.md) – New policy  |  Amazon MSK added AWSMSKReplicatorExecutionRole policy to support Amazon MSK Replicator.  | December 4, 2023 | 
|  [AmazonMSKFullAccess](security-iam-awsmanpol-AmazonMSKFullAccess.md) – Update to an existing policy  |  Amazon MSK added permissions to support Amazon MSK Replicator.  | September 28, 2023 | 
|  [KafkaServiceRolePolicy](security-iam-awsmanpol-KafkaServiceRolePolicy.md) – Update to an existing policy  |  Amazon MSK added permissions to support multi-VPC private connectivity.  | March 8, 2023 | 
| [AmazonMSKFullAccess](security-iam-awsmanpol-AmazonMSKFullAccess.md) – Update to an existing policy |  Amazon MSK added new Amazon EC2 permissions to make it possible to connect to a cluster.  | November 30, 2021 | 
|  [AmazonMSKFullAccess](security-iam-awsmanpol-AmazonMSKFullAccess.md) – Update to an existing policy  |  Amazon MSK added a new permission to allow it to describe Amazon EC2 route tables.  | November 19, 2021 | 
|  Amazon MSK started tracking changes  |  Amazon MSK started tracking changes for its AWS managed policies.  | November 19, 2021 | 

# Troubleshoot Amazon MSK identity and access
<a name="security_iam_troubleshoot"></a>

Use the following information to help you diagnose and fix common issues that you might encounter when working with Amazon MSK and IAM.

**Topics**
+ [I am not authorized to perform an action in Amazon MSK](#security_iam_troubleshoot-no-permissions)

## I am not authorized to perform an action in Amazon MSK
<a name="security_iam_troubleshoot-no-permissions"></a>

If the AWS Management Console tells you that you're not authorized to perform an action, then you must contact your administrator for assistance. Your administrator is the person who provided you with your sign-in credentials.

The following example error occurs when the `mateojackson` IAM user tries to use the console to delete a cluster but does not have `kafka:DeleteCluster` permissions.

```
User: arn:aws:iam::123456789012:user/mateojackson is not authorized to perform: kafka:DeleteCluster on resource: purchaseQueriesCluster
```

In this case, Mateo asks his administrator to update his policies to allow him to access the `purchaseQueriesCluster` resource using the `kafka:DeleteCluster` action.
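
A minimal sketch of the policy update that the administrator might attach follows; the account ID and the cluster UUID wildcard are placeholders:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowDeletePurchaseQueriesCluster",
            "Effect": "Allow",
            "Action": "kafka:DeleteCluster",
            "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/purchaseQueriesCluster/*"
        }
    ]
}
```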

# Authentication and authorization for Apache Kafka APIs
<a name="kafka_apis_iam"></a>

You can use IAM to authenticate clients and to allow or deny Apache Kafka actions. Alternatively, you can use TLS or SASL/SCRAM to authenticate clients, and Apache Kafka ACLs to allow or deny actions.

For information on how to control who can perform [Amazon MSK operations](https://docs.aws.amazon.com/msk/1.0/apireference/operations.html) on your cluster, see [Authentication and authorization for Amazon MSK APIs](security-iam.md).

**Topics**
+ [IAM access control](iam-access-control.md)
+ [Mutual TLS client authentication for Amazon MSK](msk-authentication.md)
+ [Sign-in credentials authentication with AWS Secrets Manager](msk-password.md)
+ [Apache Kafka ACLs](msk-acls.md)

# IAM access control
<a name="iam-access-control"></a>

IAM access control for Amazon MSK enables you to handle both authentication and authorization for your MSK cluster. This eliminates the need to use one mechanism for authentication and another for authorization. For example, when a client tries to write to your cluster, Amazon MSK uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

IAM access control works for Java and non-Java clients, including Kafka clients written in Python, Go, JavaScript, and .NET. IAM access control for non-Java clients is available for MSK clusters with Kafka version 2.7.1 or above.

To make IAM access control possible, Amazon MSK makes minor modifications to Apache Kafka source code. These modifications won't cause a noticeable difference in your Apache Kafka experience. Amazon MSK logs access events so you can audit them.

**Important considerations**  
When you use IAM access control with your MSK cluster, keep in mind the following important considerations:
+ IAM access control doesn't apply to Apache ZooKeeper nodes. For information about how you can control access to those nodes, see [Control access to Apache ZooKeeper nodes in your Amazon MSK cluster](zookeeper-security.md).
+ The `allow.everyone.if.no.acl.found` Apache Kafka setting has no effect if your cluster uses IAM access control.
+ You can invoke Apache Kafka ACL APIs for an MSK cluster that uses IAM access control. However, Apache Kafka ACLs have no effect on authorization for IAM identities. You must use IAM policies to control access for IAM identities.

# How IAM access control for Amazon MSK works
<a name="how-to-use-iam-access-control"></a>

To use IAM access control for Amazon MSK, perform the following steps, which are described in detail in these topics:
+ [Create an Amazon MSK cluster that uses IAM access control](create-iam-access-control-cluster-in-console.md) 
+ [Configure clients for IAM access control](configure-clients-for-iam-access-control.md)
+ [Create authorization policies for the IAM role](create-iam-access-control-policies.md)
+ [Get the bootstrap brokers for IAM access control](get-bootstrap-brokers-for-iam.md)

# Create an Amazon MSK cluster that uses IAM access control
<a name="create-iam-access-control-cluster-in-console"></a>

This section explains how you can use the AWS Management Console, the API, or the AWS CLI to create an Amazon MSK cluster that uses IAM access control. For information about how to turn on IAM access control for an existing cluster, see [Update security settings of an Amazon MSK cluster](msk-update-security.md).

**Use the AWS Management Console to create a cluster that uses IAM access control**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Choose **Create cluster**.

1. Choose **Create cluster with custom settings**.

1. In the **Authentication** section, choose **IAM access control**.

1. Complete the rest of the workflow for creating a cluster.

**Use the API or the AWS CLI to create a cluster that uses IAM access control**
+ To create a cluster with IAM access control enabled, use the [CreateCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters.html#CreateCluster) API or the [create-cluster](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/create-cluster.html) CLI command, and pass the following JSON for the `ClientAuthentication` parameter: `"ClientAuthentication": { "Sasl": { "Iam": { "Enabled": true } } }`. 
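
For example, the following AWS CLI sketch creates a cluster with IAM access control enabled; the cluster name, file name, Kafka version, and broker count are placeholders:

```
aws kafka create-cluster \
    --cluster-name "IamAuthCluster" \
    --broker-node-group-info file://brokernodegroupinfo.json \
    --kafka-version "3.6.0" \
    --number-of-broker-nodes 3 \
    --client-authentication '{"Sasl": {"Iam": {"Enabled": true}}}'
```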

# Configure clients for IAM access control
<a name="configure-clients-for-iam-access-control"></a>

To enable clients to communicate with an MSK cluster that uses IAM access control, you can use either of these mechanisms:
+ Non-Java client configuration using the SASL_OAUTHBEARER mechanism
+ Java client configuration using the SASL_OAUTHBEARER mechanism or the AWS_MSK_IAM mechanism

## Use the SASL_OAUTHBEARER mechanism to configure IAM
<a name="configure-clients-for-iam-access-control-sasl-oauthbearer"></a>

1. Configure your client to use the SASL_OAUTHBEARER mechanism, as shown in the following Python Kafka client example. Configuration changes are similar for other languages.

   ```
   from kafka import KafkaProducer
   from kafka.sasl.oauth import AbstractTokenProvider
   import socket
   from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
   
   class MSKTokenProvider(AbstractTokenProvider):
       def token(self):
           token, _ = MSKAuthTokenProvider.generate_auth_token('<my AWS Region>')
           return token
   
   tp = MSKTokenProvider()
   
   producer = KafkaProducer(
       bootstrap_servers='<myBootstrapString>',
       security_protocol='SASL_SSL',
       sasl_mechanism='OAUTHBEARER',
       sasl_oauth_token_provider=tp,
       client_id=socket.gethostname(),
   )
   
   topic = "<my-topic>"
   while True:
       try:
           inp=input(">")
           producer.send(topic, inp.encode())
           producer.flush()
           print("Produced!")
       except Exception as e:
           print("Failed to send message:", e)
   
   producer.close()
   ```

1. Download the helper library for your chosen configuration language and follow the instructions in the *Getting started* section of that language library’s homepage.
   + JavaScript: [https://github.com/aws/aws-msk-iam-sasl-signer-js#getting-started](https://github.com/aws/aws-msk-iam-sasl-signer-js#getting-started)
   + Python: [https://github.com/aws/aws-msk-iam-sasl-signer-python#get-started](https://github.com/aws/aws-msk-iam-sasl-signer-python#get-started)
   + Go: [https://github.com/aws/aws-msk-iam-sasl-signer-go#getting-started](https://github.com/aws/aws-msk-iam-sasl-signer-go#getting-started)
   + .NET: [https://github.com/aws/aws-msk-iam-sasl-signer-net#getting-started](https://github.com/aws/aws-msk-iam-sasl-signer-net#getting-started)
   + Java: SASL_OAUTHBEARER support for Java is available through the [aws-msk-iam-auth](https://github.com/aws/aws-msk-iam-auth/releases) JAR file

## Use the MSK custom AWS_MSK_IAM mechanism to configure IAM
<a name="configure-clients-for-iam-access-control-msk-iam"></a>

1. Add the following to the `client.properties` file. Replace *<PATH_TO_TRUST_STORE_FILE>* with the fully qualified path to the trust store file on the client.
**Note**  
If you don't want to use a specific certificate, you can remove `ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>` from your `client.properties` file. When you don't specify a value for `ssl.truststore.location`, the Java process uses the default certificate.

   ```
   ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
   sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
   ```

   To use a named profile that you created for AWS credentials, include `awsProfileName="your profile name";` inside the `sasl.jaas.config` line of your client configuration file, as shown in the sketch that follows. For information about named profiles, see [Named profiles](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) in the AWS CLI documentation.
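
   For example, the `sasl.jaas.config` line with a named profile added might look like the following sketch; `msk-profile` is a placeholder profile name:

   ```
   sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="msk-profile";
   ```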

1. Download the latest stable [aws-msk-iam-auth](https://github.com/aws/aws-msk-iam-auth/releases) JAR file, and place it in the class path. If you use Maven, add the following dependency, adjusting the version number as needed:

   ```
   <dependency>
       <groupId>software.amazon.msk</groupId>
       <artifactId>aws-msk-iam-auth</artifactId>
       <version>1.0.0</version>
   </dependency>
   ```

The Amazon MSK client plugin is open-sourced under the Apache 2.0 license.

# Create authorization policies for the IAM role
<a name="create-iam-access-control-policies"></a>

Attach an authorization policy to the IAM role that corresponds to the client. In an authorization policy, you specify which actions to allow or deny for the role. If your client is on an Amazon EC2 instance, associate the authorization policy with the IAM role for that Amazon EC2 instance. Alternatively, you can configure your client to use a named profile, and then you associate the authorization policy with the role for that named profile. [Configure clients for IAM access control](configure-clients-for-iam-access-control.md) describes how to configure a client to use a named profile.

For information about how to create an IAM policy, see [Creating IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_create.html). 

The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the `Action` and `Resource` elements, see [Semantics of IAM authorization policy actions and resources](kafka-actions.md).

**Important**  
Changes that you make to an IAM policy are reflected in the IAM APIs and the AWS CLI immediately. However, it can take some time for the policy change to take effect. In most cases, policy changes take effect in less than a minute. Network conditions may sometimes increase the delay.


```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}
```

To learn how to create a policy with action elements that correspond to common Apache Kafka use cases, like producing and consuming data, see [Common use cases for client authorization policy](iam-access-control-use-cases.md).

For Kafka versions 2.8.0 and above, the **WriteDataIdempotently** permission is deprecated ([KIP-679](https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default)). By default, `enable.idempotence = true` is set. Therefore, for Kafka versions 2.8.0 and above, IAM doesn't offer the same functionality as Kafka ACLs. It isn't possible to `WriteDataIdempotently` to a topic by only providing `WriteData` access to that topic. This doesn't affect the case when `WriteData` is provided to **ALL** topics. In that case, `WriteDataIdempotently` is allowed. This is due to differences between the implementation of the IAM logic and the implementation of Kafka ACLs. Additionally, writing to a topic idempotently also requires access to `transactional-ids`.

To work around this, we recommend using a policy similar to the following policy.


```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:WriteDataIdempotently"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/TestTopic",
                "arn:aws:kafka:us-east-1:123456789012:transactional-id/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*"
            ]
        }
    ]
}
```

In this case, `WriteData` allows writes to `TestTopic`, while `WriteDataIdempotently` allows idempotent writes to the cluster. This policy also adds access to the `transactional-id` resources that will be needed.

Because `WriteDataIdempotently` is a cluster-level permission, you can't use it at the topic level. If `WriteDataIdempotently` is restricted to the topic level, this policy won't work.

# Get the bootstrap brokers for IAM access control
<a name="get-bootstrap-brokers-for-iam"></a>

See [Get the bootstrap brokers for an Amazon MSK cluster](msk-get-bootstrap-brokers.md).

# Semantics of IAM authorization policy actions and resources
<a name="kafka-actions"></a>

**Note**  
For clusters running Apache Kafka version 3.8 or later, IAM access control supports the WriteTxnMarkers API for terminating transactions. For clusters running Kafka versions earlier than 3.8, IAM access control doesn't support internal cluster actions including WriteTxnMarkers. For these earlier versions, to terminate transactions, use SCRAM or mTLS authentication with appropriate ACLs instead of IAM authentication.

This section explains the semantics of the action and resource elements that you can use in an IAM authorization policy. For an example policy, see [Create authorization policies for the IAM role](create-iam-access-control-policies.md).

## Authorization policy actions
<a name="actions"></a>

The following table lists the actions that you can include in an authorization policy when you use IAM access control for Amazon MSK. When you include an action from the *Action* column in your authorization policy, you must also include the corresponding actions from the *Required actions* column. 


| Action | Description | Required actions | Required resources | Applicable to serverless clusters | 
| --- | --- | --- | --- | --- | 
| kafka-cluster:Connect | Grants permission to connect and authenticate to the cluster. | None | cluster | Yes | 
| kafka-cluster:DescribeCluster | Grants permission to describe various aspects of the cluster, equivalent to Apache Kafka's DESCRIBE CLUSTER ACL. |  `kafka-cluster:Connect`  | cluster | Yes | 
| kafka-cluster:AlterCluster | Grants permission to alter various aspects of the cluster, equivalent to Apache Kafka's ALTER CLUSTER ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeCluster`  | cluster | No | 
| kafka-cluster:DescribeClusterDynamicConfiguration | Grants permission to describe the dynamic configuration of a cluster, equivalent to Apache Kafka's DESCRIBE_CONFIGS CLUSTER ACL. |  `kafka-cluster:Connect`  | cluster | No | 
| kafka-cluster:AlterClusterDynamicConfiguration | Grants permission to alter the dynamic configuration of a cluster, equivalent to Apache Kafka's ALTER_CONFIGS CLUSTER ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration`  | cluster | No | 
| kafka-cluster:WriteDataIdempotently | Grants permission to write data idempotently on a cluster, equivalent to Apache Kafka's IDEMPOTENT_WRITE CLUSTER ACL. |  `kafka-cluster:Connect` `kafka-cluster:WriteData`  | cluster | Yes | 
| kafka-cluster:CreateTopic | Grants permission to create topics on a cluster, equivalent to Apache Kafka's CREATE CLUSTER/TOPIC ACL. |  `kafka-cluster:Connect`  | topic | Yes | 
| kafka-cluster:DescribeTopic | Grants permission to describe topics on a cluster, equivalent to Apache Kafka's DESCRIBE TOPIC ACL. |  `kafka-cluster:Connect`  | topic | Yes | 
| kafka-cluster:AlterTopic | Grants permission to alter topics on a cluster, equivalent to Apache Kafka's ALTER TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | Yes | 
| kafka-cluster:DeleteTopic | Grants permission to delete topics on a cluster, equivalent to Apache Kafka's DELETE TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | Yes | 
| kafka-cluster:DescribeTopicDynamicConfiguration | Grants permission to describe the dynamic configuration of topics on a cluster, equivalent to Apache Kafka's DESCRIBE_CONFIGS TOPIC ACL. |  `kafka-cluster:Connect`  | topic | Yes | 
| kafka-cluster:AlterTopicDynamicConfiguration | Grants permission to alter the dynamic configuration of topics on a cluster, equivalent to Apache Kafka's ALTER_CONFIGS TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration`  | topic | Yes | 
| kafka-cluster:ReadData | Grants permission to read data from topics on a cluster, equivalent to Apache Kafka's READ TOPIC ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:AlterGroup`  | topic | Yes | 
| kafka-cluster:WriteData | Grants permission to write data to topics on a cluster, equivalent to Apache Kafka's WRITE TOPIC ACL |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | topic | Yes | 
| kafka-cluster:DescribeGroup | Grants permission to describe groups on a cluster, equivalent to Apache Kafka's DESCRIBE GROUP ACL. |  `kafka-cluster:Connect`  | group | Yes | 
| kafka-cluster:AlterGroup | Grants permission to join groups on a cluster, equivalent to Apache Kafka's READ GROUP ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeGroup`  | group | Yes | 
| kafka-cluster:DeleteGroup | Grants permission to delete groups on a cluster, equivalent to Apache Kafka's DELETE GROUP ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeGroup`  | group | Yes | 
| kafka-cluster:DescribeTransactionalId | Grants permission to describe transactional IDs on a cluster, equivalent to Apache Kafka's DESCRIBE TRANSACTIONAL_ID ACL. |  `kafka-cluster:Connect`  | transactional-id | Yes | 
| kafka-cluster:AlterTransactionalId | Grants permission to alter transactional IDs on a cluster, equivalent to Apache Kafka's WRITE TRANSACTIONAL_ID ACL. |  `kafka-cluster:Connect` `kafka-cluster:DescribeTransactionalId` `kafka-cluster:WriteData`  | transactional-id | Yes | 

You can use the asterisk (*) wildcard any number of times in an action after the colon. The following are examples.
+ `kafka-cluster:*Topic` stands for `kafka-cluster:CreateTopic`, `kafka-cluster:DescribeTopic`, `kafka-cluster:AlterTopic`, and `kafka-cluster:DeleteTopic`. It doesn't include `kafka-cluster:DescribeTopicDynamicConfiguration` or `kafka-cluster:AlterTopicDynamicConfiguration`.
+ `kafka-cluster:*` stands for all permissions.

## Authorization policy resources
<a name="msk-iam-resources"></a>

The following table shows the four types of resources that you can use in an authorization policy when you use IAM access control for Amazon MSK. You can get the cluster Amazon Resource Name (ARN) from the AWS Management Console or by using the [DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster) API or the [describe-cluster](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/describe-cluster.html) AWS CLI command. You can then use the cluster ARN to construct topic, group, and transactional ID ARNs. To specify a resource in an authorization policy, use that resource's ARN.


| Resource | ARN format | 
| --- | --- | 
| Cluster | arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid | 
| Topic | arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name | 
| Group | arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/group-name | 
| Transactional ID | arn:aws:kafka:region:account-id:transactional-id/cluster-name/cluster-uuid/transactional-id | 

You can use the asterisk (*) wildcard any number of times anywhere in the part of the ARN that comes after `:cluster/`, `:topic/`, `:group/`, and `:transactional-id/`. The following are some examples of how you can use the asterisk (*) wildcard to refer to multiple resources:
+ `arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*`: all the topics in any cluster named MyTestCluster, regardless of the cluster's UUID.
+ `arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*_test`: all topics whose name ends with "_test" in the cluster whose name is MyTestCluster and whose UUID is abcd1234-0123-abcd-5678-1234abcd-1.
+ `arn:aws:kafka:us-east-1:0123456789012:transactional-id/MyTestCluster/*/5555abcd-1111-abcd-1234-abcd1234-1`: all transactions whose transactional ID is 5555abcd-1111-abcd-1234-abcd1234-1, across all incarnations of a cluster named MyTestCluster in your account. This means that if you create a cluster named MyTestCluster, then delete it, and then create another cluster by the same name, you can use this resource ARN to represent the same transactional ID on both clusters. However, the deleted cluster isn't accessible.

# Common use cases for client authorization policy
<a name="iam-access-control-use-cases"></a>

The first column in the following table shows some common use cases. To authorize a client to carry out a given use case, include the required actions for that use case in the client's authorization policy, and set `Effect` to `Allow`.

For information about all the actions that are part of IAM access control for Amazon MSK, see [Semantics of IAM authorization policy actions and resources](kafka-actions.md).

**Note**  
Actions are denied by default. You must explicitly allow every action that you want to authorize the client to perform.


| Use case | Required actions | 
| --- | --- | 
| Admin |  `kafka-cluster:*`  | 
| Create a topic |  `kafka-cluster:Connect` `kafka-cluster:CreateTopic`  | 
| Produce data |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData`  | 
| Consume data |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:DescribeGroup` `kafka-cluster:AlterGroup` `kafka-cluster:ReadData`  | 
| Produce data idempotently |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData` `kafka-cluster:WriteDataIdempotently`  | 
| Produce data transactionally |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:WriteData` `kafka-cluster:DescribeTransactionalId` `kafka-cluster:AlterTransactionalId`  | 
| Describe the configuration of a cluster |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration`  | 
| Update the configuration of a cluster |  `kafka-cluster:Connect` `kafka-cluster:DescribeClusterDynamicConfiguration` `kafka-cluster:AlterClusterDynamicConfiguration`  | 
| Describe the configuration of a topic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration` | 
| Update the configuration of a topic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopicDynamicConfiguration` `kafka-cluster:AlterTopicDynamicConfiguration`  | 
| Alter a topic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:AlterTopic`  | 
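
For example, the following is a sketch of a policy that combines the *Produce data* and *Consume data* rows for topics in a single cluster; the ARNs and the `orders` topic name are placeholders:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect"
            ],
            "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/MyTestCluster/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": "arn:aws:kafka:us-east-1:123456789012:topic/MyTestCluster/*/orders"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup"
            ],
            "Resource": "arn:aws:kafka:us-east-1:123456789012:group/MyTestCluster/*"
        }
    ]
}
```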

# Mutual TLS client authentication for Amazon MSK
<a name="msk-authentication"></a>

You can enable client authentication with TLS for connections from your applications to your Amazon MSK brokers. To use client authentication, you need an AWS Private CA. The AWS Private CA can be either in the same AWS account as your cluster, or in a different account. For information about AWS Private CAs, see [Creating and Managing an AWS Private CA](https://docs.aws.amazon.com/acm-pca/latest/userguide/create-CA.html).

Amazon MSK doesn't support certificate revocation lists (CRLs). To control access to your cluster topics or block compromised certificates, use Apache Kafka ACLs and AWS security groups. For information about using Apache Kafka ACLs, see [Apache Kafka ACLs](msk-acls.md).
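
For example, to block a compromised certificate you might add a deny ACL for that certificate's principal. The following is a sketch only; the distinguished name is a placeholder, and `client.properties` must be configured for an identity that is allowed to alter ACLs:

```
<path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapBroker-String \
  --command-config client.properties \
  --add --deny-principal "User:CN=Compromised-Client" \
  --operation All --topic "*"
```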

**Topics**
+ [Create an Amazon MSK cluster that supports client authentication](msk-authentication-cluster.md)
+ [Set up a client to use authentication](msk-authentication-client.md)
+ [Produce and consume messages using authentication](msk-authentication-messages.md)

# Create an Amazon MSK cluster that supports client authentication
<a name="msk-authentication-cluster"></a>

This procedure shows you how to enable client authentication using an AWS Private CA.
**Note**  
We highly recommend using an independent AWS Private CA for each MSK cluster when you use mutual TLS to control access. Doing so ensures that TLS certificates signed by the private CA authenticate with only a single MSK cluster.

1. Create a file named `clientauthinfo.json` with the following contents. Replace *Private-CA-ARN* with the ARN of your PCA.

   ```
   {
      "Tls": {
          "CertificateAuthorityArnList": ["Private-CA-ARN"]
       }
   }
   ```

1. Create a file named `brokernodegroupinfo.json` as described in [Create a provisioned Amazon MSK cluster using the AWS CLI](create-cluster-cli.md).

1. Client authentication requires that you also enable encryption in transit between clients and brokers. Create a file named `encryptioninfo.json` with the following contents. Replace *KMS-Key-ARN* with the ARN of your KMS key. You can set `ClientBroker` to `TLS` or `TLS_PLAINTEXT`.

   ```
   {
      "EncryptionAtRest": {
          "DataVolumeKMSKeyId": "KMS-Key-ARN"
       },
      "EncryptionInTransit": {
           "InCluster": true,
           "ClientBroker": "TLS"
       }
   }
   ```

   For more information about encryption, see [Amazon MSK encryption](msk-encryption.md).

1. On a machine where you have the AWS CLI installed, run the following command to create a cluster with authentication and in-transit encryption enabled. Save the cluster ARN provided in the response.

   ```
   aws kafka create-cluster --cluster-name "AuthenticationTest" --broker-node-group-info file://brokernodegroupinfo.json --encryption-info file://encryptioninfo.json --client-authentication file://clientauthinfo.json --kafka-version "{YOUR KAFKA VERSION}" --number-of-broker-nodes 3
   ```

# Set up a client to use authentication
<a name="msk-authentication-client"></a>

This procedure shows you how to set up an Amazon EC2 instance to use as a client machine: you create the instance, create a topic, and configure the security settings that the client needs to authenticate with mutual TLS.

1. Create an Amazon EC2 instance to use as a client machine. For simplicity, create this instance in the same VPC you used for the cluster. See [Step 3: Create a client machine](create-client-machine.md) for an example of how to create such a client machine.

1. Create a topic. For an example, see the instructions under [Step 4: Create a topic in the Amazon MSK cluster](create-topic.md).

1. On a machine where you have the AWS CLI installed, run the following command to get the bootstrap brokers of the cluster. Replace *Cluster-ARN* with the ARN of your cluster.

   ```
   aws kafka get-bootstrap-brokers --cluster-arn Cluster-ARN
   ```

   Save the string associated with `BootstrapBrokerStringTls` in the response.

1. On your client machine, run the following command to use the JVM trust store to create your client trust store. If your JVM path is different, adjust the command accordingly.

   ```
   cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64/jre/lib/security/cacerts kafka.client.truststore.jks
   ```

1. On your client machine, run the following command to create a private key for your client. Replace *Distinguished-Name*, *Example-Alias*, *Your-Store-Pass*, and *Your-Key-Pass* with strings of your choice.

   ```
   keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12 -keyalg rsa
   ```

1. On your client machine, run the following command to create a certificate request with the private key you created in the previous step.

   ```
   keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass
   ```

1. Open the `client-cert-sign-request` file and ensure that it starts with `-----BEGIN CERTIFICATE REQUEST-----` and ends with `-----END CERTIFICATE REQUEST-----`. If it starts with `-----BEGIN NEW CERTIFICATE REQUEST-----`, delete the word `NEW` (and the single space that follows it) from the beginning and the end of the file.

1. On a machine where you have the AWS CLI installed, run the following command to sign your certificate request. Replace *Private-CA-ARN* with the ARN of your PCA. You can change the validity value if you want. Here we use 300 as an example.

   ```
   aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"
   ```

   Save the certificate ARN provided in the response.
**Note**  
To retrieve your client certificate, use the `acm-pca get-certificate` command and specify your certificate ARN. For more information, see [get-certificate](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/acm-pca/get-certificate.html) in the *AWS CLI Command Reference*.

1. Run the following command to get the certificate that AWS Private CA signed for you. Replace *Certificate-ARN* with the ARN you obtained from the response to the previous command.

   ```
   aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN
   ```

1. From the JSON result of running the previous command, copy the strings associated with `Certificate` and `CertificateChain`. Paste these two strings in a new file named `signed-certificate-from-acm`. Paste the string associated with `Certificate` first, followed by the string associated with `CertificateChain`. Replace the `\n` characters with new lines. (A scripted alternative to this manual step appears after this procedure.) The following is the structure of the file after you paste the certificate and certificate chain in it.

   ```
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   -----BEGIN CERTIFICATE-----
   ...
   -----END CERTIFICATE-----
   ```

1. Run the following command on the client machine to add this certificate to your keystore so you can present it when you talk to the MSK brokers.

   ```
   keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass
   ```

1. Create a file named `client.properties` with the following contents. Adjust the truststore and keystore locations to the paths where you saved `kafka.client.truststore.jks` and `kafka.client.keystore.jks`. Substitute your Kafka client version for the `{YOUR KAFKA VERSION}` placeholders.

   ```
   security.protocol=SSL
   ssl.truststore.location=/tmp/kafka_2.12-{YOUR KAFKA VERSION}/kafka.client.truststore.jks
   ssl.keystore.location=/tmp/kafka_2.12-{YOUR KAFKA VERSION}/kafka.client.keystore.jks
   ssl.keystore.password=Your-Store-Pass
   ssl.key.password=Your-Key-Pass
   ```

# Produce and consume messages using authentication
<a name="msk-authentication-messages"></a>

This process describes how to produce and consume messages using authentication.

1. Run the following command to create a topic. The file named `client.properties` is the one you created in the previous procedure.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapBroker-String --replication-factor 3 --partitions 1 --topic ExampleTopic --command-config client.properties
   ```

1. Run the following command to start a console producer. The file named `client.properties` is the one you created in the previous procedure.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --bootstrap-server BootstrapBroker-String --topic ExampleTopic --producer.config client.properties
   ```

1. In a new command window on your client machine, run the following command to start a console consumer.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBroker-String --topic ExampleTopic --consumer.config client.properties
   ```

1. Type messages in the producer window and watch them appear in the consumer window.

# Sign-in credentials authentication with AWS Secrets Manager
<a name="msk-password"></a>

You can control access to your Amazon MSK clusters using sign-in credentials that are stored and secured using AWS Secrets Manager. Storing user credentials in Secrets Manager reduces the overhead of cluster authentication such as auditing, updating, and rotating credentials. Secrets Manager also lets you share user credentials across clusters.

After you associate a secret with an MSK cluster, MSK syncs the credential data periodically.

**Topics**
+ [How sign-in credentials authentication works](msk-password-howitworks.md)
+ [Set up SASL/SCRAM authentication for an Amazon MSK cluster](msk-password-tutorial.md)
+ [Working with users](msk-password-users.md)
+ [Limitations when using SCRAM secrets](msk-password-limitations.md)

# How sign-in credentials authentication works
<a name="msk-password-howitworks"></a>

Sign-in credentials authentication for Amazon MSK uses SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) authentication. To set up sign-in credentials authentication for a cluster, you create a secret resource in [AWS Secrets Manager](https://docs.aws.amazon.com//secretsmanager/?id=docs_gateway) and associate sign-in credentials with that secret.

SASL/SCRAM is defined in [RFC 5802](https://tools.ietf.org/html/rfc5802). SCRAM uses secure hashing algorithms and does not transmit plaintext sign-in credentials between client and server.

**Note**  
When you set up SASL/SCRAM authentication for your cluster, Amazon MSK turns on TLS encryption for all traffic between clients and brokers.

# Set up SASL/SCRAM authentication for an Amazon MSK cluster
<a name="msk-password-tutorial"></a>

To set up a secret in AWS Secrets Manager, follow the [Creating and Retrieving a Secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/tutorials_basic.html) tutorial in the [AWS Secrets Manager User Guide](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html).

Note the following requirements when creating a secret for an Amazon MSK cluster:
+ Choose **Other type of secrets (e.g. API key)** for the secret type.
+ Your secret name must begin with the prefix **AmazonMSK\_**.
+ You must use a custom AWS KMS key with your secret; either use an existing custom key or create a new one. By default, Secrets Manager uses the default AWS KMS key for a secret. 
**Important**  
A secret created with the default AWS KMS key cannot be used with an Amazon MSK cluster.
+ To enter key-value pairs using the **Plaintext** option, your sign-in credential data must be in the following format.

  ```
  {
    "username": "alice",
    "password": "alice-secret"
  }
  ```
+ Record the ARN (Amazon Resource Name) value for your secret.
**Important**  
You can't associate a Secrets Manager secret with a cluster that exceeds the limits described in [Right-size your cluster: Number of partitions per Standard broker](bestpractices.md#partitions-per-broker).
+ If you use the AWS CLI to create the secret, specify a key ID or ARN for the `kms-key-id` parameter. Don't specify an alias.
+ To associate the secret with your cluster, use either the Amazon MSK console, or the [ BatchAssociateScramSecret](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html#BatchAssociateScramSecret) operation. 
**Important**  
When you associate a secret with a cluster, Amazon MSK attaches a resource policy to the secret that allows your cluster to access and read the secret values that you defined. You should not modify this resource policy; doing so can prevent your cluster from accessing your secret. If you change the secret's resource policy or the KMS key used for secret encryption, re-associate the secret with your MSK cluster so that your cluster can continue to access it.

  The following example JSON input for the `BatchAssociateScramSecret` operation associates a secret with a cluster:

  ```
  {
    "clusterArn" : "arn:aws:kafka:us-west-2:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",          
    "secretArnList": [
      "arn:aws:secretsmanager:us-west-2:0123456789019:secret:AmazonMSK_MyClusterSecret"
    ]
  }
  ```
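
  If you prefer the AWS CLI, the following sketch shows an equivalent end-to-end flow: creating the secret with a customer managed KMS key and then associating it with the cluster. The secret name, KMS key ARN, and cluster ARN are placeholder values.

  ```
  # Create a secret encrypted with a customer managed KMS key (placeholder ARNs).
  aws secretsmanager create-secret \
      --name AmazonMSK_MyClusterSecret \
      --secret-string '{"username": "alice", "password": "alice-secret"}' \
      --kms-key-id arn:aws:kms:us-west-2:0123456789019:key/abcd1234-ab12-cd34-ef56-abcdef123456

  # Associate the secret with the cluster.
  aws kafka batch-associate-scram-secret \
      --cluster-arn arn:aws:kafka:us-west-2:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4 \
      --secret-arn-list arn:aws:secretsmanager:us-west-2:0123456789019:secret:AmazonMSK_MyClusterSecret
  ```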

# Connecting to your cluster with sign-in credentials
<a name="msk-password-tutorial-connect"></a>

After you create a secret and associate it with your cluster, you can connect your client to the cluster. The following procedure demonstrates how to connect a client to a cluster that uses SASL/SCRAM authentication. It also shows how to produce to and consume from an example topic.

**Topics**
+ [Connecting a client to a cluster using SASL/SCRAM authentication](#w2aab9c13c29c17c13c11b9b7)
+ [Troubleshooting connection issues](#msk-password-tutorial-connect-troubleshooting)

## Connecting a client to a cluster using SASL/SCRAM authentication
<a name="w2aab9c13c29c17c13c11b9b7"></a>

1. Run the following command on a machine that has the AWS CLI installed. Replace *clusterARN* with the ARN of your cluster.

   ```
   aws kafka get-bootstrap-brokers --cluster-arn clusterARN
   ```

   From the JSON result of this command, save the value associated with the string named `BootstrapBrokerStringSaslScram`. You'll use this value in later steps.

1. On your client machine, create a JAAS configuration file that contains the user credentials stored in your secret. For example, for the user **alice**, create a file called `users_jaas.conf` with the following content.

   ```
   KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="alice"
      password="alice-secret";
   };
   ```

1. Use the following command to export the location of your JAAS configuration file in the `KAFKA_OPTS` environment variable.

   ```
   export KAFKA_OPTS=-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf
   ```

1. Create a file named `kafka.client.truststore.jks` in the `/tmp` directory.

1. (Optional) Use the following command to copy the JDK key store file from your JVM `cacerts` folder into the `kafka.client.truststore.jks` file that you created in the previous step. Replace *JDKFolder* with the name of the JDK folder on your instance. For example, your JDK folder might be named `java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64`.

   ```
   cp /usr/lib/jvm/JDKFolder/lib/security/cacerts /tmp/kafka.client.truststore.jks
   ```

1. In the `bin` directory of your Apache Kafka installation, create a client properties file called `client_sasl.properties` with the following contents. This file defines the SASL mechanism and protocol.

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=SCRAM-SHA-512
   ```

1. To create an example topic, run the following command. Replace *BootstrapBrokerStringSaslScram* with the bootstrap broker string that you obtained in step 1 of this topic.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapBrokerStringSaslScram --command-config <path-to-client-properties>/client_sasl.properties --replication-factor 3 --partitions 1 --topic ExampleTopicName
   ```

1. To produce to the example topic that you created, run the following command on your client machine. Replace *BootstrapBrokerStringSaslScram* with the bootstrap broker string that you retrieved in step 1 of this topic.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --producer.config client_sasl.properties
   ```

1. To consume from the topic you created, run the following command on your client machine. Replace *BootstrapBrokerStringSaslScram* with the bootstrap broker string that you obtained in step 1 of this topic.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --from-beginning --consumer.config client_sasl.properties
   ```

## Troubleshooting connection issues
<a name="msk-password-tutorial-connect-troubleshooting"></a>

When running Kafka client commands, you might encounter Java heap memory errors, especially when working with large topics or datasets. These errors occur because Kafka tools run as Java applications with default memory settings that might be insufficient for your workload.

To resolve `Out of Memory Java Heap` errors, you can increase the Java heap size by modifying the `KAFKA_OPTS` environment variable to include memory settings.

The following example sets the maximum heap size to 1GB (`-Xmx1G`). You can adjust this value based on your available system memory and requirements.

```
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf -Xmx1G"
```

For consuming large topics, consider limiting how much the consumer reads, for example with `--max-messages` or offset-based parameters, instead of reading the entire topic with `--from-beginning`:

```
<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --max-messages 1000 --consumer.config client_sasl.properties
```
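
If you know the partition and offset you want to start from, you can combine `--partition` and `--offset` with `--max-messages` to bound the read. This is a minimal sketch; the partition, offset, and message count are example values.

```
<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --partition 0 --offset 5000 --max-messages 100 --consumer.config client_sasl.properties
```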

# Working with users
<a name="msk-password-users"></a>

**Creating users:** You create users in your secret as key-value pairs. When you use the **Plaintext** option in the Secrets Manager console, you should specify sign-in credential data in the following format.

```
{
  "username": "alice",
  "password": "alice-secret"
}
```

**Revoking user access:** To revoke a user's credentials to access a cluster, we recommend that you first remove or enforce an ACL on the cluster, and then disassociate the secret. This is because of the following:
+ Removing a user does not close existing connections.
+ Changes to your secret take up to 10 minutes to propagate.

For information about using an ACL with Amazon MSK, see [Apache Kafka ACLs](msk-acls.md).
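
For example, the following sketch denies the user **alice** all operations on a topic before disassociating her secret. The topic name and ARNs are placeholders, and `client_sasl.properties` is the client configuration file described earlier.

```
# Deny the user access before removing the credentials (placeholder names).
<path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapBrokerStringSaslScram --command-config client_sasl.properties --add --deny-principal "User:alice" --operation All --topic ExampleTopicName

# Then disassociate the secret from the cluster.
aws kafka batch-disassociate-scram-secret \
    --cluster-arn arn:aws:kafka:us-west-2:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4 \
    --secret-arn-list arn:aws:secretsmanager:us-west-2:0123456789019:secret:AmazonMSK_MyClusterSecret
```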

For clusters using ZooKeeper mode, we recommend that you restrict access to your ZooKeeper nodes to prevent users from modifying ACLs. For more information, see [Control access to Apache ZooKeeper nodes in your Amazon MSK cluster](zookeeper-security.md).

# Limitations when using SCRAM secrets
<a name="msk-password-limitations"></a>

Note the following limitations when using SCRAM secrets:
+ Amazon MSK only supports SCRAM-SHA-512 authentication.
+ An Amazon MSK cluster can have up to 1000 users.
+ You must use an AWS KMS key with your secret. You cannot use a secret that uses the default Secrets Manager encryption key with Amazon MSK. For information about creating a KMS key, see [Creating symmetric encryption KMS keys](https://docs.aws.amazon.com/kms/latest/developerguide/create-keys.html#create-symmetric-cmk).
+ You can't use an asymmetric KMS key with Secrets Manager.
+ You can associate up to 10 secrets with a cluster at a time using the [ BatchAssociateScramSecret](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html#BatchAssociateScramSecret) operation.
+ The names of secrets associated with an Amazon MSK cluster must have the prefix **AmazonMSK\_**.
+ Secrets associated with an Amazon MSK cluster must be in the same Amazon Web Services account and AWS region as the cluster.

# Apache Kafka ACLs
<a name="msk-acls"></a>

Apache Kafka has a pluggable authorizer and ships with an out-of-the-box authorizer implementation. Amazon MSK enables this authorizer in the `server.properties` file on the brokers.

Apache Kafka ACLs have the format "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". If RP doesn't match a specific resource R, then R has no associated ACLs, and therefore no one other than super users is allowed to access R. To change this Apache Kafka behavior, you set the property `allow.everyone.if.no.acl.found` to true. Amazon MSK sets it to true by default. This means that with Amazon MSK clusters, if you don't explicitly set ACLs on a resource, all principals can access this resource. If you enable ACLs on a resource, only the authorized principals can access it. If you want to restrict access to a topic and authorize a client using TLS mutual authentication, add ACLs using the Apache Kafka authorizer CLI. For more information about adding, removing, and listing ACLs, see [Kafka Authorization Command Line Interface](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Authorization+Command+Line+Interface).

Because Amazon MSK configures brokers as super users, they can access all topics. This helps the brokers to replicate messages from the primary partition whether or not the `allow.everyone.if.no.acl.found` property is defined for the cluster's configuration.

**To add or remove read and write access to a topic**

1. Add your brokers to the ACL table to allow them to read from all topics that have ACLs in place. To grant your brokers read access to a topic, run the following command on a client machine that can communicate with the MSK cluster. 

   Replace *Distinguished-Name* with the DNS name of any of your cluster's bootstrap brokers, then replace the string before the first period in this distinguished name with an asterisk (`*`). For example, if one of your cluster's bootstrap brokers has the DNS name `b-6.mytestcluster.67281x.c4.kafka.us-east-1.amazonaws.com`, replace *Distinguished-Name* in the following command with `*.mytestcluster.67281x.c4.kafka.us-east-1.amazonaws.com`. For information on how to get the bootstrap brokers, see [Get the bootstrap brokers for an Amazon MSK cluster](msk-get-bootstrap-brokers.md).

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Read --group=* --topic Topic-Name
   ```

1. To grant a client application read access to a topic, run the following command on your client machine. If you use mutual TLS authentication, use the same *Distinguished-Name* you used when you created the private key.

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Read --group=* --topic Topic-Name
   ```

   To remove read access, you can run the same command, replacing `--add` with `--remove`.

1. To grant write access to a topic, run the following command on your client machine. If you use mutual TLS authentication, use the same *Distinguished-Name* you used when you created the private key.

   ```
   <path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --add --allow-principal "User:CN=Distinguished-Name" --operation Write --topic Topic-Name
   ```

   To remove write access, you can run the same command, replacing `--add` with `--remove`.
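
To verify which ACLs are currently in effect on a topic, you can list them. A minimal sketch:

```
<path-to-your-kafka-installation>/bin/kafka-acls.sh --bootstrap-server BootstrapServerString --list --topic Topic-Name
```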

# Changing an Amazon MSK cluster's security group
<a name="change-security-group"></a>

This page explains how to change the security group of an existing MSK cluster. You might need to change a cluster's security group in order to provide access to a certain set of users or to limit access to the cluster. For information about security groups, see [Security groups for your VPC](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html) in the Amazon VPC user guide.

1. Use the [ListNodes](https://docs.amazonaws.cn/en_us/msk/1.0/apireference/clusters-clusterarn-nodes.html#ListNodes) API or the [list-nodes](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/list-nodes.html) command in the AWS CLI to get a list of the brokers in your cluster. The results of this operation include the IDs of the elastic network interfaces (ENIs) that are associated with the brokers.

1. Sign in to the AWS Management Console and open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Using the dropdown list near the top-right corner of the screen, select the Region in which the cluster is deployed.

1. In the left pane, under **Network & Security**, choose **Network Interfaces**.

1. Select the first ENI that you obtained in the first step. Choose the **Actions** menu at the top of the screen, then choose **Change Security Groups**. Assign the new security group to this ENI. Repeat this step for each of the ENIs that you obtained in the first step.
**Note**  
Changes that you make to a cluster's security group using the Amazon EC2 console aren't reflected in the MSK console under **Network settings**.

1. Configure the new security group's rules to ensure that your clients have access to the brokers. For information about setting security group rules, see [Adding, Removing, and Updating Rules](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html?shortFooter=true#AddRemoveRules) in the Amazon VPC user guide.

**Important**  
If you change the security group that is associated with the brokers of a cluster, and then add new brokers to that cluster, Amazon MSK associates the new brokers with the original security group that was associated with the cluster when the cluster was created. However, for a cluster to work correctly, all of its brokers must be associated with the same security group. Therefore, if you add new brokers after changing the security group, you must follow the previous procedure again and update the ENIs of the new brokers.
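
If you prefer to script this procedure, the following sketch retrieves the broker ENI IDs with `list-nodes` and assigns a new security group to each one. The cluster ARN and security group ID are placeholders.

```
# Get the ENI IDs of the brokers (placeholder cluster ARN).
ENIS=$(aws kafka list-nodes --cluster-arn ClusterArn \
    --query 'NodeInfoList[].BrokerNodeInfo.AttachedENIId' --output text)

# Assign the new security group to each broker ENI (placeholder group ID).
for eni in $ENIS; do
  aws ec2 modify-network-interface-attribute --network-interface-id "$eni" --groups sg-0123456789abcdef0
done
```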

# Control access to Apache ZooKeeper nodes in your Amazon MSK cluster
<a name="zookeeper-security"></a>

For security reasons you can limit access to the Apache ZooKeeper nodes that are part of your Amazon MSK cluster. To limit access to the nodes, you can assign a separate security group to them. You can then decide who gets access to that security group.

**Important**  
This section does not apply to clusters running in KRaft mode. See [KRaft mode](metadata-management.md#kraft-intro).

**Topics**
+ [To place your Apache ZooKeeper nodes in a separate security group](zookeeper-security-group.md)
+ [Using TLS security with Apache ZooKeeper](zookeeper-security-tls.md)

# To place your Apache ZooKeeper nodes in a separate security group
<a name="zookeeper-security-group"></a>

To limit access to Apache ZooKeeper nodes, you can assign a separate security group to them. You can choose who has access to this new security group by setting security group rules.

1. Get the Apache ZooKeeper connection string for your cluster. To learn how, see [ZooKeeper mode](metadata-management.md#msk-get-connection-string). The connection string contains the DNS names of your Apache ZooKeeper nodes.

1. Use a tool like `host` or `ping` to resolve the DNS names you obtained in the previous step to IP addresses (see the sketch after this procedure). Save these IP addresses because you need them later in this procedure.

1. Sign in to the AWS Management Console and open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. In the left pane, under **NETWORK & SECURITY**, choose **Network Interfaces**.

1. In the search field above the table of network interfaces, type the name of your cluster, and then press Enter. This limits the table to the network interfaces that are associated with your cluster.

1. Select the check box at the beginning of the row that corresponds to the first network interface in the list.

1. In the details pane at the bottom of the page, look for the **Primary private IPv4 IP**. If this IP address matches one of the IP addresses you obtained in the first step of this procedure, this means that this network interface is assigned to an Apache ZooKeeper node that is part of your cluster. Otherwise, deselect the check box next to this network interface, and select the next network interface in the list. The order in which you select the network interfaces doesn't matter. In the next steps, you will perform the same operations on all network interfaces that are assigned to Apache ZooKeeper nodes, one by one.

1. When you select a network interface that corresponds to an Apache ZooKeeper node, choose the **Actions** menu at the top of the page, then choose **Change Security Groups**. Assign a new security group to this network interface. For information about creating security groups, see [Creating a Security Group](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html?shortFooter=true#CreatingSecurityGroups) in the Amazon VPC documentation.

1. Repeat the previous step to assign the same new security group to all the network interfaces that are associated with the Apache ZooKeeper nodes of your cluster.

1. You can now choose who has access to this new security group. For information about setting security group rules, see [Adding, Removing, and Updating Rules](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html?shortFooter=true#AddRemoveRules) in the Amazon VPC documentation.
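
The following sketch shows one way to resolve the ZooKeeper DNS names from step 2 of this procedure; the connection string value is a placeholder.

```
# Resolve each ZooKeeper host in the connection string to its IP address.
ZK_CONNECT="z-1.example.abc123.c4.kafka.us-east-1.amazonaws.com:2181,z-2.example.abc123.c4.kafka.us-east-1.amazonaws.com:2181"
for hostport in ${ZK_CONNECT//,/ }; do
  host "${hostport%%:*}"
done
```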

# Using TLS security with Apache ZooKeeper
<a name="zookeeper-security-tls"></a>

You can use TLS security for encryption in transit between your clients and your Apache ZooKeeper nodes. To implement TLS security with your Apache ZooKeeper nodes, do the following:
+ Ensure that your cluster uses Apache Kafka version 2.5.1 or later; earlier versions don't support TLS security with Apache ZooKeeper.
+ Enable TLS security when you create or configure your cluster. Clusters created with Apache Kafka version 2.5.1 or later with TLS enabled automatically use TLS security with Apache ZooKeeper endpoints. For information about setting up TLS security, see [Get started with Amazon MSK encryption](msk-working-with-encryption.md).
+ Retrieve the TLS Apache ZooKeeper endpoints using the [DescribeCluster ](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster) operation.
+ Create an Apache ZooKeeper configuration file for use with the `kafka-configs.sh` and [`kafka-acls.sh`](https://kafka.apache.org/documentation/#security_authz_cli) tools, or with the ZooKeeper shell. With each tool, you use the `--zk-tls-config-file` parameter to specify your Apache ZooKeeper configuration.

  The following example shows a typical Apache ZooKeeper configuration file: 

  ```
  zookeeper.ssl.client.enable=true
  zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
  zookeeper.ssl.keystore.location=kafka.jks
  zookeeper.ssl.keystore.password=test1234
  zookeeper.ssl.truststore.location=truststore.jks
  zookeeper.ssl.truststore.password=test1234
  ```
+ For other commands (such as `kafka-topics`), you must use the `KAFKA_OPTS` environment variable to configure Apache ZooKeeper parameters. The following example shows how to configure the `KAFKA_OPTS` environment variable to pass Apache ZooKeeper parameters into other commands:

  ```
  export KAFKA_OPTS="
  -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty 
  -Dzookeeper.client.secure=true 
  -Dzookeeper.ssl.trustStore.location=/home/ec2-user/kafka.client.truststore.jks
  -Dzookeeper.ssl.trustStore.password=changeit"
  ```

  After you configure the `KAFKA_OPTS` environment variable, you can use CLI commands normally. The following example creates an Apache Kafka topic using the Apache ZooKeeper configuration from the `KAFKA_OPTS` environment variable:

  ```
  <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --zookeeper ZooKeeperTLSConnectString --replication-factor 3 --partitions 1 --topic AWSKafkaTutorialTopic
  ```

**Note**  
The names of the parameters you use in your Apache ZooKeeper configuration file and those you use in your `KAFKA_OPTS` environment variable are not consistent. Pay attention to which names you use with which parameters in your configuration file and `KAFKA_OPTS` environment variable.

For more information about accessing your Apache ZooKeeper nodes with TLS, see [ KIP-515: Enable ZK client to use the new TLS supported authentication](https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication).

# Compliance validation for Amazon Managed Streaming for Apache Kafka
<a name="MSK-compliance"></a>

Third-party auditors assess the security and compliance of Amazon Managed Streaming for Apache Kafka as part of AWS compliance programs. These include PCI and HIPAA BAA.

For a list of AWS services in scope of specific compliance programs, see [Amazon Services in Scope by Compliance Program](https://aws.amazon.com/compliance/services-in-scope/). For general information, see [AWS Compliance Programs](https://aws.amazon.com/compliance/programs/).

You can download third-party audit reports using AWS Artifact. For more information, see [Downloading Reports in AWS Artifact](https://docs.aws.amazon.com/artifact/latest/ug/downloading-documents.html).

Your compliance responsibility when using Amazon MSK is determined by the sensitivity of your data, your company's compliance objectives, and applicable laws and regulations. AWS provides the following resources to help with compliance:
+ [Security and Compliance Quick Start Guides](https://aws.amazon.com/quickstart/?awsf.quickstart-homepage-filter=categories%23security-identity-compliance) – These deployment guides discuss architectural considerations and provide steps for deploying security- and compliance-focused baseline environments on AWS.
+ [Architecting for HIPAA Security and Compliance Whitepaper](https://docs.aws.amazon.com/whitepapers/latest/architecting-hipaa-security-and-compliance-on-aws/architecting-hipaa-security-and-compliance-on-aws.html) – This whitepaper describes how companies can use AWS to create HIPAA-compliant applications.
+ [AWS Compliance Resources](https://aws.amazon.com/compliance/resources/) – This collection of workbooks and guides might apply to your industry and location.
+ [Evaluating Resources with Rules](https://docs.aws.amazon.com/config/latest/developerguide/evaluate-config.html) in the *AWS Config Developer Guide* – The AWS Config service assesses how well your resource configurations comply with internal practices, industry guidelines, and regulations.
+ [AWS Security Hub CSPM](https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html) – This AWS service provides a comprehensive view of your security state within AWS that helps you check your compliance with security industry standards and best practices.

# Resilience in Amazon Managed Streaming for Apache Kafka
<a name="disaster-recovery-resiliency"></a>

The AWS global infrastructure is built around AWS Regions and Availability Zones. AWS Regions provide multiple physically separated and isolated Availability Zones, which are connected with low-latency, high-throughput, and highly redundant networking. With Availability Zones, you can design and operate applications and databases that automatically fail over between zones without interruption. Availability Zones are more highly available, fault tolerant, and scalable than traditional single or multiple data center infrastructures. 

For more information about AWS Regions and Availability Zones, see [AWS Global Infrastructure](https://aws.amazon.com/about-aws/global-infrastructure/).

# Infrastructure security in Amazon Managed Streaming for Apache Kafka
<a name="infrastructure-security"></a>

As a managed service, Amazon Managed Streaming for Apache Kafka is protected by the AWS global network security procedures that are described in the [Amazon Web Services: Overview of Security Processes](https://d0.awsstatic.com/whitepapers/Security/AWS_Security_Whitepaper.pdf) whitepaper.

You use AWS published API calls to access Amazon MSK through the network. Clients must support Transport Layer Security (TLS) 1.0 or later. We recommend TLS 1.2 or later. Clients must also support cipher suites with perfect forward secrecy (PFS) such as Ephemeral Diffie-Hellman (DHE) or Elliptic Curve Ephemeral Diffie-Hellman (ECDHE). Most modern systems such as Java 7 and later support these modes.

Additionally, requests must be signed by using an access key ID and a secret access key that is associated with an IAM principal. Or you can use the [AWS Security Token Service](https://docs.aws.amazon.com/STS/latest/APIReference/Welcome.html) (AWS STS) to generate temporary security credentials to sign requests.

# Amazon MSK logging
<a name="msk-logging"></a>

You can deliver Apache Kafka broker logs to one or more of the following destination types: Amazon CloudWatch Logs, Amazon S3, or Amazon Data Firehose. You can also log Amazon MSK API calls with AWS CloudTrail.

**Note**  
Broker logs are available on both MSK Standard and Express brokers.

## Broker logs
<a name="broker-logs"></a>

Broker logs enable you to troubleshoot your Apache Kafka applications and to analyze their communications with your MSK cluster. You can configure your new or existing MSK cluster to deliver INFO-level broker logs to one or more of the following types of destination resources: a CloudWatch log group, an S3 bucket, or a Firehose delivery stream. Through Firehose, you can then deliver the log data from your delivery stream to OpenSearch Service.

You must create a destination resource before you configure your cluster to deliver broker logs to it. Amazon MSK doesn't create these destination resources for you if they don't already exist. For information about these three types of destination resources and how to create them, see the following documentation:
+ [Amazon CloudWatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html)
+ [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)
+ [Amazon Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html)

### Required permissions
<a name="broker-logs-perms"></a>

To configure a destination for Amazon MSK broker logs, the IAM identity that you use for Amazon MSK actions must have the permissions described in [AWS managed policy: AmazonMSKFullAccess](security-iam-awsmanpol-AmazonMSKFullAccess.md). 

To stream broker logs to an S3 bucket, you also need the `s3:PutBucketPolicy` permission. For information about S3 bucket policies, see [How Do I Add an S3 Bucket Policy?](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/add-bucket-policy.html) in the Amazon S3 User Guide. For information about IAM policies in general, see [Access Management](https://docs.aws.amazon.com/IAM/latest/UserGuide/access.html) in the IAM User Guide. 

### Required KMS key policy for use with SSE-KMS buckets
<a name="sse-kms-buckets"></a>

If you enabled server-side encryption for your S3 bucket using AWS KMS-managed keys (SSE-KMS) with a customer managed key, add the following to the key policy for your KMS key so that Amazon MSK can write broker files to the bucket.

```
{
  "Sid": "Allow Amazon MSK to use the key.",
  "Effect": "Allow",
  "Principal": {
    "Service": [
      "delivery.logs.amazonaws.com"
    ]
  },
  "Action": [
    "kms:Encrypt",
    "kms:Decrypt",
    "kms:ReEncrypt*",
    "kms:GenerateDataKey*",
    "kms:DescribeKey"
  ],
  "Resource": "*"
}
```

### Configure broker logs using the AWS Management Console
<a name="broker-logs-console"></a>

If you are creating a new cluster, look for the **Broker log delivery** heading in the **Monitoring** section. You can specify the destinations to which you want Amazon MSK to deliver your broker logs. 

For an existing cluster, choose the cluster from your list of clusters, then choose the **Properties** tab. Scroll down to the **Log delivery** section and then choose its **Edit** button. You can specify the destinations to which you want Amazon MSK to deliver your broker logs.

### Configure broker logs using the AWS CLI
<a name="broker-logs-cli"></a>

When you use the `create-cluster` or the `update-monitoring` commands, you can optionally specify the `logging-info` parameter and pass to it a JSON structure like the following example. In this JSON, all three destination types are optional.

**Note**  
You must set the `LogDeliveryEnabled` tag to `true` on Firehose streams to set up log delivery. The service-linked role that AWS creates for CloudWatch Logs uses this tag to grant permission for all Firehose delivery streams. If you remove this tag, the service-linked role won't be able to deliver logs to the Firehose stream. To see an example of an IAM policy that shows the permissions that the service-linked role includes, see [IAM roles used for resource permissions](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-infrastructure-V2-Firehose.html) in the *Amazon CloudWatch User Guide*.

```
{
  "BrokerLogs": {
    "S3": {
      "Bucket": "amzn-s3-demo-bucket",
      "Prefix": "ExamplePrefix",
      "Enabled": true
    },
    "Firehose": {
      "DeliveryStream": "ExampleDeliveryStreamName",
      "Enabled": true
    },
    "CloudWatchLogs": {
      "Enabled": true,
      "LogGroup": "ExampleLogGroupName"
    }
  }
}
```
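
For example, you might save the preceding JSON to a file named `logging-info.json` (a hypothetical name) and pass it to `update-monitoring` for an existing cluster. This sketch assumes you retrieved the cluster's current version with `describe-cluster`; the ARN and version are placeholders.

```
aws kafka update-monitoring \
    --cluster-arn ClusterArn \
    --current-version K13V1IB3VIYZZH \
    --logging-info file://logging-info.json
```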

### Configure broker logs using the API
<a name="broker-logs-api"></a>

You can specify the optional `loggingInfo` structure in the JSON that you pass to the [CreateCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters.html#CreateCluster) or [UpdateMonitoring](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-monitoring.html#UpdateMonitoring) operations.

**Note**  
By default, when broker logging is enabled, Amazon MSK logs `INFO` level logs to the specified destinations. However, for Standard brokers, users of Apache Kafka 2.4.X and later can dynamically set the broker log level to any of the [log4j log levels](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html). For information about dynamically setting the broker log level, see [ KIP-412: Extend Admin API to support dynamic application log levels](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels). If you dynamically set the log level to `DEBUG` or `TRACE`, we recommend using Amazon S3 or Firehose as the log destination. If you use CloudWatch Logs as a log destination and you dynamically enable `DEBUG` or `TRACE` level logging, Amazon MSK may continuously deliver a sample of logs. This can significantly impact broker performance and should only be used when the `INFO` log level is not verbose enough to determine the root cause of an issue.

# Log API calls with AWS CloudTrail
<a name="logging-API-using-cloudtrail"></a>

**Note**  
AWS CloudTrail logs are available for Amazon MSK only when you use [IAM access control](iam-access-control.md).

Amazon MSK is integrated with AWS CloudTrail, a service that provides a record of actions taken by a user, role, or an AWS service in Amazon MSK. CloudTrail captures API calls for Amazon MSK as events. The calls captured include calls from the Amazon MSK console and code calls to the Amazon MSK API operations. It also captures Apache Kafka actions such as creating and altering topics and groups.

If you create a trail, you can enable continuous delivery of CloudTrail events to an Amazon S3 bucket, including events for Amazon MSK. If you don't configure a trail, you can still view the most recent events in the CloudTrail console in **Event history**. Using the information collected by CloudTrail, you can determine the request that was made to Amazon MSK or the Apache Kafka action, the IP address from which the request was made, who made the request, when it was made, and additional details. 

To learn more about CloudTrail, including how to configure and enable it, see the [AWS CloudTrail User Guide](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/).

## Amazon MSK information in CloudTrail
<a name="msk-info-in-cloudtrail"></a>

CloudTrail is enabled on your Amazon Web Services account when you create the account. When supported event activity occurs in an MSK cluster, that activity is recorded in a CloudTrail event along with other AWS service events in **Event history**. You can view, search, and download recent events in your Amazon Web Services account. For more information, see [Viewing Events with CloudTrail Event History](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/view-cloudtrail-events.html). 

For an ongoing record of events in your Amazon Web Services account, including events for Amazon MSK, create a trail. A *trail* enables CloudTrail to deliver log files to an Amazon S3 bucket. By default, when you create a trail in the console, the trail applies to all Regions. The trail logs events from all Regions in the AWS partition and delivers the log files to the Amazon S3 bucket that you specify. Additionally, you can configure other Amazon services to further analyze and act upon the event data collected in CloudTrail logs. For more information, see the following: 
+ [Overview for Creating a Trail](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-create-and-update-a-trail.html)
+ [CloudTrail Supported Services and Integrations](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-aws-service-specific-topics.html#cloudtrail-aws-service-specific-topics-integrations)
+ [Configuring Amazon SNS Notifications for CloudTrail](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/getting_notifications_top_level.html)
+ [Receiving CloudTrail Log Files from Multiple Regions](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/receive-cloudtrail-log-files-from-multiple-regions.html) and [Receiving CloudTrail Log Files from Multiple Accounts](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-receive-logs-from-multiple-accounts.html)

Amazon MSK logs all [Amazon MSK operations](https://docs.aws.amazon.com/MSK/2.0/APIReference/operations.html) as events in CloudTrail log files. In addition, it logs the following Apache Kafka actions.
+ kafka-cluster:DescribeClusterDynamicConfiguration 
+ kafka-cluster:AlterClusterDynamicConfiguration 
+ kafka-cluster:CreateTopic 
+ kafka-cluster:DescribeTopicDynamicConfiguration 
+ kafka-cluster:AlterTopic 
+ kafka-cluster:AlterTopicDynamicConfiguration 
+ kafka-cluster:DeleteTopic

Every event or log entry contains information about who generated the request. The identity information helps you determine the following: 
+ Whether the request was made with root user or AWS Identity and Access Management (IAM) user credentials.
+ Whether the request was made with temporary security credentials for a role or federated user.
+ Whether the request was made by another AWS service.

For more information, see the [CloudTrail userIdentity Element](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference-user-identity.html).

## Example: Amazon MSK log file entries
<a name="understanding-msk-entries"></a>

A trail is a configuration that enables delivery of events as log files to an Amazon S3 bucket that you specify. CloudTrail log files contain one or more log entries. An event represents a single request from any source and includes information about the requested action, the date and time of the action, request parameters, and so on. CloudTrail log files aren't an ordered stack trace of the public API calls and Apache Kafka actions, so they don't appear in any specific order.

The following example shows CloudTrail log entries that demonstrate the `DescribeCluster` and `DeleteCluster` Amazon MSK actions.

```
{
  "Records": [
    {
      "eventVersion": "1.05",
      "userIdentity": {
        "type": "IAMUser",
        "principalId": "ABCDEF0123456789ABCDE",
        "arn": "arn:aws:iam::012345678901:user/Joe",
        "accountId": "012345678901",
        "accessKeyId": "AIDACKCEVSQ6C2EXAMPLE",
        "userName": "Joe"
      },
      "eventTime": "2018-12-12T02:29:24Z",
      "eventSource": "kafka.amazonaws.com",
      "eventName": "DescribeCluster",
      "awsRegion": "us-east-1",
      "sourceIPAddress": "192.0.2.0",
      "userAgent": "aws-cli/1.14.67 Python/3.6.0 Windows/10 botocore/1.9.20",
      "requestParameters": {
        "clusterArn": "arn%3Aaws%3Akafka%3Aus-east-1%3A012345678901%3Acluster%2Fexamplecluster%2F01234567-abcd-0123-abcd-abcd0123efa-2"
      },
      "responseElements": null,
      "requestID": "bd83f636-fdb5-abcd-0123-157e2fbf2bde",
      "eventID": "60052aba-0123-4511-bcde-3e18dbd42aa4",
      "readOnly": true,
      "eventType": "AwsApiCall",
      "recipientAccountId": "012345678901"
    },
    {
      "eventVersion": "1.05",
      "userIdentity": {
        "type": "IAMUser",
        "principalId": "ABCDEF0123456789ABCDE",
        "arn": "arn:aws:iam::012345678901:user/Joe",
        "accountId": "012345678901",
        "accessKeyId": "AIDACKCEVSQ6C2EXAMPLE",
        "userName": "Joe"
      },
      "eventTime": "2018-12-12T02:29:40Z",
      "eventSource": "kafka.amazonaws.com",
      "eventName": "DeleteCluster",
      "awsRegion": "us-east-1",
      "sourceIPAddress": "192.0.2.0",
      "userAgent": "aws-cli/1.14.67 Python/3.6.0 Windows/10 botocore/1.9.20",
      "requestParameters": {
        "clusterArn": "arn%3Aaws%3Akafka%3Aus-east-1%3A012345678901%3Acluster%2Fexamplecluster%2F01234567-abcd-0123-abcd-abcd0123efa-2"
      },
      "responseElements": {
        "clusterArn": "arn:aws:kafka:us-east-1:012345678901:cluster/examplecluster/01234567-abcd-0123-abcd-abcd0123efa-2",
        "state": "DELETING"
      },
      "requestID": "c6bfb3f7-abcd-0123-afa5-293519897703",
      "eventID": "8a7f1fcf-0123-abcd-9bdb-1ebf0663a75c",
      "readOnly": false,
      "eventType": "AwsApiCall",
      "recipientAccountId": "012345678901"
    }
  ]
}
```

The following example shows a CloudTrail log entry that demonstrates the `kafka-cluster:CreateTopic` action.

```
{
  "eventVersion": "1.08",
  "userIdentity": {
    "type": "IAMUser",
    "principalId": "ABCDEFGH1IJKLMN2P34Q5",
    "arn": "arn:aws:iam::111122223333:user/Admin",
    "accountId": "111122223333",
    "accessKeyId": "CDEFAB1C2UUUUU3AB4TT",
    "userName": "Admin"
  },
  "eventTime": "2021-03-01T12:51:19Z",
  "eventSource": "kafka-cluster.amazonaws.com",
  "eventName": "CreateTopic",
  "awsRegion": "us-east-1",
  "sourceIPAddress": "198.51.100.0/24",
  "userAgent": "aws-msk-iam-auth/unknown-version/aws-internal/3 aws-sdk-java/1.11.970 Linux/4.14.214-160.339.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/25.272-b10 java/1.8.0_272 scala/2.12.8 vendor/Red_Hat,_Inc.",
  "requestParameters": {
    "kafkaAPI": "CreateTopics",
    "resourceARN": "arn:aws:kafka:us-east-1:111122223333:topic/IamAuthCluster/3ebafd8e-dae9-440d-85db-4ef52679674d-1/Topic9"
  },
  "responseElements": null,
  "requestID": "e7c5e49f-6aac-4c9a-a1d1-c2c46599f5e4",
  "eventID": "be1f93fd-4f14-4634-ab02-b5a79cb833d2",
  "readOnly": false,
  "eventType": "AwsApiCall",
  "managementEvent": true,
  "eventCategory": "Management",
  "recipientAccountId": "111122223333"
}
```

# Metadata management
<a name="metadata-management"></a>

Amazon MSK supports Apache ZooKeeper or KRaft metadata management modes.

From Apache Kafka version 3.7.x on Amazon MSK, you can create clusters that use KRaft mode instead of ZooKeeper mode. KRaft-based clusters rely on controllers within Kafka to manage metadata.

**Topics**
+ [ZooKeeper mode](#msk-get-connection-string)
+ [KRaft mode](#kraft-intro)

## ZooKeeper mode
<a name="msk-get-connection-string"></a>

[Apache ZooKeeper](https://zookeeper.apache.org/) is "a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications," including Apache Kafka.

If your cluster is using ZooKeeper mode, you can use the steps below to get the Apache ZooKeeper connection string. However, we recommend that you use the `BootstrapServerString` to connect to your cluster and perform admin operations, because the `--zookeeper` flag was deprecated in Apache Kafka 2.5 and removed in Kafka 3.0.

### Getting the Apache ZooKeeper connection string using the AWS Management Console
<a name="get-connection-string-console"></a>

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. The table shows all the clusters for the current Region under this account. Choose the name of a cluster to view its description.

1. On the **Cluster summary** page, choose **View client information**. This shows you the bootstrap brokers, as well as the Apache ZooKeeper connection string.

### Getting the Apache ZooKeeper connection string using the AWS CLI
<a name="get-connection-string-cli"></a>

1. If you don't know the Amazon Resource Name (ARN) of your cluster, you can find it by listing all the clusters in your account. For more information, see [List Amazon MSK clusters](msk-list-clusters.md).

1. To get the Apache ZooKeeper connection string, along with other information about your cluster, run the following command, replacing *ClusterArn* with the ARN of your cluster. 

   ```
   aws kafka describe-cluster --cluster-arn ClusterArn
   ```

   The output of this `describe-cluster` command looks like the following JSON example.

   ```
   {
       "ClusterInfo": {
           "BrokerNodeGroupInfo": {
               "BrokerAZDistribution": "DEFAULT",
               "ClientSubnets": [
                   "subnet-0123456789abcdef0",
                   "subnet-2468013579abcdef1",
                   "subnet-1357902468abcdef2"
               ],
               "InstanceType": "kafka.m5.large",
               "StorageInfo": {
                   "EbsStorageInfo": {
                       "VolumeSize": 1000
                   }
               }
           },
           "ClusterArn": "arn:aws:kafka:us-east-1:111122223333:cluster/testcluster/12345678-abcd-4567-2345-abcdef123456-2",
           "ClusterName": "testcluster",
           "CreationTime": "2018-12-02T17:38:36.75Z",
           "CurrentBrokerSoftwareInfo": {
               "KafkaVersion": "2.2.1"
           },
           "CurrentVersion": "K13V1IB3VIYZZH",
           "EncryptionInfo": {
               "EncryptionAtRest": {
                   "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:555555555555:key/12345678-abcd-2345-ef01-abcdef123456"
               }
           },
           "EnhancedMonitoring": "DEFAULT",
           "NumberOfBrokerNodes": 3,
           "State": "ACTIVE",
           "ZookeeperConnectString": "10.0.1.101:2018,10.0.2.101:2018,10.0.3.101:2018"
       }
   }
   ```

   The previous JSON example shows the `ZookeeperConnectString` key in the output of the `describe-cluster` command. Copy the value corresponding to this key and save it for when you need to create a topic on your cluster.
**Important**  
Your Amazon MSK cluster must be in the `ACTIVE` state for you to be able to obtain the Apache ZooKeeper connection string. When a cluster is still in the `CREATING` state, the output of the `describe-cluster` command doesn't include `ZookeeperConnectString`. If this is the case, wait a few minutes and then run the `describe-cluster` command again after your cluster reaches the `ACTIVE` state.

### Getting the Apache ZooKeeper connection string using the API
<a name="get-connection-string-api"></a>

To get the Apache ZooKeeper connection string using the API, see [DescribeCluster](https://docs.aws.amazon.com//msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster).

## KRaft mode
<a name="kraft-intro"></a>

Amazon MSK introduced support for KRaft (Apache Kafka Raft) in Kafka version 3.7.x. The Apache Kafka community developed KRaft to replace [Apache ZooKeeper](#msk-get-connection-string) for metadata management in Apache Kafka clusters. In KRaft mode, cluster metadata is propagated within a group of Kafka controllers, which are part of the Kafka cluster, instead of across ZooKeeper nodes. KRaft controllers are included at no additional cost to you, and require no additional setup or management from you. See [KIP-500](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum) for more information about KRaft.

Here are some points to note about KRaft mode on MSK:
+ KRaft mode is only available for new clusters. You cannot switch metadata modes once the cluster is created.
+ On the MSK console, you can create a KRaft-based cluster by choosing Kafka version 3.7.x and selecting the KRaft checkbox in the cluster creation window.
+ To create a cluster in KRaft mode using the MSK API [CreateCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters.html#CreateCluster) or [CreateClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters.html#CreateClusterV2) operations, use `3.7.x.kraft` as the version. Use `3.7.x` as the version to create a cluster in ZooKeeper mode. See the sketch after this list for a CLI example.
+ The number of partitions per broker is the same on KRaft and ZooKeeper based clusters. However, KRaft allows you to host more partitions per cluster by provisioning [more brokers in a cluster](https://docs.aws.amazon.com/msk/latest/developerguide/limits.html).
+ There are no API changes required to use KRaft mode on Amazon MSK. However, if your clients still use the `--zookeeper` connection string today, you should update your clients to use the `--bootstrap-server` connection string to connect to your cluster. The `--zookeeper` flag is deprecated in Apache Kafka version 2.5 and is removed starting with Kafka version 3.0. We therefore recommend you use recent Apache Kafka client versions and the `--bootstrap-server` connection string for all connections to your cluster.
+ ZooKeeper mode continues to be available for all released versions where ZooKeeper is also supported by Apache Kafka. See [Supported Apache Kafka versions](supported-kafka-versions.md) for details on the end of support for Apache Kafka versions and future updates.
+ You should check that any tools you use are capable of using Kafka Admin APIs without ZooKeeper connections. Refer to [Use LinkedIn's Cruise Control for Apache Kafka with Amazon MSK](cruise-control.md) for updated steps to connect your cluster to Cruise Control. Cruise Control also has instructions for [running Cruise Control without ZooKeeper](https://github.com/linkedin/cruise-control/wiki/Run-without-ZooKeeper).
+ You do not need to access your cluster's KRaft controllers directly for any administrative actions. However, if you are using open monitoring to collect metrics, you also need the DNS endpoints of your controllers in order to collect controller-related metrics about your cluster. You can get these DNS endpoints from the MSK Console or using the [ListNodes](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-nodes.html#ListNodes) API operation. See [Monitor an MSK Provisioned cluster with Prometheus](open-monitoring.md) for updated steps for setting up open monitoring for KRaft-based clusters.
+ There are no additional [CloudWatch metrics](https://docs.aws.amazon.com/msk/latest/developerguide/metrics-details.html) you need to monitor for KRaft mode clusters over ZooKeeper mode clusters. MSK manages the KRaft controllers used in your clusters.
+ You can continue managing ACLs in KRaft mode clusters using the `--bootstrap-server` connection string. You should not use the `--zookeeper` connection string to manage ACLs. See [Apache Kafka ACLs](msk-acls.md).
+ In KRaft mode, your cluster’s metadata is stored on KRaft controllers within Kafka and not external ZooKeeper nodes. Therefore, you don't need to control access to controller nodes separately [as you do with ZooKeeper nodes](https://docs.aws.amazon.com/msk/latest/developerguide/zookeeper-security.html).
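
As referenced in the list above, the following sketch creates a KRaft-mode cluster from the CLI by passing `3.7.x.kraft` as the Kafka version. The cluster name, broker count, and `brokernodegroupinfo.json` file are placeholders.

```
aws kafka create-cluster \
    --cluster-name "MyKRaftCluster" \
    --kafka-version "3.7.x.kraft" \
    --number-of-broker-nodes 3 \
    --broker-node-group-info file://brokernodegroupinfo.json
```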

# Topic Operations
<a name="msk-topic-operations-information"></a>

You can use Amazon MSK APIs to manage topics in your MSK Provisioned cluster without the need to set up and maintain Kafka admin clients. With these APIs, you can define or read topic properties such as replication factor and partition count, along with configuration settings like retention and cleanup policies. You can programmatically manage Kafka topics using your familiar interfaces including AWS CLI, AWS SDKs, and AWS CloudFormation. These APIs are also integrated into the Amazon MSK console, bringing all topic operations to one place. You can now create or update topics with just a few clicks using guided defaults while gaining comprehensive visibility into topic configurations, partition-level information, and metrics.

**Important**  
These topic API responses reflect data that updates approximately every minute. For the most current topic state after making changes, allow approximately one minute before querying.

## Requirements for using topic APIs
<a name="topic-operations-requirements"></a>
+ Your cluster must be an MSK Provisioned cluster. These APIs are not available for MSK Serverless clusters.
+ Your cluster must be running Apache Kafka version 3.6.0 or later. For more information about supported versions, see [Supported Apache Kafka versions](supported-kafka-versions.md).
+ Your cluster must be in the `ACTIVE` state. For more information about cluster states, see [Understand MSK Provisioned cluster states](msk-cluster-states.md).
+ You must have the appropriate IAM permissions. For more information, see [IAM permissions for topic operations APIs](#topic-operations-permissions).

## IAM permissions for topic operations APIs
<a name="topic-operations-permissions"></a>

To call these APIs, you must have the appropriate IAM permissions. The following table lists the required permissions for each API.


**Required permissions for topic operations APIs**  

| API | Required Permissions | Resource | 
| --- | --- | --- | 
| ListTopics |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic`  | Cluster ARN, Topic ARN | 
| DescribeTopic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:DescribeTopicDynamicConfiguration`  | Cluster ARN, Topic ARN | 
| DescribeTopicPartitions |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:DescribeTopicDynamicConfiguration`  | Cluster ARN, Topic ARN | 
| CreateTopic |  `kafka-cluster:Connect` `kafka-cluster:CreateTopic`  | Cluster ARN, Topic ARN | 
| DeleteTopic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:DeleteTopic`  | Cluster ARN, Topic ARN | 
| UpdateTopic |  `kafka-cluster:Connect` `kafka-cluster:DescribeTopic` `kafka-cluster:AlterTopic` `kafka-cluster:AlterTopicDynamicConfiguration`  | Cluster ARN, Topic ARN | 

**Note**  
For `kafka-cluster:Connect`, specify the cluster ARN in your IAM policy. For all other actions, specify the topic ARN in your IAM policy.

**Note**  
For `ListTopics`, you can use a wildcard (`*`) to match all topics on a cluster. For example: `arn:aws:kafka:us-east-1:123456789012:topic/my-cluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/*`.

For more information about IAM access control for Amazon MSK, see [IAM access control](iam-access-control.md).
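
For example, a minimal illustrative policy that grants the permissions needed to call `ListTopics` and `DescribeTopic` on every topic in one cluster (the account ID and ARNs are placeholders):

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kafka-cluster:Connect",
            "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:DescribeTopicDynamicConfiguration"
            ],
            "Resource": "arn:aws:kafka:us-east-1:123456789012:topic/my-cluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/*"
        }
    ]
}
```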

**Topics**
+ [Requirements for using topic APIs](#topic-operations-requirements)
+ [IAM permissions for topic operations APIs](#topic-operations-permissions)
+ [List topics in an Amazon MSK cluster](msk-list-topics.md)
+ [Get detailed information about a topic](msk-describe-topic.md)
+ [View partition information for a topic](msk-describe-topic-partitions.md)
+ [Create topics in an Amazon MSK cluster](msk-create-topic.md)
+ [Update a topic in an Amazon MSK cluster](msk-update-topic.md)
+ [Delete a topic in an Amazon MSK cluster](msk-delete-topic.md)

# List topics in an Amazon MSK cluster
<a name="msk-list-topics"></a>

You can list all topics in your MSK Provisioned cluster to view basic metadata such as partition counts and replication factors. This is useful for monitoring your cluster's topics, performing inventory checks, or identifying topics for further investigation.

**Note**  
The `ListTopics` API provides basic topic metadata. To get detailed information about a specific topic, including its current status and configuration, use the `DescribeTopic` API. For more information, see [Get detailed information about a topic](msk-describe-topic.md).

**Note**  
This API response reflects data that updates approximately every minute. For the most current topic state after making changes, allow approximately one minute before querying.

**Topics**
+ [List topics using the AWS Management Console](list-topics-console.md)
+ [List topics using the AWS CLI](list-topics-cli.md)
+ [List topics using the API](list-topics-api.md)

# List topics using the AWS Management Console
<a name="list-topics-console"></a>

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the list of clusters, choose the name of the cluster for which you want to list topics.

1. On the cluster details page, choose the **Topics** tab.

1. The table shows all topics in the cluster, including the topic name, number of partitions, replication factor, and out-of-sync replica count.

# List topics using the AWS CLI
<a name="list-topics-cli"></a>

Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) of your cluster. If you don't have the ARN for your cluster, you can find it by listing all clusters. For more information, see [List Amazon MSK clusters](msk-list-clusters.md).

```
aws kafka list-topics --cluster-arn ClusterArn
```

The output of this command looks like the following JSON example.

```
{
    "topics": [
        {
            "topicArn": "arn:aws:kafka:us-east-1:123456789012:topic/MyCluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/MyTopic",
            "topicName": "MyTopic",
            "partitionCount": 3,
            "replicationFactor": 3,
            "outOfSyncReplicaCount": 0
        },
        {
            "topicArn": "arn:aws:kafka:us-east-1:123456789012:topic/MyCluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/AnotherTopic",
            "topicName": "AnotherTopic",
            "partitionCount": 6,
            "replicationFactor": 3,
            "outOfSyncReplicaCount": 1
        }
    ]
}
```

## Paginating results
<a name="list-topics-pagination"></a>

If your cluster has many topics, you can use pagination to retrieve results in smaller batches. Use the `--max-results` parameter to specify the maximum number of topics to return, and use the `--next-token` parameter to retrieve the next page of results.

```
aws kafka list-topics --cluster-arn ClusterArn --max-results 10
```

If there are more results available, the response includes a `nextToken` value. Use this token to retrieve the next page of results.

```
aws kafka list-topics --cluster-arn ClusterArn --max-results 10 --next-token NextToken
```

## Filtering topics by name
<a name="list-topics-filter"></a>

You can filter the list of topics by specifying a prefix using the `--topic-name-filter` parameter. This returns only topics whose names start with the specified prefix.

```
aws kafka list-topics --cluster-arn ClusterArn --topic-name-filter "prod-"
```

This command returns only topics whose names start with `prod-`, such as `prod-orders` or `prod-inventory`.

# List topics using the API
<a name="list-topics-api"></a>

To list topics using the API, see [ListTopics](https://docs.aws.amazon.com//msk/1.0/apireference/v1-clusters-clusterarn-topics.html#ListTopics).

# Get detailed information about a topic
<a name="msk-describe-topic"></a>

You can retrieve detailed information about a specific topic in your MSK Provisioned cluster, including its current status, partition count, replication factor, and configuration. This is useful for troubleshooting, validating topic settings, or monitoring topic status during operations.

**Note**  
This API response reflects data that updates approximately every minute. For the most current topic state after making changes, allow approximately one minute before querying.

**Topics**
+ [Describe a topic using the AWS Management Console](describe-topic-console.md)
+ [Describe a topic using the AWS CLI](describe-topic-cli.md)
+ [Describe a topic using the API](describe-topic-api.md)

# Describe a topic using the AWS Management Console
<a name="describe-topic-console"></a>

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the list of clusters, choose the name of the cluster that contains the topic you want to describe.

1. On the cluster details page, choose the **Topics** tab.

1. In the list of topics, choose the name of the topic you want to view.

1. The topic details page shows information about the topic, including its status, partition count, replication factor, and configuration settings.

# Describe a topic using the AWS CLI
<a name="describe-topic-cli"></a>

Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) of your cluster and *TopicName* with the name of the topic you want to describe.

```
aws kafka describe-topic --cluster-arn ClusterArn --topic-name TopicName
```

The output of this command looks like the following JSON example.

```
{
    "topicArn": "arn:aws:kafka:us-east-1:123456789012:topic/MyCluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/MyTopic",
    "topicName": "MyTopic",
    "partitionCount": 3,
    "replicationFactor": 3,
    "configs": "Y29tcHJlc3Npb24udHlwZT1wcm9kdWNlcgpyZXRlbnRpb24ubXM9NjA0ODAwMDAw",
    "status": "ACTIVE"
}
```

## Understanding topic status
<a name="describe-topic-status"></a>

The `status` field indicates the current state of the topic. The following table describes the possible status values.


**Topic status values**  

| Status | Description | 
| --- | --- | 
| CREATING | The topic is being created. | 
| ACTIVE | The topic is active and ready for use. | 
| UPDATING | The topic configuration is being updated. | 
| DELETING | The topic is being deleted. | 

## Understanding topic configurations
<a name="describe-topic-configs"></a>

The `configs` field contains the topic's Kafka configuration properties, encoded in Base64 format. To view the configuration in a readable format, you need to decode the Base64 string.

The following example shows how to decode the configuration using the `base64` command on Linux or macOS.

```
echo "Y29tcHJlc3Npb24udHlwZT1wcm9kdWNlcgpyZXRlbnRpb24ubXM9NjA0ODAwMDAw" | base64 --decode
```

The decoded output shows the topic configuration properties in key-value format.

```
compression.type=producer
retention.ms=604800000
```

For more information about topic-level configuration properties, see [Topic-level Amazon MSK configuration](msk-configuration-properties.md#msk-topic-confinguration).

# Describe a topic using the API
<a name="describe-topic-api"></a>

To describe a topic using the API, see [DescribeTopic](https://docs.aws.amazon.com//msk/1.0/apireference/v1-clusters-clusterarn-topics-topicname.html#DescribeTopic).

# View partition information for a topic
<a name="msk-describe-topic-partitions"></a>

You can retrieve detailed information about the partitions of a specific topic in your MSK Provisioned cluster. This information includes the partition number, leader broker, replica brokers, and in-sync replicas (ISR). This is useful for monitoring partition distribution, identifying under-replicated partitions, or troubleshooting replication issues.

**Note**  
This API response reflects data that updates approximately every minute. For the most current topic state after making changes, allow approximately one minute before querying.

**Topics**
+ [View partition information using the AWS Management Console](describe-topic-partitions-console.md)
+ [View partition information using the AWS CLI](describe-topic-partitions-cli.md)
+ [View partition information using the API](describe-topic-partitions-api.md)

# View partition information using the AWS Management Console
<a name="describe-topic-partitions-console"></a>

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the list of clusters, choose the name of the cluster that contains the topic.

1. On the cluster details page, choose the **Topics** tab.

1. In the list of topics, choose the name of the topic for which you want to view partition information.

1. On the topic details page, the partition information is displayed, showing the partition number, leader broker, replicas, and in-sync replicas for each partition.

# View partition information using the AWS CLI
<a name="describe-topic-partitions-cli"></a>

Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) of your cluster and *TopicName* with the name of the topic.

```
aws kafka describe-topic-partitions --cluster-arn ClusterArn --topic-name TopicName
```

The output of this command looks like the following JSON example.

```
{
    "partitions": [
        {
            "partition": 0,
            "leader": 1,
            "replicas": [1, 2, 3],
            "isr": [1, 2, 3]
        },
        {
            "partition": 1,
            "leader": 2,
            "replicas": [2, 3, 1],
            "isr": [2, 3, 1]
        },
        {
            "partition": 2,
            "leader": 3,
            "replicas": [3, 1, 2],
            "isr": [3, 1]
        }
    ]
}
```

## Understanding partition information
<a name="describe-topic-partitions-fields"></a>

The response includes the following information for each partition:
+ **partition** — The partition number. Partitions are numbered starting from 0.
+ **leader** — The broker ID of the leader for this partition. The leader handles all read and write requests for the partition.
+ **replicas** — The list of broker IDs that have replicas of this partition. This includes both in-sync and out-of-sync replicas.
+ **isr** — The list of broker IDs that are in-sync replicas. These replicas are fully caught up with the leader and can take over as leader if needed.

In the example above, partition 2 has an out-of-sync replica. The `replicas` list includes broker 2, but the `isr` list does not. This indicates that broker 2 is not fully caught up with the leader for this partition.
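
To scan for such partitions programmatically, a small sketch (assuming the `jq` tool is installed) that prints the partition numbers where the ISR is smaller than the replica set:

```
# Print partitions with at least one out-of-sync replica.
aws kafka describe-topic-partitions --cluster-arn ClusterArn --topic-name TopicName \
  | jq '.partitions[] | select((.isr | length) < (.replicas | length)) | .partition'
```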

## Paginating results
<a name="describe-topic-partitions-pagination"></a>

If your topic has many partitions, you can use pagination to retrieve results in smaller batches. Use the `--max-results` parameter to specify the maximum number of partitions to return, and use the `--next-token` parameter to retrieve the next page of results.

```
aws kafka describe-topic-partitions --cluster-arn ClusterArn --topic-name TopicName --max-results 10
```

If there are more results available, the response includes a `nextToken` value. Use this token to retrieve the next page of results.

```
aws kafka describe-topic-partitions --cluster-arn ClusterArn --topic-name TopicName --max-results 10 --next-token NextToken
```

## Common use cases
<a name="describe-topic-partitions-use-cases"></a>

Viewing partition information is useful for several scenarios:
+ **Identifying under-replicated partitions** — Compare the `replicas` and `isr` lists to identify partitions where some replicas are not in sync. This can indicate performance issues or broker problems.
+ **Monitoring partition distribution** — Check that partition leaders are evenly distributed across brokers to ensure balanced load.
+ **Troubleshooting replication issues** — Identify which brokers are having trouble keeping up with replication by examining the ISR list.
+ **Planning partition rebalancing** — Use this information to understand the current partition layout before performing rebalancing operations.

# View partition information using the API
<a name="describe-topic-partitions-api"></a>

To view partition information using the API, see [DescribeTopicPartitions](https://docs.aws.amazon.com//msk/1.0/apireference/v1-clusters-clusterarn-topics-topicname-partitions.html#DescribeTopicPartitions).

# Create topics in an Amazon MSK cluster
<a name="msk-create-topic"></a>

You can create topics in your MSK Provisioned cluster using the Amazon MSK APIs directly, without setting up a custom Kafka AdminClient. When you create a topic, you specify the topic name, partition count, replication factor, and, optionally, topic configurations.

**Topics**
+ [Create topics using the AWS Management Console](create-topic-console.md)
+ [Create a topic using the AWS CLI](create-topic-cli.md)
+ [Create a topic using the API](create-topic-api.md)

# Create topics using the AWS Management Console
<a name="create-topic-console"></a>

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the list of clusters, choose the name of the cluster where you want to create the topics.

1. On the cluster details page, choose the **Topics** tab.

1. Choose **Create topic**.

1. Enter the topic name, partition count, and replication factor. Optionally, add configurations. You can create multiple topics at once.

1. Choose **Create topic**.

# Create a topic using the AWS CLI
<a name="create-topic-cli"></a>

Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) of your cluster. If you don't have the ARN for your cluster, you can find it by listing all clusters. For more information, see [List Amazon MSK clusters](msk-list-clusters.md).

```
aws kafka create-topic --cluster-arn ClusterArn --topic-name MyTopic --partition-count 3 --replication-factor 3
```

The output of this command looks like the following JSON example.

```
{
    "topicArn": "arn:aws:kafka:us-east-1:123456789012:topic/MyCluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/MyTopic",
    "topicName": "MyTopic",
    "status": "CREATING"
}
```
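
Because topic creation is asynchronous and the topic APIs refresh about once per minute, you may want to poll until the topic reaches `ACTIVE` before producing to it. A minimal sketch using the AWS CLI's built-in `--query` option (the polling interval is illustrative):

```
# Wait for the new topic to leave the CREATING state.
until [ "$(aws kafka describe-topic --cluster-arn ClusterArn --topic-name MyTopic \
    --query 'status' --output text)" = "ACTIVE" ]; do
  sleep 15
done
echo "MyTopic is ACTIVE"
```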

# Create a topic using the API
<a name="create-topic-api"></a>

To create a topic using the API, see [CreateTopic](https://docs.aws.amazon.com//msk/1.0/apireference/v1-clusters-clusterarn-topics.html#CreateTopic).

# Update a topic in an Amazon MSK cluster
<a name="msk-update-topic"></a>

Update the partition count or topic-level configurations for an existing topic. This operation modifies the topic without requiring recreation.

**Note**  
You can update either the partition count or the topic configurations in a single API call, but not both simultaneously. To update both, make separate API calls.

**Topics**
+ [Update a topic using the AWS Management Console](update-topic-console.md)
+ [Update a topic using the AWS CLI](update-topic-cli.md)
+ [Update a topic using the API](update-topic-api.md)

# Update a topic using the AWS Management Console
<a name="update-topic-console"></a>

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the list of clusters, choose the name of the cluster containing the topic you want to update.

1. On the cluster details page, choose the **Topics** tab.

1. Select the topic you want to update, then choose either **Edit partition settings** or **Edit configurations** from **Actions**.

1. Update the partition count or configurations as needed.

1. Choose **Save**.

# Update a topic using the AWS CLI
<a name="update-topic-cli"></a>

Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) of your cluster and *TopicName* with the name of the topic you want to update.

```
aws kafka update-topic --cluster-arn ClusterArn --topic-name TopicName --partition-count 6
```

The output of this command looks like the following JSON example.

```
{
    "topicArn": "arn:aws:kafka:us-east-1:123456789012:topic/MyCluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/MyTopic",
    "topicName": "MyTopic",
    "status": "UPDATING"
}
```

# Update a topic using the API
<a name="update-topic-api"></a>

To update a topic using the API, see [UpdateTopic](https://docs.aws.amazon.com//msk/1.0/apireference/v1-clusters-clusterarn-topics-topicname.html#UpdateTopic).

# Delete a topic in an Amazon MSK cluster
<a name="msk-delete-topic"></a>

Deleting a topic permanently removes all its data, metadata, and partition information. This operation cannot be undone.

**Topics**
+ [Delete a topic using the AWS Management Console](delete-topic-console.md)
+ [Delete a topic using the AWS CLI](delete-topic-cli.md)
+ [Delete a topic using the API](delete-topic-api.md)

# Delete a topic using the AWS Management Console
<a name="delete-topic-console"></a>

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the list of clusters, choose the name of the cluster containing the topic you want to delete.

1. On the cluster details page, choose the **Topics** tab.

1. Select the topics you want to delete, then choose **Delete** from **Actions**.

1. Confirm the deletion, then choose **Delete**.

# Delete a topic using the AWS CLI
<a name="delete-topic-cli"></a>

Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) of your cluster and *TopicName* with the name of the topic you want to delete.

```
aws kafka delete-topic --cluster-arn ClusterArn --topic-name TopicName
```

The output of this command looks like the following JSON example.

```
{
    "topicArn": "arn:aws:kafka:us-east-1:123456789012:topic/MyCluster/abcd1234-abcd-dcba-4321-a1b2abcd9f9f-2/MyTopic",
    "topicName": "MyTopic",
    "status": "DELETING"
}
```

# Delete a topic using the API
<a name="delete-topic-api"></a>

To delete a topic using the API, see [DeleteTopic](https://docs.aws.amazon.com//msk/1.0/apireference/v1-clusters-clusterarn-topics-topicname.html#DeleteTopic).

# Amazon MSK resources
<a name="resources"></a>

The term *resources* has two meanings in Amazon MSK, depending on the context. In the context of APIs a resource is a structure on which you can invoke an operation. For a list of these resources and the operations that you can invoke on them, see [Resources](https://docs.aws.amazon.com/msk/1.0/apireference/resources.html) in the Amazon MSK API Reference. In the context of [IAM access control](iam-access-control.md), a resource is an entity to which you can allow or deny access, as defined in the [Authorization policy resources](kafka-actions.md#msk-iam-resources) section.

# Apache Kafka versions
<a name="kafka-versions"></a>

When you create an Amazon MSK cluster, you specify which Apache Kafka version you want to have on it. You can also update the Apache Kafka version of an existing cluster. The topics in this chapter help you understand the timelines for Kafka version support and suggest best practices.

**Topics**
+ [Supported Apache Kafka versions](supported-kafka-versions.md)
+ [Amazon MSK version support](version-support.md)

# Supported Apache Kafka versions
<a name="supported-kafka-versions"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) supports the following Apache Kafka and Amazon MSK versions. The Apache Kafka community provides approximately 12 months of support for a version after its release date. For more details, see [Apache Kafka EOL (end of life) policy](https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy?).

The following table lists the Apache Kafka versions that Amazon MSK supports.


| Apache Kafka version | MSK release date | End of support date | 
| --- | --- | --- | 
| <a name="1.1.1-title"></a>[1.1.1](https://archive.apache.org/dist/kafka/1.1.1/RELEASE_NOTES.html) | -- | 2024-06-05 | 
| <a name="2.1.0-title"></a>[2.1.0](https://archive.apache.org/dist/kafka/2.1.0/RELEASE_NOTES.html) | -- | 2024-06-05 | 
| <a name="2.2.1-title"></a>[2.2.1](https://archive.apache.org/dist/kafka/2.2.1/RELEASE_NOTES.html) | 2019-07-31 | 2024-06-08 | 
| <a name="2.3.1-title"></a>[2.3.1](https://archive.apache.org/dist/kafka/2.3.1/RELEASE_NOTES.html) | 2019-12-19 | 2024-06-08 | 
| <a name="2.4.1-title"></a>[2.4.1](https://archive.apache.org/dist/kafka/2.4.1/RELEASE_NOTES.html) | 2020-04-02 | 2024-06-08 | 
| <a name="2.4.1.1-title"></a>[2.4.1.1](https://archive.apache.org/dist/kafka/2.4.1/RELEASE_NOTES.html) | 2020-09-09 | 2024-06-08 | 
| <a name="2.5.1-title"></a>[2.5.1](https://archive.apache.org/dist/kafka/2.5.1/RELEASE_NOTES.html) | 2020-09-30 | 2024-06-08 | 
| <a name="2.6.0-title"></a>[2.6.0](https://archive.apache.org/dist/kafka/2.6.0/RELEASE_NOTES.html) | 2020-10-21 | 2024-09-11 | 
| <a name="2.6.1-title"></a>[2.6.1](https://archive.apache.org/dist/kafka/2.6.1/RELEASE_NOTES.html) | 2021-01-19 | 2024-09-11 | 
| <a name="2.6.2-title"></a>[2.6.2](https://archive.apache.org/dist/kafka/2.6.2/RELEASE_NOTES.html) | 2021-04-29 | 2024-09-11 | 
| <a name="2.6.3-title"></a>[2.6.3](https://archive.apache.org/dist/kafka/2.6.3/RELEASE_NOTES.html) | 2021-12-21 | 2024-09-11 | 
| <a name="2.7.0-title"></a>[2.7.0](https://archive.apache.org/dist/kafka/2.7.0/RELEASE_NOTES.html) | 2020-12-29 | 2024-09-11 | 
| <a name="2.7.1-title"></a>[2.7.1](https://archive.apache.org/dist/kafka/2.7.1/RELEASE_NOTES.html) | 2021-05-25 | 2024-09-11 | 
| <a name="2.7.2-title"></a>[2.7.2](https://archive.apache.org/dist/kafka/2.7.2/RELEASE_NOTES.html) | 2021-12-21 | 2024-09-11 | 
| <a name="2.8.0-title"></a>[2.8.0](https://archive.apache.org/dist/kafka/2.8.0/RELEASE_NOTES.html) | 2021-05-19 | 2024-09-11 | 
| <a name="2.8.1-title"></a>[2.8.1](https://archive.apache.org/dist/kafka/2.8.1/RELEASE_NOTES.html) | 2022-10-28 | 2024-09-11 | 
| <a name="2.8.2-tiered-title"></a>[2.8.2-tiered](https://archive.apache.org/dist/kafka/2.8.2/RELEASE_NOTES.html) | 2022-10-28 | 2025-01-14 | 
| <a name="3.1.1-title"></a>[3.1.1](https://archive.apache.org/dist/kafka/3.1.1/RELEASE_NOTES.html) | 2022-06-22 | 2024-09-11 | 
| <a name="3.2.0-title"></a>[3.2.0](https://archive.apache.org/dist/kafka/3.2.0/RELEASE_NOTES.html) | 2022-06-22 | 2024-09-11 | 
| <a name="3.3.1-title"></a>[3.3.1](https://archive.apache.org/dist/kafka/3.3.1/RELEASE_NOTES.html) | 2022-10-26 | 2024-09-11 | 
| <a name="3.3.2-title"></a>[3.3.2](https://archive.apache.org/dist/kafka/3.3.2/RELEASE_NOTES.html) | 2023-03-02 | 2024-09-11 | 
| <a name="3.4.0-title"></a>[3.4.0](https://archive.apache.org/dist/kafka/3.4.0/RELEASE_NOTES.html) | 2023-05-04 | 2025-08-04 | 
| <a name="3.5.1-title"></a>[3.5.1](https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html) | 2023-09-26 | 2025-10-23 | 
| <a name="3.6.0-title"></a>[3.6.0](https://archive.apache.org/dist/kafka/3.6.0/RELEASE_NOTES.html) | 2023-11-16 | 2026-06-01 | 
| <a name="3.7.kraft"></a>[3.7.x](https://archive.apache.org/dist/kafka/3.7.0/RELEASE_NOTES.html) | 2024-05-29 | -- | 
| <a name="3.8-title"></a>[3.8.x](https://downloads.apache.org/kafka/3.8.0/RELEASE_NOTES.html) | 2025-02-20 | -- | 
| <a name="3.9-title"></a>[3.9.x](https://downloads.apache.org/kafka/3.9.0/RELEASE_NOTES.html) (Recommended) | 2025-04-21 | -- | 
| <a name="4.0-title"></a>[4.0.x](https://downloads.apache.org/kafka/4.0.0/RELEASE_NOTES.html) | 2025-05-16 | -- | 
| <a name="4.1-title"></a>[4.1.x](https://downloads.apache.org/kafka/4.1.0/RELEASE_NOTES.html) | 2025-10-15 | -- | 

For more information about Amazon MSK version support policy, see [Amazon MSK version support policy](version-support.md#version-support-policy).

## Amazon MSK version 4.1.x
<a name="4.1"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 4.1, which introduces Queues as a preview feature, a new Streams Rebalance Protocol in early access, and Eligible Leader Replicas (ELR). Along with these features, Apache Kafka version 4.1 includes various bug fixes and improvements.

A key highlight of Kafka 4.1 is the introduction of Queues as a preview feature. You can use multiple consumers to process messages from the same topic partitions, improving parallelism and throughput for workloads that need point-to-point message delivery. The new Streams Rebalance Protocol builds upon Kafka 4.0's consumer rebalance protocol, extending broker coordination capabilities to Kafka Streams for optimized task assignments and rebalancing. Additionally, ELR is now enabled by default to strengthen availability.

For more details and a complete list of improvements and bug fixes, see the [Apache Kafka release notes for version 4.1](https://downloads.apache.org/kafka/4.1.0/RELEASE_NOTES.html).

To start using Apache Kafka 4.1 on Amazon MSK, choose version 4.1.x when creating a new cluster through the AWS Management Console, AWS CLI, or AWS SDKs. You can also upgrade existing MSK Provisioned clusters with an in-place rolling update. Amazon MSK orchestrates broker restarts to maintain availability and protect your data during the upgrade. Kafka version 4.1 support is available across all AWS Regions where Amazon MSK is offered.

## Amazon MSK version 4.0.x
<a name="4.0"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 4.0. This version brings the latest advancements in cluster management and performance to MSK Provisioned. Kafka 4.0 introduces a new consumer rebalance protocol, now generally available, that helps ensure smoother and faster group rebalances. In addition, Kafka 4.0 requires brokers and tools to use Java 17, providing improved security and performance, includes various bug fixes and improvements, and deprecates metadata management via Apache ZooKeeper.

For more details and a complete list of improvements and bug fixes, see the [Apache Kafka release notes for version 4.0](https://downloads.apache.org/kafka/4.0.0/RELEASE_NOTES.html).

## Amazon MSK version 3.9.x (Recommended)
<a name="3.9"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 3.9. This version enhances tiered storage functionality by letting you retain tiered data when you disable tiered storage at the topic level. Your consumer applications can read historical data from the remote log start offset (Rx) while maintaining continuous log offsets across local and remote storage.

Version 3.9 is the last version to support both ZooKeeper and KRaft metadata management systems. Amazon MSK will provide extended support for version 3.9 for a minimum of two years from its release date.

For more details and a complete list of improvements and bug fixes, see the [Apache Kafka release notes for version 3.9.x](https://downloads.apache.org/kafka/3.9.0/RELEASE_NOTES.html).

## Amazon MSK version 3.8.x
<a name="3.8"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 3.8. You can now create new clusters using version 3.8 with either KRaft or ZooKeeper mode for metadata management, or upgrade your existing ZooKeeper-based clusters to version 3.8. Apache Kafka version 3.8 includes several bug fixes and new features that improve performance. Key new features include support for compression level configuration. This lets you further optimize performance when using compression types such as lz4, zstd, and gzip by changing the default compression level.

For more details and a complete list of improvements and bug fixes, see the [Apache Kafka release notes for version 3.8.x](https://downloads.apache.org/kafka/3.8.0/RELEASE_NOTES.html).

## Apache Kafka version 3.7.x (with production-ready tiered storage)
<a name="3.7.kraft"></a>

Apache Kafka version 3.7.x on MSK includes support for Apache Kafka version 3.7.0. You can create clusters or upgrade existing clusters to use the new 3.7.x version. With this change in version naming, you no longer have to adopt newer patch versions such as 3.7.1 when the Apache Kafka community releases them. Amazon MSK automatically updates 3.7.x to support future patch versions as they become available. This allows you to benefit from the security and bug fixes available through patch versions without triggering a version upgrade. These patch versions don't break version compatibility, so you can adopt them without worrying about read or write errors for your client applications. Make sure your infrastructure automation tools, such as CloudFormation, are updated to account for this change in version naming.

Amazon MSK now supports KRaft mode (Apache Kafka Raft) in Apache Kafka version 3.7.x. On Amazon MSK, like with ZooKeeper nodes, KRaft controllers are included at no additional cost to you, and require no additional setup or management from you. You can now create clusters in either KRaft mode or ZooKeeper mode on Apache Kafka version 3.7.x. In KRaft mode, you can add up to 60 brokers to host more partitions per cluster, without requesting a limit increase, compared to the 30-broker quota on ZooKeeper-based clusters. To learn more about KRaft on MSK, see [KRaft mode](metadata-management.md#kraft-intro).

Apache Kafka version 3.7.x also includes several bug fixes and new features that improve performance. Key improvements include leader discovery optimizations for clients and log segment flush optimization options. For a complete list of improvements and bug fixes, see the Apache Kafka release notes for [3.7.0](https://archive.apache.org/dist/kafka/3.7.0/RELEASE_NOTES.html).

## Apache Kafka version 3.6.0 (with production-ready tiered storage)
<a name="3.6.0"></a>

For information about Apache Kafka version 3.6.0 (with production-ready tiered storage), see its [release notes](https://archive.apache.org/dist/kafka/3.6.0/RELEASE_NOTES.html) on the Apache Kafka downloads site.

Amazon MSK will continue to use and manage ZooKeeper for quorum management in this release for stability.

## Amazon MSK version 3.5.1
<a name="3.5.1"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 3.5.1 for new and existing clusters. Apache Kafka 3.5.1 includes several bug fixes and new features that improve performance. Key features include the introduction of new rack-aware partition assignment for consumers. Amazon MSK will continue to use and manage ZooKeeper for quorum management in this release. For a complete list of improvements and bug fixes, see the Apache Kafka release notes for 3.5.1.

For information about Apache Kafka version 3.5.1, see its [release notes](https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html) on the Apache Kafka downloads site.

## Amazon MSK version 3.4.0
<a name="3.4.0"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 3.4.0 for new and existing clusters. Apache Kafka 3.4.0 includes several bug fixes and new features that improve performance. Key features include a fix to improve stability when fetching from the closest replica. Amazon MSK will continue to use and manage ZooKeeper for quorum management in this release. For a complete list of improvements and bug fixes, see the Apache Kafka release notes for 3.4.0.

For information about Apache Kafka version 3.4.0, see its [release notes](https://archive.apache.org/dist/kafka/3.4.0/RELEASE_NOTES.html) on the Apache Kafka downloads site.

## Amazon MSK version 3.3.2
<a name="3.3.2"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 3.3.2 for new and existing clusters. Apache Kafka 3.3.2 includes several bug fixes and new features that improve performance. Key features include a fix to improve stability when fetching from the closest replica. Amazon MSK will continue to use and manage ZooKeeper for quorum management in this release. For a complete list of improvements and bug fixes, see the Apache Kafka release notes for 3.3.2.

For information about Apache Kafka version 3.3.2, see its [release notes](https://archive.apache.org/dist/kafka/3.3.2/RELEASE_NOTES.html) on the Apache Kafka downloads site.

## Amazon MSK version 3.3.1
<a name="3.3.1"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka version 3.3.1 for new and existing clusters. Apache Kafka 3.3.1 includes several bug fixes and new features that improve performance. Some of the key features include enhancements to metrics and the partitioner. Amazon MSK will continue to use and manage ZooKeeper for quorum management in this release for stability. For a complete list of improvements and bug fixes, see the Apache Kafka release notes for 3.3.1.

For information about Apache Kafka version 3.3.1, see its [release notes](https://archive.apache.org/dist/kafka/3.3.1/RELEASE_NOTES.html) on the Apache Kafka downloads site.

## Amazon MSK version 3.1.1
<a name="3.1.1"></a>

Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports Apache Kafka versions 3.1.1 and 3.2.0 for new and existing clusters. Apache Kafka 3.1.1 and 3.2.0 include several bug fixes and new features that improve performance. Some of the key features include enhancements to metrics and the use of topic IDs. MSK will continue to use and manage ZooKeeper for quorum management in this release for stability. For a complete list of improvements and bug fixes, see the Apache Kafka release notes for 3.1.1 and 3.2.0.

For information about Apache Kafka versions 3.1.1 and 3.2.0, see the [3.2.0 release notes](https://archive.apache.org/dist/kafka/3.2.0/RELEASE_NOTES.html) and [3.1.1 release notes](https://archive.apache.org/dist/kafka/3.1.1/RELEASE_NOTES.html) on the Apache Kafka downloads site.

## Amazon MSK tiered storage version 2.8.2.tiered
<a name="2.8.2.tiered"></a>

This release is an Amazon MSK-only version of Apache Kafka version 2.8.2, and is compatible with open source Apache Kafka clients.

The 2.8.2.tiered release contains tiered storage functionality that is compatible with APIs introduced in [KIP-405 for Apache Kafka](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage). For more information about the Amazon MSK tiered storage feature, see [Tiered storage for Standard brokers](msk-tiered-storage.md).

## Apache Kafka version 2.5.1
<a name="2.5.1"></a>

Apache Kafka version 2.5.1 includes several bug fixes and new features, including encryption in-transit for Apache ZooKeeper and administration clients. Amazon MSK provides TLS ZooKeeper endpoints, which you can query with the [DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster) operation.

The output of the [DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster) operation includes the `ZookeeperConnectStringTls` node, which lists the TLS ZooKeeper endpoints.

The following example shows the `ZookeeperConnectStringTls` node of the response for the `DescribeCluster` operation:

```
"ZookeeperConnectStringTls": "z-3.awskafkatutorialc.abcd123.c3.kafka.us-east-1.amazonaws.com:2182,z-2.awskafkatutorialc.abcd123.c3.kafka.us-east-1.amazonaws.com:2182,z-1.awskafkatutorialc.abcd123.c3.kafka.us-east-1.amazonaws.com:2182"
```

For information about using TLS encryption with ZooKeeper, see [Using TLS security with Apache ZooKeeper](zookeeper-security-tls.md).

For more information about Apache Kafka version 2.5.1, see its [release notes](https://archive.apache.org/dist/kafka/2.5.1/RELEASE_NOTES.html) on the Apache Kafka downloads site.

## Amazon MSK bug-fix version 2.4.1.1
<a name="2.4.1.1"></a>

This release is an Amazon MSK-only bug-fix version of Apache Kafka version 2.4.1. This bug-fix release contains a fix for [KAFKA-9752](https://issues.apache.org/jira/browse/KAFKA-9752), a rare issue that causes consumer groups to continuously rebalance and remain in the `PreparingRebalance` state. This issue affects clusters running Apache Kafka versions 2.3.1 and 2.4.1. This release contains a community-produced fix that is available in Apache Kafka version 2.5.0. 

**Note**  
Amazon MSK clusters running version 2.4.1.1 are compatible with any Apache Kafka client that is compatible with Apache Kafka version 2.4.1.

We recommend that you use MSK bug-fix version 2.4.1.1 for new Amazon MSK clusters if you prefer to use Apache Kafka 2.4.1. You can update existing clusters running Apache Kafka version 2.4.1 to this version to incorporate this fix. For information about upgrading an existing cluster, see [Upgrade the Apache Kafka version](version-upgrades.md).

To work around this issue without upgrading the cluster to version 2.4.1.1, see the [Consumer group stuck in `PreparingRebalance` state](troubleshooting.md#consumer-group-rebalance) section of the [Troubleshoot your Amazon MSK cluster](troubleshooting.md) guide. 

## Apache Kafka version 2.4.1 (use 2.4.1.1 instead)
<a name="2.4.1"></a>

**Note**  
You can no longer create an MSK cluster with Apache Kafka version 2.4.1. Instead, you can use [Amazon MSK bug-fix version 2.4.1.1](#2.4.1.1) with clients compatible with Apache Kafka version 2.4.1. If you already have an MSK cluster with Apache Kafka version 2.4.1, we recommend that you update it to use Apache Kafka version 2.4.1.1 instead.

KIP-392 is one of the key Kafka Improvement Proposals that are included in the 2.4.1 release of Apache Kafka. This improvement allows consumers to fetch from the closest replica. To use this feature, set `client.rack` in the consumer properties to the ID of the consumer's Availability Zone. An example AZ ID is `use1-az1`. Amazon MSK sets `broker.rack` to the IDs of the Availability Zones of the brokers. You must also set the `replica.selector.class` configuration property to `org.apache.kafka.common.replica.RackAwareReplicaSelector`, which is an implementation of rack awareness provided by Apache Kafka. 
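
For example (the AZ ID is illustrative), you set the rack property in the consumer configuration and the replica selector in the cluster configuration:

```
# Consumer configuration (client side): the consumer's own Availability Zone ID
client.rack=use1-az1

# Cluster configuration (server side, set through your MSK configuration)
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
```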

When you use this version of Apache Kafka, the metrics in the `PER_TOPIC_PER_BROKER` monitoring level appear only after their values become nonzero for the first time. For more information about this, see [`PER_TOPIC_PER_BROKER` Level monitoring](metrics-details.md#broker-topic-metrics). 

For information about how to find Availability Zone IDs, see [AZ IDs for Your Resource](https://docs.aws.amazon.com/ram/latest/userguide/working-with-az-ids.html) in the AWS Resource Access Manager user guide. 

For information about setting configuration properties, see [Amazon MSK Provisioned configuration](msk-configuration.md). 

For more information about KIP-392, see [Allow Consumers to Fetch from Closest Replica](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392:+Allow+consumers+to+fetch+from+closest+replica) in the Confluence pages.

For more information about Apache Kafka version 2.4.1, see its [release notes](https://archive.apache.org/dist/kafka/2.4.1/RELEASE_NOTES.html) on the Apache Kafka downloads site.

# Amazon MSK version support
<a name="version-support"></a>

This topic describes the [Amazon MSK version support policy](#version-support-policy) and the procedure for [Upgrade the Apache Kafka version](version-upgrades.md). If you're upgrading your Kafka version, follow the best practices outlined in [Best practices for version upgrades](version-upgrades-best-practices.md).

**Topics**
+ [Amazon MSK version support policy](#version-support-policy)
+ [Upgrade the Apache Kafka version](version-upgrades.md)
+ [Best practices for version upgrades](version-upgrades-best-practices.md)

## Amazon MSK version support policy
<a name="version-support-policy"></a>

This section describes the support policy for Amazon MSK supported Kafka versions.
+ All Kafka versions are supported until they reach their end of support date. For details on end of support dates, see [Supported Apache Kafka versions](supported-kafka-versions.md). Upgrade your MSK cluster to the recommended Kafka version or a higher version before the end of support date. For details about upgrading your Apache Kafka version, see [Upgrade the Apache Kafka version](version-upgrades.md). A cluster using a Kafka version past its end of support date is automatically upgraded to the recommended Kafka version. Automatic upgrades can happen at any time after the end of support date, and you will not receive any notification before the upgrade.
+ MSK will phase out support for newly created clusters that use Kafka versions with published end of support dates.

# Upgrade the Apache Kafka version
<a name="version-upgrades"></a>

You can upgrade an existing MSK cluster to a newer version of Apache Kafka. Before upgrading your cluster's Kafka version, verify that your client-side software's version supports the features in the new Kafka version.

For information about how to make a cluster highly available during an upgrade, see [Build highly available clusters](bestpractices.md#ensure-high-availability).

**Upgrade the Apache Kafka version using the AWS Management Console**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the navigation bar, choose the Region where you created the MSK cluster.

1. Choose the MSK cluster which you want to upgrade.

1. On the **Properties** tab, choose **Upgrade** in the **Apache Kafka version** section.

1. In the **Apache Kafka version** section, do the following:

   1. In the **Choose Apache Kafka version** dropdown list, choose the target version to which you want to upgrade. For example, choose **3.9.x**.

   1. (Optional) Choose **View version compatibility** to verify compatibility between your cluster's current version and the available upgrade versions. Then, select **Choose** to proceed.
**Note**  
Amazon MSK supports in-place upgrades to most Apache Kafka versions. However, when upgrading from a ZooKeeper-based Kafka version to a KRaft-based version, you must create a new cluster. Then, copy your data to the new cluster, and switch clients to the new cluster.

   1. (Optional) Choose the **Update cluster configuration** checkbox to apply configuration updates compatible with the new version. This enables the new version’s features and improvements.

      You can skip this step if you need to maintain your existing custom configurations.
**Note**  
Server-side upgrades don't automatically update client applications.
To maintain cluster stability, version downgrades aren't supported.

   1. Choose **Upgrade** to start the process.

**Upgrade the Apache Kafka version using the AWS CLI**

1. Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) that you obtained when you created your cluster. If you don't have the ARN for your cluster, you can find it by listing all clusters. For more information, see [List Amazon MSK clusters](msk-list-clusters.md).

   ```
   aws kafka get-compatible-kafka-versions --cluster-arn ClusterArn
   ```

   The output of this command includes a list of the Apache Kafka versions to which you can upgrade the cluster. It looks like the following example.

   ```
   {
       "CompatibleKafkaVersions": [
           {
               "SourceVersion": "2.2.1",
               "TargetVersions": [
                   "2.3.1",
                   "2.4.1",
                   "2.4.1.1",
                   "2.5.1"
               ]
           }
       ]
   }
   ```

1. Run the following command, replacing *ClusterArn* with the Amazon Resource Name (ARN) that you obtained when you created your cluster. If you don't have the ARN for your cluster, you can find it by listing all clusters. For more information, see [List Amazon MSK clusters](msk-list-clusters.md).

   Replace *Current-Cluster-Version* with the current version of the cluster. For *TargetVersion* you can specify any of the target versions from the output of the previous command.
**Important**  
Cluster versions aren't simple integers. To find the current version of the cluster, use the [DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html#DescribeCluster) operation or the [describe-cluster](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kafka/describe-cluster.html) AWS CLI command. An example version is `KTVPDKIKX0DER`.
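
   For example, you can retrieve the cluster's current version with the AWS CLI (a sketch; it assumes the `DescribeCluster` response exposes the version as `ClusterInfo.CurrentVersion`):

   ```
   aws kafka describe-cluster --cluster-arn ClusterArn --query 'ClusterInfo.CurrentVersion' --output text
   ```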

   ```
   aws kafka update-cluster-kafka-version --cluster-arn ClusterArn --current-version Current-Cluster-Version --target-kafka-version TargetVersion
   ```

   The output of the previous command looks like the following JSON.

   ```
   {
       
       "ClusterArn": "arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2",
       "ClusterOperationArn": "arn:aws:kafka:us-east-1:012345678012:cluster-operation/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2/0123abcd-abcd-4f7f-1234-9876543210ef"
   }
   ```

1. To get the result of the `update-cluster-kafka-version` operation, run the following command, replacing *ClusterOperationArn* with the ARN that you obtained in the output of the `update-cluster-kafka-version` command.

   ```
   aws kafka describe-cluster-operation --cluster-operation-arn ClusterOperationArn
   ```

   The output of this `describe-cluster-operation` command looks like the following JSON example.

   ```
   {
       "ClusterOperationInfo": {
           "ClientRequestId": "62cd41d2-1206-4ebf-85a8-dbb2ba0fe259",
           "ClusterArn": "arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2",
           "CreationTime": "2021-03-11T20:34:59.648000+00:00",
           "OperationArn": "arn:aws:kafka:us-east-1:012345678012:cluster-operation/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2/0123abcd-abcd-4f7f-1234-9876543210ef",
           "OperationState": "UPDATE_IN_PROGRESS",
           "OperationSteps": [
               {
                   "StepInfo": {
                       "StepStatus": "IN_PROGRESS"
                   },
                   "StepName": "INITIALIZE_UPDATE"
               },
               {
                   "StepInfo": {
                       "StepStatus": "PENDING"
                   },
                   "StepName": "UPDATE_APACHE_KAFKA_BINARIES"
               },
               {
                   "StepInfo": {
                       "StepStatus": "PENDING"
                   },
                   "StepName": "FINALIZE_UPDATE"
               }
           ],
           "OperationType": "UPDATE_CLUSTER_KAFKA_VERSION",
           "SourceClusterInfo": {
               "KafkaVersion": "2.4.1"
           },
           "TargetClusterInfo": {
               "KafkaVersion": "2.6.1"
           }
       }
   }
   ```

   If `OperationState` has the value `UPDATE_IN_PROGRESS`, wait a while, then run the `describe-cluster-operation` command again. When the operation is complete, the value of `OperationState` becomes `UPDATE_COMPLETE`. Because the time required for Amazon MSK to complete the operation varies, you might need to check repeatedly until the operation is complete. 

**Upgrade the Apache Kafka version using the API**

1. Invoke the [GetCompatibleKafkaVersions](https://docs.aws.amazon.com//msk/1.0/apireference/compatible-kafka-versions.html#GetCompatibleKafkaVersions) operation to get a list of the Apache Kafka versions to which you can upgrade the cluster.

1. Invoke the [UpdateClusterKafkaVersion](https://docs.aws.amazon.com//msk/1.0/apireference/clusters-clusterarn-version.html#UpdateClusterKafkaVersion) operation to upgrade the cluster to one of the compatible Apache Kafka versions.

# Best practices for version upgrades
<a name="version-upgrades-best-practices"></a>

To ensure client continuity during the rolling update that is performed as part of the Kafka version upgrade process, review the configuration of your clients and your Apache Kafka topics as follows:
+ Set the topic replication factor (RF) to a minimum value of `2` for two-AZ clusters and a minimum value of `3` for three-AZ clusters. An RF value of `2` can lead to offline partitions during patching.
+ Set minimum in-sync replicas (minISR) to at most 1 less than your replication factor (RF), that is, `minISR = RF - 1`; see the sketch after this list. This makes sure that the partition replica set can tolerate one replica being offline or under-replicated.
+ Configure clients to use multiple broker connection strings. Having multiple brokers in a client’s connection string allows for failover if a specific broker supporting client I/O begins to be patched. For information about how to get a connection string with multiple brokers, see [Getting the bootstrap brokers for an Amazon MSK cluster](https://docs.aws.amazon.com//msk/latest/developerguide/msk-get-bootstrap-brokers.html).
+ We recommend that you upgrade connecting clients to the recommended version or above to benefit from the features available in the new version. Client upgrades are not subject to the end of life (EOL) dates of your MSK cluster's Kafka version, and do not need to be completed by the EOL date. Apache Kafka provides a [bi-directional client compatibility policy](https://kafka.apache.org/protocol#protocol_compatibility) that allows older clients to work with newer clusters and vice versa.
+ Kafka clients using versions 3.x.x are likely to come with the following defaults: `acks=all` and `enable.idempotence=true`. `acks=all` is different from the previous default of `acks=1` and provides extra durability by ensuring that all in-sync replicas acknowledge the produce request. Similarly, the default for `enable.idempotence` was previously `false`. The change to `enable.idempotence=true` as the default lowers the likelihood of duplicate messages. These changes are considered best practice settings and may introduce a small amount of additional latency that's within normal performance parameters.
+ Use the recommended Kafka version when creating new MSK clusters. Using the recommended Kafka version allows you to benefit from the latest Kafka and MSK features.
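
As an illustration of the RF and minISR guidance above, a hedged sketch using the standard Apache Kafka CLI tools (the topic name and bootstrap string are placeholders) to set `min.insync.replicas = RF - 1` on an existing three-AZ topic with RF = 3:

```
# Set minISR to 2 (= RF - 1 for a topic with replication factor 3).
bin/kafka-configs.sh --bootstrap-server $BOOTSTRAP_BROKERS \
  --entity-type topics --entity-name MyTopic \
  --alter --add-config min.insync.replicas=2
```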

# Troubleshoot your Amazon MSK cluster
<a name="troubleshooting"></a>

The following information can help you troubleshoot problems that you might have with your Amazon MSK cluster. You can also post your issue to [AWS re:Post](https://repost.aws/). For troubleshooting Amazon MSK Replicator, see [Troubleshoot MSK Replicator](msk-replicator-troubleshooting.md).

**Topics**
+ [Volume replacement causes disk saturation due to replication overload](#replication-overload-disk-saturation)
+ [Consumer group stuck in `PreparingRebalance` state](#consumer-group-rebalance)
+ [Error delivering broker logs to Amazon CloudWatch Logs](#cw-broker-logs-error)
+ [No default security group](#troubleshooting-shared-vpc)
+ [Cluster appears stuck in the CREATING state](#troubleshooting-cluster-stuck)
+ [Cluster state goes from CREATING to FAILED](#troubleshooting-cluster-failed)
+ [Cluster state is ACTIVE but producers cannot send data or consumers cannot receive data](#troubleshooting-nodata)
+ [AWS CLI doesn't recognize Amazon MSK](#troubleshooting-nocli)
+ [Partitions go offline or replicas are out of sync](#troubleshooting-offlinepartition-outofsyncreplicas)
+ [Disk space is running low](#troubleshooting-lowdiskspace)
+ [Memory running low](#troubleshooting-lowmemory)
+ [Producer gets NotLeaderForPartitionException](#troubleshooting-NotLeaderForPartitionException)
+ [Under-replicated partitions (URP) greater than zero](#troubleshooting-urp)
+ [Cluster has topics called \_\_amazon\_msk\_canary and \_\_amazon\_msk\_canary\_state](#amazon_msk_canary)
+ [Partition replication fails](#partition_replication_fails)
+ [Unable to access cluster that has public access turned on](#public-access-issues)
+ [Unable to access cluster through IPv6 bootstrap](#dualstack-issues)
+ [Unable to access cluster from within AWS: Networking issues](#networking-trouble)
+ [Failed authentication: Too many connects](#troubleshoot-too-many-connects)
+ [Failed authentication: Session too short](#troubleshoot-session-too-short)
+ [MSK Serverless: Cluster creation fails](#troubleshoot-serverless-create-cluster-failure)
+ [Can’t update KafkaVersionsList in MSK configuration](#troubleshoot-kafkaversionslist-cfn-update-failure)

## Volume replacement causes disk saturation due to replication overload
<a name="replication-overload-disk-saturation"></a>

During unplanned volume hardware failure, Amazon MSK may replace the volume with a new instance. Kafka repopulates the new volume by replicating partitions from other brokers in the cluster. Once partitions are replicated and caught up, they are eligible for leadership and in-sync replica (ISR) membership. 

**Problem**  
In a broker recovering from volume replacement, some partitions of varying sizes may come back online before others. This can be problematic because those partitions can be serving traffic from the same broker that is still catching up on (replicating) other partitions. This replication traffic can sometimes saturate the underlying volume throughput limits, which are 250 MiB per second in the default case. When this saturation occurs, any partitions that are already caught up are impacted, resulting in latency across the cluster for any brokers sharing ISR with those caught-up partitions (not just leader partitions, because of remote acks with `acks=all`). This problem is more common with larger clusters that have larger numbers of partitions of varying size.

**Recommendation**
+ To improve replication I/O posture, ensure that [best practice thread settings](https://docs.aws.amazon.com/msk/latest/developerguide/bestpractices.html#optimize-broker-threads) are in place.
+ To reduce the likelihood of underlying volume saturation, enable provisioned storage with a higher throughput. A minimum throughput value of 500 MiB/s is recommended for high-throughput replication cases, but the actual value needed varies with throughput and use case. For more information, see [Provision storage throughput for Standard brokers in an Amazon MSK cluster](msk-provision-throughput.md), and see the sketch after this list.
+ To minimize replication pressure, lower `num.replica.fetchers` to the default value of `2`.
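If you use the AWS CLI, a minimal sketch of enabling provisioned storage throughput might look like the following. The cluster ARN is a placeholder, and 500 MiB/s is the suggested starting point above; choose a value that matches your workload.

```
# Look up the cluster's current version, which update-storage requires.
CURRENT_VERSION=$(aws kafka describe-cluster \
  --cluster-arn <cluster-arn> \
  --query 'ClusterInfo.CurrentVersion' --output text)

# Enable provisioned storage throughput at 500 MiB/s per broker volume.
aws kafka update-storage \
  --cluster-arn <cluster-arn> \
  --current-version "$CURRENT_VERSION" \
  --provisioned-throughput Enabled=true,VolumeThroughput=500
```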

## Consumer group stuck in `PreparingRebalance` state
<a name="consumer-group-rebalance"></a>

If one or more of your consumer groups is stuck in a perpetual rebalancing state, the cause might be Apache Kafka issue [KAFKA-9752](https://issues.apache.org/jira/browse/KAFKA-9752), which affects Apache Kafka versions 2.3.1 and 2.4.1.

To resolve this issue, we recommend that you upgrade your cluster to [Amazon MSK bug-fix version 2.4.1.1](supported-kafka-versions.md#2.4.1.1), which contains a fix for this issue. For information about updating an existing cluster to Amazon MSK bug-fix version 2.4.1.1, see [Upgrade the Apache Kafka version](version-upgrades.md).

The workarounds for solving this issue without upgrading the cluster to Amazon MSK bug-fix version 2.4.1.1 are to either configure your Kafka clients to use the [Static membership protocol](#consumer-group-rebalance-static), or to [identify and reboot](#consumer-group-rebalance-reboot) the coordinating broker node of the stuck consumer group.

### Implementing static membership protocol
<a name="consumer-group-rebalance-static"></a>

To implement Static Membership Protocol in your clients, do the following:

1. Set the `group.instance.id` property of your [Kafka consumer](https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) configuration to a static string that identifies the consumer in the group. 

1. Ensure that every other consumer instance in the group is updated to use its own unique static string.

1. Deploy the changes to your Kafka Consumers.

Using Static Membership Protocol is more effective if the session timeout in the client configuration is set to a duration that allows the consumer to recover without prematurely triggering a consumer group rebalance. For example, if your consumer application can tolerate 5 minutes of unavailability, a reasonable value for the session timeout would be 4 minutes instead of the default value of 10 seconds.
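For example, a minimal sketch of the consumer configuration changes might look like the following. The instance ID shown is hypothetical; each consumer in the group needs its own unique value.

```
cat >> consumer.properties <<'EOF'
# Static member ID: unique per consumer instance and stable across restarts.
group.instance.id=orders-consumer-1
# Give a restarting static member up to 4 minutes before triggering a rebalance.
session.timeout.ms=240000
EOF
```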

**Note**  
Using Static Membership Protocol only reduces the probability of encountering this issue. You may still encounter this issue even when using Static Membership Protocol.

### Rebooting the coordinating broker node
<a name="consumer-group-rebalance-reboot"></a>

To reboot the coordinating broker node, do the following:

1. Identify the group coordinator using the `kafka-consumer-groups.sh` command.

1. Restart the group coordinator of the stuck consumer group using the [RebootBroker](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-reboot-broker.html#RebootBroker) API action.
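A minimal sketch of these two steps, assuming a placeholder cluster ARN and group name; the `COORDINATOR (ID)` column in the describe output identifies the broker to reboot:

```
# Step 1: find the group coordinator; the COORDINATOR (ID) column shows the broker ID.
<path-to-your-kafka-installation>/bin/kafka-consumer-groups.sh \
  --bootstrap-server bootstrap-brokers \
  --command-config client.properties \
  --describe --group my-stuck-group --state

# Step 2: reboot that broker (broker ID 2 here is illustrative).
aws kafka reboot-broker \
  --cluster-arn <cluster-arn> \
  --broker-ids 2
```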

## Error delivering broker logs to Amazon CloudWatch Logs
<a name="cw-broker-logs-error"></a>

When you try to set up your cluster to send broker logs to Amazon CloudWatch Logs, you might get one of two exceptions.

If you get an `InvalidInput.LengthOfCloudWatchResourcePolicyLimitExceeded` exception, try again but use log groups that start with `/aws/vendedlogs/`. For more information, see [Enabling Logging from Certain Amazon Web Services](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-and-resource-policy.html).

If you get an `InvalidInput.NumberOfCloudWatchResourcePoliciesLimitExceeded` exception, choose an existing Amazon CloudWatch Logs policy in your account, and append the following JSON to it.

```
{"Sid":"AWSLogDeliveryWrite","Effect":"Allow","Principal":{"Service":"delivery.logs.amazonaws.com"},"Action":["logs:CreateLogStream","logs:PutLogEvents"],"Resource":["*"]}
```

If you try to append the JSON above to an existing policy but get an error that says you've reached the maximum length for the policy you picked, try appending the JSON to a different one of your Amazon CloudWatch Logs policies. After you append the JSON to an existing policy, try once again to set up broker-log delivery to Amazon CloudWatch Logs.
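If you prefer the AWS CLI, a sketch of updating an existing resource policy might look like the following. The policy name is a placeholder, and `policy.json` must contain the full updated policy document (the existing statements plus the one above), because `put-resource-policy` overwrites the named policy rather than appending to it.

```
# List your existing CloudWatch Logs resource policies to pick one to extend.
aws logs describe-resource-policies

# After adding the statement above to the chosen policy's document locally,
# overwrite the policy under its existing name.
aws logs put-resource-policy \
  --policy-name existing-policy-name \
  --policy-document file://policy.json
```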

## No default security group
<a name="troubleshooting-shared-vpc"></a>

If you try to create a cluster and get an error indicating that there's no default security group, it might be because you are using a VPC that was shared with you. Ask your administrator to grant you permission to describe the security groups on this VPC and try again. For an example of a policy that allows this action, see [Amazon EC2: Allows Managing EC2 Security Groups Associated With a Specific VPC, Programmatically and in the Console ](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_examples_ec2_securitygroups-vpc.html).

## Cluster appears stuck in the CREATING state
<a name="troubleshooting-cluster-stuck"></a>

Cluster creation can sometimes take up to 30 minutes. Wait 30 minutes, and then check the state of the cluster again.

## Cluster state goes from CREATING to FAILED
<a name="troubleshooting-cluster-failed"></a>

Try creating the cluster again.

## Cluster state is ACTIVE but producers cannot send data or consumers cannot receive data
<a name="troubleshooting-nodata"></a>
+ If the cluster creation succeeds (the cluster state is `ACTIVE`), but you can't send or receive data, ensure that your producer and consumer applications have access to the cluster. For more information, see the guidance in [Step 3: Create a client machine](create-client-machine.md).
+ If your producers and consumers have access to the cluster but still experience problems producing and consuming data, the cause might be [KAFKA-7697](https://issues.apache.org/jira/browse/KAFKA-7697), which affects Apache Kafka version 2.1.0 and can lead to a deadlock in one or more brokers. Consider migrating to Apache Kafka 2.2.1, which is not affected by this bug. For information about how to migrate, see [Migrate Kafka workloads to an Amazon MSK cluster](migration.md).

## AWS CLI doesn't recognize Amazon MSK
<a name="troubleshooting-nocli"></a>

If you have the AWS CLI installed, but it doesn't recognize the Amazon MSK commands, upgrade your AWS CLI to the latest version. For detailed instructions on how to upgrade the AWS CLI, see [Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html). For information about how to use the AWS CLI to run Amazon MSK commands, see [Amazon MSK key features and concepts](operations.md).

## Partitions go offline or replicas are out of sync
<a name="troubleshooting-offlinepartition-outofsyncreplicas"></a>

These can be symptoms of low disk space. See [Disk space is running low](#troubleshooting-lowdiskspace).

## Disk space is running low
<a name="troubleshooting-lowdiskspace"></a>

See the following best practices for managing disk space: [Monitor disk space](bestpractices.md#bestpractices-monitor-disk-space) and [Adjust data retention parameters](bestpractices.md#bestpractices-retention-period).

## Memory running low
<a name="troubleshooting-lowmemory"></a>

If you see the `MemoryUsed` metric running high or `MemoryFree` running low, that doesn't necessarily indicate a problem. Apache Kafka is designed to use as much memory as possible, and it manages that memory optimally.

## Producer gets NotLeaderForPartitionException
<a name="troubleshooting-NotLeaderForPartitionException"></a>

This is often a transient error. Set the producer's `retries` configuration parameter to a value that's higher than its current value.
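A hedged sketch of the producer settings involved; the values are illustrative starting points, not prescribed defaults:

```
cat >> producer.properties <<'EOF'
# Retry transient errors such as NOT_LEADER_FOR_PARTITION instead of failing fast.
retries=2147483647
# With high retries, bound the total time per send (including retries) instead.
delivery.timeout.ms=120000
EOF
```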

## Under-replicated partitions (URP) greater than zero
<a name="troubleshooting-urp"></a>

The `UnderReplicatedPartitions` metric is an important one to monitor. In a healthy MSK cluster, this metric has the value 0. If it's greater than zero, that might be for one of the following reasons.
+ If `UnderReplicatedPartitions` is spiky, the issue might be that the cluster isn't provisioned at the right size to handle incoming and outgoing traffic. See [Best practices for Standard brokers](bestpractices.md).
+ If `UnderReplicatedPartitions` is consistently greater than 0, including during low-traffic periods, the issue might be that you've set restrictive ACLs that don't grant topic access to brokers. To replicate partitions, brokers must be authorized to both READ and DESCRIBE topics. DESCRIBE is granted by default with the READ authorization. For information about setting ACLs, see [Authorization and ACLs](https://kafka.apache.org/documentation/#security_authz) in the Apache Kafka documentation, and see the sketch after this list.
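If you need to grant the brokers topic access, a minimal `kafka-acls.sh` sketch might look like the following. The principal is hypothetical; substitute the principal your brokers actually use, and note that `--topic "*"` here is the literal wildcard resource.

```
<path-to-your-kafka-installation>/bin/kafka-acls.sh \
  --bootstrap-server bootstrap-brokers \
  --command-config client.properties \
  --add \
  --allow-principal "User:CN=example-broker-principal" \
  --operation Read --operation Describe \
  --topic "*"
```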

## Cluster has topics called \_\_amazon\_msk\_canary and \_\_amazon\_msk\_canary\_state
<a name="amazon_msk_canary"></a>

You might see that your MSK cluster has a topic with the name `__amazon_msk_canary` and another with the name `__amazon_msk_canary_state`. These are internal topics that Amazon MSK creates and uses for cluster health and diagnostic metrics. These topics are negligible in size and can't be deleted.

## Partition replication fails
<a name="partition_replication_fails"></a>

Ensure that you haven't set ACLs on `CLUSTER_ACTIONS`.

## Unable to access cluster that has public access turned on
<a name="public-access-issues"></a>

If your cluster has public access turned on, but you still cannot access it from the internet, follow these steps:

1. Ensure that the cluster's security group's inbound rules allow your IP address and the cluster's port. For a list of cluster port numbers, see [Port information](port-info.md). Also ensure that the security group's outbound rules allow outbound communications. For more information about security groups and their inbound and outbound rules, see [Security groups for your VPC](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html) in the Amazon VPC User Guide.

1. Make sure that your IP address and the cluster's port are allowed in the inbound rules of the cluster's VPC network ACL. Unlike security groups, network ACLs are stateless. This means that you must configure both inbound and outbound rules. In the outbound rules, allow all traffic (port range: 0-65535) to your IP address. For more information, see [Add and delete rules](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-network-acls.html#Rules) in the Amazon VPC User Guide. 

1. Make sure that you are using the public-access bootstrap-brokers string to access the cluster. An MSK cluster that has public access turned on has two different bootstrap-brokers strings, one for public access, and one for access from within AWS. For more information, see [Get the bootstrap brokers using the AWS Management Console](get-bootstrap-console.md).
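To confirm which bootstrap string you're using, you can list the strings the cluster exposes; a sketch with a placeholder ARN follows. Fields whose names contain `Public` (for example, `BootstrapBrokerStringPublicSaslIam`) appear only when public access is turned on and are the ones to use from the internet; the others are for access from within AWS.

```
# Returns one bootstrap-brokers string per access type and auth mechanism.
aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn>
```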

## Unable to access cluster through IPv6 bootstrap
<a name="dualstack-issues"></a>

If you're having trouble connecting to a cluster using the provided IPv6 bootstrap strings, follow these steps:

1.  Ensure your client has both IPv4 and IPv6 addresses assigned. Your client application must be running in a subnet that has both IPv4 and IPv6 addressing enabled and properly configured. Check that your VPC has both an IPv4 CIDR block and an associated IPv6 CIDR block, confirm your subnet has both IPv4 and IPv6 addresses enabled, and verify your EC2 instance or client environment has both IPv4 and IPv6 addresses assigned. For more information, see [IP addressing for your VPCs and subnets](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-ip-addressing.html) in the Amazon VPC User Guide. 

1.  Ensure relevant IPv6 ports are present in the security group inbound and outbound rules. Add inbound rules to allow traffic on the cluster's ports from your IPv6 addresses and configure outbound rules to allow IPv6 traffic. For specific port numbers, see [Port information](https://docs.aws.amazon.com/msk/latest/developerguide/port-info.html) in the MSK documentation. Remember to update both IPv4 and IPv6 rules if running in dual-stack mode. For more information about security groups and their inbound and outbound rules, see [Security groups for your VPC](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-security-groups.html) in the Amazon VPC User Guide. 

1.  Ensure JVM property configuration is correct for IPv6 support. In your client application, set `java.net.preferIPv6Addresses` to `true` and `java.net.preferIPv4Stack` to `false`. These settings can be configured either as system properties or JVM arguments. Restart your application after making these changes for them to take effect. 
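For example, a sketch of passing these properties as JVM arguments when launching the client; the JAR name is a placeholder:

```
java -Djava.net.preferIPv6Addresses=true \
     -Djava.net.preferIPv4Stack=false \
     -jar my-kafka-client-app.jar
```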

## Unable to access cluster from within AWS: Networking issues
<a name="networking-trouble"></a>

If you have an Apache Kafka application that is unable to communicate successfully with an MSK cluster, start by performing the following connectivity test.

1. Use any of the methods described in [Get the bootstrap brokers for an Amazon MSK cluster](msk-get-bootstrap-brokers.md) to get the addresses of the bootstrap brokers.

1. In the following command, replace *bootstrap-broker* with one of the broker addresses that you obtained in the previous step, and replace *port-number* with the appropriate port for your cluster. Run the command from the client machine.

   ```
   telnet bootstrap-broker port-number
   ```

   Where *port-number* is:
   + 9094 if the cluster is set up to use TLS authentication. 
   + 9092 if the cluster doesn't use TLS authentication.
   + A different port number if public access is enabled. For the full list, see [Port information](port-info.md).

1. Repeat the previous command for all the bootstrap brokers.
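   To test every broker in one pass, you can loop over the bootstrap string; a sketch assuming an IAM-auth cluster, a placeholder ARN, and that `nc` (netcat) is installed. Adjust the `--query` field to match your cluster's auth type.

   ```
   BROKERS=$(aws kafka get-bootstrap-brokers \
     --cluster-arn <cluster-arn> \
     --query 'BootstrapBrokerStringSaslIam' --output text)

   # Split the comma-separated host:port list and test each endpoint.
   for broker in ${BROKERS//,/ }; do
     nc -zv -w 5 "${broker%:*}" "${broker#*:}"
   done
   ```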

If the client machine is able to access the brokers, this means there are no connectivity issues. In this case, run the following command to check whether your Apache Kafka client is set up correctly. To get *bootstrap-brokers*, use any of the methods described in [Get the bootstrap brokers for an Amazon MSK cluster](msk-get-bootstrap-brokers.md). Replace *topic* with the name of your topic.

```
<path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list bootstrap-brokers --producer.config client.properties --topic topic
```

If the previous command succeeds, this means that your client is set up correctly. If you're still unable to produce and consume from an application, debug the problem at the application level.

If the client machine is unable to access the brokers, see the following subsections for guidance that is based on your client-machine setup. 

### Amazon EC2 client and MSK cluster in the same VPC
<a name="troubleshoot-ec2-client-in-cluster-vpc"></a>

If the client machine is in the same VPC as the MSK cluster, make sure the cluster's security group has an inbound rule that accepts traffic from the client machine's security group. For information about setting up these rules, see [Security Group Rules](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html#SecurityGroupRules). For an example of how to access a cluster from an Amazon EC2 instance that's in the same VPC as the cluster, see [Get started using Amazon MSK](getting-started.md).

### Amazon EC2 client and MSK cluster in different VPCs
<a name="troubleshoot-peering-connection"></a>

If the client machine and the cluster are in two different VPCs, ensure the following: 
+ The two VPCs are peered.
+ The status of the peering connection is active.
+ The route tables of the two VPCs are set up correctly.

For information about VPC peering, see [Working with VPC Peering Connections](https://docs.aws.amazon.com/vpc/latest/peering/working-with-vpc-peering.html).

### On-premises client
<a name="troubleshoot-on-prem-client"></a>

In the case of an on-premises client that is set up to connect to the MSK cluster using Site-to-Site VPN, ensure the following:
+ The VPN connection status is `UP`. For information about how to check the VPN connection status, see [How do I check the current status of my VPN tunnel?](https://aws.amazon.com/premiumsupport/knowledge-center/check-vpn-tunnel-status/).
+ The route table of the cluster's VPC contains the route for an on-premises CIDR whose target has the format `Virtual private gateway (vgw-xxxxxxxx)`.
+ The MSK cluster's security group allows traffic on port 2181, port 9092 (if your cluster accepts plaintext traffic), and port 9094 (if your cluster accepts TLS-encrypted traffic).

For more Site-to-Site VPN troubleshooting guidance, see [Troubleshooting your Site-to-Site VPN connection](https://docs.aws.amazon.com/vpn/latest/s2svpn/Troubleshooting.html).

### Direct Connect
<a name="troubleshoot-direct-connect"></a>

If the client uses Direct Connect, see [Troubleshooting Direct Connect](https://docs.aws.amazon.com/directconnect/latest/UserGuide/Troubleshooting.html).

If the previous troubleshooting guidance doesn't resolve the issue, ensure that no firewall is blocking network traffic. For further debugging, use tools like `tcpdump` and `Wireshark` to analyze traffic and to make sure that it is reaching the MSK cluster.

## Failed authentication: Too many connects
<a name="troubleshoot-too-many-connects"></a>

The `Failed authentication ... Too many connects` error indicates that a broker is protecting itself because one or more IAM clients are trying to connect to it at an aggressive rate. To help brokers accept a higher rate of new IAM connections, you can increase the [reconnect.backoff.ms](https://kafka.apache.org/documentation/#producerconfigs_reconnect.backoff.ms) configuration parameter.

To learn more about the rate limits for new connections per broker, see the [Amazon MSK quota](limits.md) page.
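A hedged sketch of the client-side backoff settings; the values are illustrative starting points:

```
cat >> client.properties <<'EOF'
# Wait at least 1 second before reconnecting to a broker...
reconnect.backoff.ms=1000
# ...and back off exponentially up to 10 seconds on repeated failures.
reconnect.backoff.max.ms=10000
EOF
```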

## Failed authentication: Session too short
<a name="troubleshoot-session-too-short"></a>

The `Failed authentication ... Session too short` error occurs when your client tries to connect to a cluster using IAM credentials that are about to expire. Check how your IAM credentials are being refreshed. Most likely, they are being replaced too close to session expiry, which causes issues on the server side and authentication failures. Refresh credentials well before they expire.

## MSK Serverless: Cluster creation fails
<a name="troubleshoot-serverless-create-cluster-failure"></a>

If you try to create an MSK Serverless cluster and the workflow fails, you may not have permission to create a VPC endpoint. Verify that your administrator has granted you permission to create a VPC endpoint by allowing the `ec2:CreateVpcEndpoint` action. 

For a complete list of permissions required to perform all Amazon MSK actions, see [AWS managed policy: AmazonMSKFullAccess](security-iam-awsmanpol-AmazonMSKFullAccess.md).

## Can’t update KafkaVersionsList in MSK configuration
<a name="troubleshoot-kafkaversionslist-cfn-update-failure"></a>

When you update the [KafkaVersionsList](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-configuration.html#cfn-msk-configuration-kafkaversionslist) property in the [AWS::MSK::Configuration](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-configuration.html) resource, the update fails with the following error.

```
Resource of type 'AWS::MSK::Configuration' with identifier '<identifierName>' already exists.
```

When you update the `KafkaVersionsList` property, AWS CloudFormation creates a new configuration with the updated property before deleting the old configuration. The CloudFormation stack update fails because the new configuration uses the same name as the existing configuration. Such an update requires a [resource replacement](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-update-behaviors.html#update-replacement). To successfully update `KafkaVersionsList`, you must also update the [Name](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-configuration.html#cfn-msk-configuration-name) property in the same operation.

In addition, if your configuration is attached to any clusters created using the AWS Management Console or AWS CLI, add the following to your configuration resource to prevent [failed resource deletion attempts](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/troubleshooting.html#troubleshooting-errors-resource-removed-not-deleted).

```
UpdateReplacePolicy: Retain
```
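Putting the pieces together, a minimal sketch of the updated resource might look like the following. The logical ID, configuration names, version list, and server properties are all hypothetical; adapt them to your template.

```
MskConfiguration:
  Type: AWS::MSK::Configuration
  # Keep the old configuration instead of attempting to delete it while attached.
  UpdateReplacePolicy: Retain
  Properties:
    # Changed from my-msk-config-v1 so the replacement doesn't collide on name.
    Name: my-msk-config-v2
    KafkaVersionsList:
      - "3.6.0"
    ServerProperties: |
      auto.create.topics.enable=false
```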

After the update succeeds, go to the Amazon MSK console and delete the old configuration. For information about MSK configurations, see [Amazon MSK Provisioned configuration](msk-configuration.md).