

# Step 4: Create a topic in the Amazon MSK cluster
<a name="create-topic"></a>

In this step of [Getting Started Using Amazon MSK](getting-started.md), you can create a topic using one of two approaches: using native AWS tools with the CreateTopic API, or using Apache Kafka AdminClient tools on a client machine.

**Warning**  
When using AWS tools with the CreateTopic API, verify that your cluster meets the requirements. For details, see [Requirements for using topic APIs](https://docs.aws.amazon.com/msk/latest/developerguide/msk-topic-operations-information.html#topic-operations-requirements).

**Warning**  
When using the AdminClient approach, Apache Kafka version numbers used in this tutorial are examples only. We recommend that you use the same version of the client as your MSK cluster version. An older client version might be missing certain features and critical bug fixes.

**Topics**
+ [Creating a topic using AWS tools](#create-topic-aws-tools)
+ [Determining your MSK cluster version](#find-msk-cluster-version)
+ [Creating a topic on the client machine](#create-topic-client-machine)

## Creating a topic using AWS tools
<a name="create-topic-aws-tools"></a>

You can create topics in your MSK cluster using AWS tools such as the AWS CLI, AWS SDKs, or the AWS Management Console. This approach provides a streamlined way to manage topics without requiring direct access to Kafka client tools.

For detailed information about creating topics using the AWS tools, see the [CreateTopic API developer guide](https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-topic.html).

## Determining your MSK cluster version
<a name="find-msk-cluster-version"></a>

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the navigation bar, choose the Region where you created the MSK cluster.

1. Choose the MSK cluster.

1. Note the version of Apache Kafka used on the cluster.

1. Replace instances of Apache Kafka version numbers in this tutorial with the version that you noted in the previous step.
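
If the AWS CLI is installed and configured on your machine, you can also look up the version from a terminal instead of the console. The following is a minimal sketch, not part of the official procedure. It assumes a provisioned cluster, and the `--query` path assumes the `DescribeCluster` response shape (`ClusterInfo.CurrentBrokerSoftwareInfo.KafkaVersion`). The function name `msk_kafka_version` is hypothetical.

```shell
# Hypothetical helper: print the Apache Kafka version of a provisioned MSK cluster.
# Assumes the AWS CLI is installed and allowed to call kafka:DescribeCluster.
msk_kafka_version() {
  cluster_arn="$1"
  if [ -z "$cluster_arn" ]; then
    echo "usage: msk_kafka_version <cluster-arn>" >&2
    return 2
  fi
  aws kafka describe-cluster \
    --cluster-arn "$cluster_arn" \
    --query 'ClusterInfo.CurrentBrokerSoftwareInfo.KafkaVersion' \
    --output text
}
```

To use it, pass the ARN of your own cluster as the only argument.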

## Creating a topic on the client machine
<a name="create-topic-client-machine"></a>

1. **Connect to your client machine.**

   1. Open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

   1. In the navigation pane, choose **Instances**. Then, select the check box beside the name of the client machine that you created in [Step 3: Create a client machine](create-client-machine.md).

   1. Choose **Actions**, and then choose **Connect**. Follow the instructions in the console to connect to your client machine.

1. **Install Java and set up the Kafka version environment variable.**

   1. Install Java on the client machine by running the following command.

      ```
      sudo yum -y install java-11
      ```

   1. Store the [Kafka version](#find-msk-cluster-version) of your MSK cluster in the environment variable, `KAFKA_VERSION`, as shown in the following command. You'll need this information throughout the setup.

      ```
      export KAFKA_VERSION={KAFKA VERSION}
      ```

      For example, if you're using version 3.6.0, use the following command.

      ```
      export KAFKA_VERSION=3.6.0
      ```
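
   The download URL in the next step is built from `KAFKA_VERSION`, so the value needs to be a concrete release number. The following sketch, which isn't part of the official procedure, checks that the value has the form MAJOR.MINOR.PATCH. The function name `is_release_version` is hypothetical. A label such as `3.7.x` is used here only as an illustration of a value that wouldn't pass the check.

   ```shell
   # Hypothetical helper: succeed only if the argument looks like MAJOR.MINOR.PATCH.
   is_release_version() {
     printf '%s\n' "$1" | grep -Eq '^[0-9]+\.[0-9]+\.[0-9]+$'
   }

   # Report whether the current value of KAFKA_VERSION passes the check.
   if is_release_version "${KAFKA_VERSION:-}"; then
     echo "KAFKA_VERSION looks like a release number: $KAFKA_VERSION"
   else
     echo "KAFKA_VERSION is unset or isn't a three-part release number" >&2
   fi
   ```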

1. **Download and extract Apache Kafka.**

   1. Run the following command to download Apache Kafka. 

      ```
      wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      ```

      **Note**  
      If you encounter issues with the download, you can use one of the following alternatives.
      + If you encounter connectivity issues or want to use a mirror site, try using the Apache mirror selector, as shown in the following command.

        ```
        wget "https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
        ```
      + Download an appropriate version directly from the [Apache Kafka website](https://kafka.apache.org/downloads).

   1. Run the following command in the directory where you downloaded the TAR file in the previous step.

      ```
      tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
      ```

   1. Store the full path to the newly created directory inside the `KAFKA_ROOT` environment variable.

      ```
      export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
      ```
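
   Because `KAFKA_ROOT` depends on the directory where you ran the previous commands, you might want to confirm that it points at the extracted files. The following sketch checks for the `kafka-topics.sh` script that this tutorial runs later. The function name `is_kafka_root` is hypothetical.

   ```shell
   # Hypothetical helper: succeed only if the directory contains an executable bin/kafka-topics.sh.
   is_kafka_root() {
     [ -n "$1" ] && [ -x "$1/bin/kafka-topics.sh" ]
   }

   # Report whether the current value of KAFKA_ROOT passes the check.
   if is_kafka_root "${KAFKA_ROOT:-}"; then
     echo "Kafka tools found in $KAFKA_ROOT"
   else
     echo "KAFKA_ROOT is unset or doesn't contain bin/kafka-topics.sh" >&2
   fi
   ```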

1. **Set up authentication for your MSK cluster.**

   1. [Find the latest version](https://github.com/aws/aws-msk-iam-auth/releases/latest) of the Amazon MSK IAM client library. This library allows your client machine to access the MSK cluster using IAM authentication.

   1. Using the following commands, navigate to the `$KAFKA_ROOT/libs` directory and download the associated Amazon MSK IAM JAR that you found in the previous step. Make sure to replace *\{LATEST VERSION\}* with the actual version number you're downloading.

      ```
      cd $KAFKA_ROOT/libs
      ```

      ```
      wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar
      ```

      **Note**  
      Before running any Kafka commands that interact with your MSK cluster, you might need to add the Amazon MSK IAM JAR file to your Java classpath. Set the `CLASSPATH` environment variable, as shown in the following example.

      ```
      export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar
      ```

      This sets the `CLASSPATH` for your entire session, making the JAR available to all subsequent Kafka commands.

   1. Go to the `$KAFKA_ROOT/config` directory to create the client configuration file.

      ```
      cd $KAFKA_ROOT/config
      ```

   1. Copy the following property settings and paste them into a new file. Save the file as **client.properties**.

      ```
      security.protocol=SASL_SSL
      sasl.mechanism=AWS_MSK_IAM
      sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
      sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
      ```
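
   The authentication setup can also be scripted. The following is a minimal sketch under two assumptions: the IAM JAR is located by file name pattern so that you don't need to retype its version, and the client properties are written from the shell instead of pasted into an editor. The function names `find_msk_iam_jar` and `write_client_properties` are hypothetical. The property values are the same four settings shown in this step.

   ```shell
   # Hypothetical helper: print the path of the MSK IAM JAR in a libs directory.
   # Fails if no file matches. If several match, the last one in glob order is printed.
   find_msk_iam_jar() {
     libs_dir="$1"
     found=""
     for jar in "$libs_dir"/aws-msk-iam-auth-*-all.jar; do
       [ -f "$jar" ] && found="$jar"
     done
     [ -n "$found" ] && printf '%s\n' "$found"
   }

   # Hypothetical helper: write the four IAM client properties to <dir>/client.properties.
   write_client_properties() {
     config_dir="$1"
     mkdir -p "$config_dir" || return 1
     printf '%s\n' \
       'security.protocol=SASL_SSL' \
       'sasl.mechanism=AWS_MSK_IAM' \
       'sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;' \
       'sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler' \
       > "$config_dir/client.properties"
   }

   # Apply both helpers only when KAFKA_ROOT is set.
   if [ -n "${KAFKA_ROOT:-}" ]; then
     if jar_path=$(find_msk_iam_jar "$KAFKA_ROOT/libs"); then
       export CLASSPATH="$jar_path"
     fi
     write_client_properties "$KAFKA_ROOT/config"
   fi
   ```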

1. **(Optional) Adjust the Java heap size for Kafka tools.**

   If you encounter any memory-related issues or you're working with a large number of topics or partitions, you can adjust the Java heap size. To do this, set the `KAFKA_HEAP_OPTS` environment variable before running Kafka commands.

   The following example sets both the maximum and initial heap size to 512 megabytes. Adjust these values according to your specific requirements and available system resources.

   ```
   export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
   ```

1. **Get your cluster connection information.**

   1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

   1. Wait for the status of your cluster to become **Active**. This might take several minutes. After the status becomes **Active**, choose the cluster name. This takes you to a page containing the cluster summary.

   1. Choose **View client information**.

   1. Copy the connection string for the private endpoint.

      You'll get three endpoints, one for each of the brokers. Store one of these connection strings in the environment variable `BOOTSTRAP_SERVER`, as shown in the following command. Replace *<bootstrap-server-string>* with the actual value of the connection string.

      ```
      export BOOTSTRAP_SERVER=<bootstrap-server-string>
      ```
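
   The value of `BOOTSTRAP_SERVER` is one or more `host:port` entries separated by commas. The following sketch checks that shape before you use the value. It assumes port 9098, which is the port that Amazon MSK documents for IAM access from within the VPC, so adjust the port if your setup differs. The function name `check_bootstrap_servers` is hypothetical.

   ```shell
   # Hypothetical helper: succeed only if every comma-separated entry has the form host:9098.
   # Port 9098 is an assumption (IAM access from within the VPC).
   check_bootstrap_servers() {
     [ -n "$1" ] || return 1
     # Split on commas, then fail if any entry doesn't match host:9098.
     if printf '%s\n' "$1" | tr ',' '\n' | grep -Evq '^[A-Za-z0-9.-]+:9098$'; then
       return 1
     fi
     return 0
   }
   ```

   For example, `check_bootstrap_servers "$BOOTSTRAP_SERVER" && echo "looks valid"` prints the message only when every entry passes the check.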

1. **Run the following command to create the topic.**

   ```
   $KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic
   ```

   If you get a `NoSuchFileException` for the `client.properties` file, make sure that this file exists in the `$KAFKA_ROOT/config` directory and that the `KAFKA_ROOT` environment variable is set in your current session.

   **Note**  
   If you prefer not to set the `CLASSPATH` environment variable for your entire session, you can alternatively prefix each Kafka command with the `CLASSPATH` variable. This approach applies the classpath only to that specific command.  

   ```
   CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \
   $KAFKA_ROOT/bin/kafka-topics.sh --create \
   --bootstrap-server $BOOTSTRAP_SERVER \
   --command-config $KAFKA_ROOT/config/client.properties \
   --replication-factor 3 \
   --partitions 1 \
   --topic MSKTutorialTopic
   ```
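
   Running the create command a second time fails if the topic already exists. The following sketch wraps the command in a function and adds the `--if-not-exists` option of `kafka-topics.sh`, so that rerunning it is harmless. The function name `create_topic_if_missing` and the `KAFKA_TOPICS_CMD` override are assumptions introduced for this example. The override exists only so that you can dry-run the function.

   ```shell
   # Hypothetical helper: create a topic with the tutorial's settings unless it already exists.
   # KAFKA_TOPICS_CMD (an assumption for this sketch) replaces the real tool for dry runs.
   create_topic_if_missing() {
     topic="$1"
     "${KAFKA_TOPICS_CMD:-$KAFKA_ROOT/bin/kafka-topics.sh}" --create --if-not-exists \
       --bootstrap-server "$BOOTSTRAP_SERVER" \
       --command-config "$KAFKA_ROOT/config/client.properties" \
       --replication-factor 3 \
       --partitions 1 \
       --topic "$topic"
   }
   ```

   If you set `KAFKA_TOPICS_CMD` to `echo` before calling `create_topic_if_missing MSKTutorialTopic`, the function prints the arguments instead of contacting the cluster.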

1. **(Optional) Verify that the topic was created successfully.**

   1. If the command succeeds, you should see the following message: `Created topic MSKTutorialTopic.`

   1. List all topics to confirm your topic exists.

      ```
      $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties
      ```

   If the command is unsuccessful or you run into an error, see [Troubleshoot your Amazon MSK cluster](troubleshooting.md) for troubleshooting information.
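
   The `--list` output contains one topic name per line, so you can script the check. The following is a minimal sketch with a hypothetical function name, `topic_in_list`. It reads topic names on standard input and succeeds only on an exact match.

   ```shell
   # Hypothetical helper: read topic names (one per line) on stdin and
   # succeed only if one line exactly equals the given name.
   topic_in_list() {
     grep -Fxq -- "$1"
   }

   # Usage (requires a reachable cluster):
   #   $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server "$BOOTSTRAP_SERVER" \
   #     --command-config "$KAFKA_ROOT/config/client.properties" | topic_in_list MSKTutorialTopic
   ```

   The exact match matters because a substring search would also accept a longer topic name that merely starts with `MSKTutorialTopic`.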

1. **(Optional) Delete the environment variables you used in this tutorial.**

   If you want to keep your environment variables for the next steps in this tutorial, skip this step. Otherwise, you can unset these variables, as shown in the following example.

   ```
   unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS
   ```
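
If you keep the variables, you might want to confirm that they're still set before you continue, for example after you reconnect to the client machine. The following sketch reports any variable that is unset or empty. The function name `require_vars` is hypothetical, and the choice of which variables to check is an assumption.

```shell
# Hypothetical helper: report every named variable that is unset or empty.
# Returns non-zero if at least one is missing.
require_vars() {
  missing=0
  for name in "$@"; do
    # Look up the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

require_vars KAFKA_ROOT BOOTSTRAP_SERVER || echo "Set the missing variables before you continue." >&2
```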

**Next Step**

[Step 5: Produce and consume data](produce-consume.md)