

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 步骤 4：在 Amazon MSK 集群中创建主题
<a name="create-topic"></a>

在 “[开始使用 Amazon MSK](getting-started.md)” 的这一步骤中，您可以使用以下两种方法之一创建主题：在 CreateTopic API 中使用原生 AWS 工具，或者在客户端计算机上使用 Apache Kafka AdminClient 工具。

**警告**  
在 CreateTopic API 中使用 AWS 工具时，请验证您的集群是否符合要求。有关详细信息，请参阅[使用主题的要求 APIs](https://docs.aws.amazon.com/msk/latest/developerguide/msk-topic-operations-information.html#topic-operations-requirements)。

**警告**  
使用该 AdminClient 方法时，本教程中使用的 Apache Kafka 版本号仅为示例。建议您使用与 MSK 集群版本相同的客户端版本。客户端版本较旧可能会缺少某些功能和关键错误修复。

**Topics**
+ [使用 AWS 工具创建主题](#create-topic-aws-tools)
+ [确定 MSK 集群版本](#find-msk-cluster-version)
+ [在客户端计算机上创建主题](#create-topic-client-machine)

## 使用 AWS 工具创建主题
<a name="create-topic-aws-tools"></a>

您可以使用 AWS CLI 或 AWS 管理控制台等 AWS 工具在 MSK 集群中创建主题。 AWS SDKs这种方法提供了一种无需直接访问 Kafka 客户端工具即可管理主题的简化方法。

有关使用这些 AWS 工具创建主题的详细信息，请参阅 [CreateTopic API 开发者指南](https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-topic.html)。

## 确定 MSK 集群版本
<a name="find-msk-cluster-version"></a>

1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

1. 在导航栏上，选择您在其中创建 MSK 集群的区域。

1. 选择 MSK 集群。

1. 请注意集群上所用 Apache Kafka 的版本。

1. 将本教程中的 Amazon MSK 版本号实例替换为在步骤 3 中获得的版本。

## 在客户端计算机上创建主题
<a name="create-topic-client-machine"></a>

1. **连接到客户端计算机。**

   1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

   1. 在导航窗格中，选择 **Instances (实例)**。然后选中您在 [步骤 3：创建客户端计算机](create-client-machine.md) 中创建的客户端计算机名称旁边的复选框。

   1. 选择 **Actions (操作)**，然后选择 **Connect (连接)**。按照控制台中的说明，连接到您的客户端计算机。

1. **安装 Java 并设置 Kafka 版本环境变量。**

   1. 通过运行以下命令在客户端计算机上安装 Java。

      ```
      sudo yum -y install java-11
      ```

   1. 将 MSK 集群的 [Kafka 版本](#find-msk-cluster-version)存储在环境变量 `KAFKA_VERSION` 中，如以下命令所示。整个设置过程都需要此信息。

      ```
      export KAFKA_VERSION={KAFKA VERSION}
      ```

      例如，如果您正在使用版本 3.6.0，则使用以下命令：

      ```
      export KAFKA_VERSION=3.6.0
      ```

1. **下载并提取 Apache Kafka。**

   1. 运行以下命令以下载 Apache Kafka。

      ```
      wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      ```
**注意**  
下表列出了一些替代的 Kafka 下载信息，如果遇到任何问题，可以使用这些信息。  
如果遇到连接问题或想要使用镜像站点，可尝试使用 Apache 镜像选择器，如以下命令所示。  

        ```
        wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
        ```
直接从 [Apache Kafka 网站](https://kafka.apache.org/downloads)下载相应的版本。

   1. 在上一步中将 TAR 文件下载到的目录中运行以下命令。

      ```
      tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
      ```

   1. 将新创建目录的完整路径存储在环境变量 `KAFKA_ROOT` 中。

      ```
      export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
      ```

1. **为 MSK 集群设置身份验证。**

   1. [查找最新版本的](https://github.com/aws/aws-msk-iam-auth/releases/latest) Amazon MSK IAM 客户端库。此库支持客户端计算机使用 IAM 身份验证来访问 MSK 集群。

   1. 使用以下命令，导航至 `$KAFKA_ROOT/libs` 目录，并下载在上一步中找到的相关 Amazon MSK IAM JAR。请务必*\$1LATEST VERSION\$1*替换为您正在下载的实际版本号。

      ```
      cd $KAFKA_ROOT/libs
      ```

      ```
      wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar
      ```
**注意**  
在运行任何与 MSK 集群交互的 Kafka 命令之前，可能需要将 Amazon MSK IAM JAR 文件添加至 Java 类路径中。按下例所示设置 `CLASSPATH` 环境变量。  

      ```
      export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar
      ```
这将为整个会话设置 `CLASSPATH`，从而使 JAR 可供后续所有 Kafka 命令使用。

   1. 转到 `$KAFKA_ROOT/config` 目录以创建客户端配置文件。

      ```
      cd $KAFKA_ROOT/config
      ```

   1. 复制以下属性设置并将其粘贴到新文件中。将该文件保存为 **client.properties**。

      ```
      security.protocol=SASL_SSL
      sasl.mechanism=AWS_MSK_IAM
      sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
      sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
      ```

1. **（可选）调整 Kafka 工具的 Java 堆大小。**

   如果遇到任何内存相关问题，或正在处理大量的主题或分区，则可调整 Java 堆大小。为此，需在运行 Kafka 命令之前设置 `KAFKA_HEAP_OPTS` 环境变量。

   以下示例将最大堆大小和初始堆大小均设为 512 兆字节。根据具体要求和可用的系统资源调整这些值。

   ```
   export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
   ```

1. **获取集群连接信息。**

   1. 在 [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) 打开 Amazon MSK 控制台。

   1. 等待集群的状态变为**活动**。这可能需要花几分钟的时间。在状态变为**活动**后，选择集群名称。这会将您引导至包含集群摘要的页面。

   1. 选择**查看客户端信息**。

   1. 复制私有端点的连接字符串。

      您将为每个代理获得三个端点。将其中一个连接字符串存储在环境变量 `BOOTSTRAP_SERVER` 中，如以下命令所示。*<bootstrap-server-string>*替换为连接字符串的实际值。

      ```
      export BOOTSTRAP_SERVER=<bootstrap-server-string>
      ```

1. **运行以下命令以创建主题。**

   ```
   $KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic
   ```

   如果 `client.properties` 文件出现 `NoSuchFileException`，请确保该文件存在于 Kafka bin 目录中的当前工作目录。
**注意**  
如果不想为整个会话设置环境变量 `CLASSPATH`，也可以在各个 Kafka 命令前添加 `CLASSPATH` 变量作为前缀。这种方法仅将类路径应用于特定命令。  

   ```
   CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \
   $KAFKA_ROOT/bin/kafka-topics.sh --create \
   --bootstrap-server $BOOTSTRAP_SERVER \
   --command-config $KAFKA_ROOT/config/client.properties \
   --replication-factor 3 \
   --partitions 1 \
   --topic MSKTutorialTopic
   ```

1. **（可选）验证是否已成功创建主题：**

   1. 如果此命令成功，您应该看到以下消息：`Created topic MSKTutorialTopic.`

   1. 列出所有主题以确认您的主题存在。

      ```
      $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties
      ```

   如果命令失败或遇到错误，请参阅[排查 Amazon MSK 集群的问题](troubleshooting.md)获取疑难解答信息。

1. **（可选）删除在本教程中使用的环境变量。**

   如果想在本教程中的后续步骤中保留环境变量，请跳过这一步。或者，也可以取消设置这些变量，如下例所示。

   ```
   unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS
   ```

**下一步**

[步骤 5：生成和使用数据](produce-consume.md)