

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 什么是 Amazon MSK 复制器？
<a name="msk-replicator"></a>

Amazon MSK 复制器是一项 Amazon MSK 功能，它使您能够在不同或相同 AWS 区域的 Amazon MSK 集群之间可靠地复制数据。但源和目标集群必须位于同一个 AWS 账户中。借助 MSK 复制器，您可以轻松构建具有区域弹性的流媒体应用程序，以提高可用性和业务连续性。MSK 复制器可在 MSK 集群之间提供自动异步复制，无需编写自定义代码、管理基础设施或设置跨区域网络。

MSK 复制器会自动扩缩底层资源，这样您就可以按需复制数据，而无需监控或扩展容量。MSK Replicator 还会复制必要的 Kafka 元数据，包括主题配置、访问控制列表 (ACLs) 和使用者组偏移量。如果某个区域发生意外事件，您可以故障转移到另一个 AWS 区域并无缝地恢复处理。

MSK 复制器支持跨区域复制（CRR）和同区域复制（SRR）。在跨区域复制中，源和目标 MSK 集群位于不同的 AWS 区域。在同区域复制中，源 MSK 集群和目标 MSK 集群都在同一个 AWS 区域中。在将源和目标 MSK 集群与 MSK 复制器一起使用之前，您需要创建源集群和目标 MSK 集群。

**注意**  
MSK Replicator 支持以下 AWS 区域：美国东部（us-east-1，弗吉尼亚北部）；美国东部（us-east-2，俄亥俄州）；美国西部（us-west-2，俄勒冈）；欧洲（eu-west-1，爱尔兰）；欧洲（eu-central-1，法兰克福）；亚太地区（ap-southeast-1，爱尔兰）；欧洲（eu-central-1，法兰克福）apst-1，新加坡）；亚太地区（ap-southeast-2，悉尼）；欧洲（eu-north-1，斯德哥尔摩）；亚太地区（ap-southeast-1，孟买）；欧洲（eu-west-3，巴黎）；南美（sa-east-1，圣保罗）；亚太地区（ap-northeast-2，首尔）；欧洲（eu-west-2，伦敦）；亚太地区（ap-northeast-1，东京）；美国西部（us-west-1，加利福尼亚北部）；加拿大（ca-central-1，中部）；中国（北京）（cn-northeast-1）；中国（宁夏）（cn-northwest-1）；亚太地区（海得拉巴）（ap-southeast-2）、亚太地区（马来西亚）（ap-southeast-5）、亚太地区（泰国）（ap-southeast-7）、墨西哥（中部）(mx-central-1)、亚太地区（台北）（ap-east-2）、加拿大西部（卡尔加里）（ca-west-1）、欧洲（西班牙）（eu-southeast-2）、中东（巴林）（me-southeast-1）、亚太地区（雅加达）（ap-southeast-3）、非洲（开普省）城镇）（af-southeast-1）、中东（阿联酋）（me-central-1）、亚太地区（香港）（ap-east-1）、亚太地区（大阪）（ap-northeast-3）、亚洲太平洋（墨尔本）（ap-southeast-4）、欧洲（米兰）（eu-southeast-1）、以色列（特拉维夫）（il-central-1）、欧洲（苏黎世）（eu-central-2）和亚太地区（新西兰）（ap-southeast-6）

以下是 Amazon MSK 复制器的一些常见用法。
+ 构建多区域流媒体应用程序：无需设置自定义解决方案即可构建高度可用且具有容错能力的流媒体应用程序，以提高弹性。
+ 更低延迟的数据访问：为不同地理区域的使用器提供更低延迟的数据访问。
+ 将数据分发给您的合作伙伴：将数据从一个 Apache Kafka 集群复制到多个 Apache Kafka 集群，这样不同的集群就 teams/partners 有自己的数据副本。
+ 聚合数据进行分析：将来自多个 Apache Kafka 集群的数据复制到一个集群中，以便轻松生成有关聚合实时数据的见解。
+ 本地写入，全局访问您的数据：设置多活复制，自动将在一个 AWS 区域执行的写入操作传播到其他区域，从而以更低的延迟和成本提供数据。

# Amazon MSK 复制器的工作原理
<a name="msk-replicator-how-it-works"></a>

要开始使用 MSK Replicator，你需要在目标集群的区域中创建一个新的复制器。 AWS *MSK Replicator 会自动将主 AWS 区域中名为*源的*集群中的所有数据复制到目标区域中名为目标的集群。*源集群和目标集群可以位于相同或不同的 AWS 区域。如果目标集群尚不存在，则需要创建该集群。

当您创建 Replicator 时，MSK Replicator 会在目标集群的 AWS 区域中部署所有必需的资源，以优化数据复制延迟。复制延迟因许多因素而异，包括 MSK 集群 AWS 区域之间的网络距离、源集群和目标集群的吞吐容量以及源集群和目标集群上的分区数量。MSK 复制器会自动扩缩底层资源，这样您就可以按需复制数据，而无需监控或扩展容量。

## 数据复制
<a name="msk-replicator-data-replication"></a>

默认情况下，MSK 复制器将所有数据从源集群主题分区的最新偏移异步复制到目标集群。如果“检测和复制新的主题”设置已开启，则 MSK 复制器会自动检测新主题或主题分区并将其复制到目标集群。但是，复制器可能需要长达 30 秒的时间才能在目标集群上检测并创建新的主题或主题分区。在目标集群上创建主题之前，向源主题生成的任何消息都不会被复制。或者，如果想将主题上的现有消息复制到目标集群，则可以[在创建期间配置复制器](msk-replicator-prepare-cluster.md)，以从源集群主题分区中最早的偏移开始复制。

MSK 复制器不存储您的数据。数据从源集群使用，在内存中缓冲，然后写入目标集群。当数据成功写入或重试失败后，缓冲区会自动清除。MSK 复制器与您的集群之间的所有通信和数据始终在传输过程中加密。所有 MSK Replicator API 调用（例如`DescribeClusterV2`，`CreateTopic`，）`DescribeTopicDynamicConfiguration`都将在中捕获。 AWS CloudTrail您的 MSK 代理日志也将反映相同的内容。

MSK 复制器在目标集群中创建主题，复制器因子为 3。如果需要，您可以直接在目标集群上修改复制因子。

## 元数据复制
<a name="msk-replicator-metadata-replication"></a>

MSK 复制器还支持将元数据从源集群复制到目标集群。元数据包括主题配置、访问控制列表 (ACLs) 和使用者组偏移量。与数据复制一样，元数据复制也是异步发生的。为了获得更好的性能，MSK 复制器优先进行数据复制而不是元数据复制。

下表是 MSK Replicator 复制的访问控制列表 (ACLs) 的列表。


| 操作 | 研究 | APIs 允许 | 
| --- | --- | --- | 
|  更改  |  Topic  |  CreatePartitions  | 
|  AlterConfigs  |  Topic  |  AlterConfigs  | 
|  Create  |  Topic  |  CreateTopics，元数据  | 
|  删除  |  Topic  |  DeleteRecords, DeleteTopics  | 
|  描述  |  Topic  |  ListOffsets、元数据、 OffsetFetch、 OffsetForLeaderEpoch  | 
|  DescribeConfigs  |  Topic  |  DescribeConfigs  | 
|  读取  |  Topic  |  获取， OffsetCommit， TxnOffsetCommit  | 
|  写入（仅拒绝）  |  Topic  |  生产， AddPartitionsToTxn  | 

MSK Replicator ACLs 仅为资源类型 Topic 复制文字模式类型。不复制前缀模式类型 ACLs 和其他 ACLs 资源类型。MSK Replicator 也不会在目标集群 ACLs 上删除。如果删除源集群上的 ACL，则还应同时删除目标集群上的 ACL。有关 Kafka ACLs 资源、模式和操作的更多详细信息，请参阅 [https://kafka.apache.org/documentation/\$1security\$1authz\$1cli](https://kafka.apache.org/documentation/#security_authz_cli)。

MSK Replicator 仅复制 Kafka ACLs，而 IAM 访问控制不使用它。如果您的客户使用 IAM 访问控制 read/write 来访问您的 MSK 集群，则还需要在目标集群上配置相关的 IAM 策略以实现无缝故障转移。带前缀和相同主题名称复制配置也是如此。

作为消费者组偏移量同步的一部分，MSK 复制器会针对源集群上的消费者进行优化，这些消费者从更靠近流末端的位置（主题分区的末尾）进行读取。如果您的消费者组在源集群上出现延迟，那么与源相比，您可能会看到目标上的消费者组的延迟更高。这意味着在失效转移到目标集群后，您的消费者将重新处理更多重复消息。为了减少此延迟后，源集群上的消费者需要赶上进度并从流末端（主题分区的末尾）开始消耗。当您的消费者赶上时，MSK 复制器将自动减少延迟。

![\[MSK 复制器源集群和目标集群\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/images/msk-replicator-diagram.png)


## 主题名称配置
<a name="msk-replicator-topic-name-configuration"></a>

MSK 复制器有两种主题名称配置模式：*带前缀*（默认）或*相同*主题名称复制。

**带前缀主题名称复制**  
默认情况下，MSK 复制器在目标集群中创建新主题，并在源集群主题名称中添加自动生成的前缀，例如 `<sourceKafkaClusterAlias>.topic`。这是为了将复制的主题与目标集群中的其他主题区分开来，并避免集群之间循环复制数据。

例如，MSK Replicator 将名为 “主题” 的主题中的数据从源集群复制到目标集群中名为 < Alias>.Topic 的新主题。sourceKafkaCluster您可以使用 `DescribeReplicator` API 或 MSK 控制台上的 **Replicator** 详细信息页面在 “**sourceKafkaCluster别名**” 字段下找到将添加到目标集群中主题名称的前缀。目标集群中的前缀是 < sourceKafkaCluster Alias>。

为确保您的消费者能够可靠地从备用集群重新启动处理，您需要将消费者配置为使用通配符运算符 `.*` 从主题中读取数据。例如，您的消费者需要使用消费。 `*topic1`在两个 AWS 地区。此示例还将包括 `footopic1` 之类的主题，因此请根据需要调整通配符运算符。

如果要将复制器数据保存在目标集群的单独主题中（例如主动-主动集群设置），则应使用 MSK 复制器，它会添加前缀。

**相同主题名称复制**  
作为默认设置的替代方案，Amazon MSK 复制器允许您在主题复制设置为相同主题名称复制的情况下创建复制器（控制台中为**保留相同的主题名称**）。您可以在拥有目标 MSK 集群的 AWS 区域中创建新的复制器。同名的复制主题可以避免避免重新配置客户端来读取复制的主题。

相同主题名称复制（控制台中为**保留相同的主题名称**）具有以下优点：
+ 允许您在复制过程中保留相同的主题名称，同时自动避免无限复制循环的风险。
+ 使设置和操作多集群流架构变得更简单，因为您可以避免重新配置客户端来读取复制的主题。
+ 对于主动-被动集群架构，相同主题名称复制功能还简化了失效转移过程，允许应用程序无缝失效转移到备用集群，而无需更改任何主题名称或重新配置客户端。
+ 可用于更轻松地将来自多个 MSK 集群的数据整合到单个集群中，以进行数据聚合或集中分析。这要求您为每个源集群和相同目标集群创建单独的复制器。
+ 通过将数据复制到目标集群中同名主题，可以简化从一个 MSK 集群到另一个 MSK 集群的数据迁移。

Amazon MSK 复制器使用 Kafka 标头自动避免将数据复制回其来源主题，从而消除复制期间无限循环的风险。标头是一个键值对，可以包含每个 Kafka 消息中的键、值和时间戳。MSK 复制器将源集群和主题的标识符嵌入到每个正在复制的记录的头中。MSK 复制器使用标头信息来避免无限复制循环。您应该验证您的客户端是否能够按预期读取复制的数据。

# 教程：为 Amazon MSK 复制器设置源集群和目标集群
<a name="msk-replicator-getting-started"></a>

本教程向您展示如何在同一 AWS 区域或不同 AWS 区域中设置源集群和目标集群。然后，您可以使用这些集群创建 Amazon MSK 复制器。

# 准备 Amazon MSK 源集群
<a name="msk-replicator-prepare-cluster"></a>

如果您已经为 MSK 复制器创建了 MSK 源集群，请确保它满足本节中描述的要求。否则，请按照以下步骤创建 MSK 预置或无服务器源集群。

创建跨区域和同区域的 MSK 复制器源集群的过程类似。差异将在以下过程中引用。

1. 在源区域中[开启了 IAM 访问控制](create-iam-access-control-cluster-in-console.md)的情况下创建 MSK 预置集群或无服务器集群。您的源集群必须至少有三个代理。

1. 对于跨区域 MSK 复制器，如果源是预置集群，请在为 IAM 访问控制方案开启多 VPC 私有连接的情况下对其进行配置。请注意，开启多 VPC 时，不支持未经身份验证的身份验证类型。对于其他身份验证方案（mTL），您无需为连接到您的 MSK 集群SASL/SCRAM). You can simultaneously use mTLS or SASL/SCRAM的其他客户端开启多 VPC 私有连接。您可以在控制台集群详细信息**网络设置**中或使用 `UpdateConnectivity` API 配置多 VPC 私有连接。请参阅[集群所有者开启多 VPC](mvpc-cluster-owner-action-turn-on.md)。如果您的源集群是 MSK Serverless 集群，则无需开启多 VPC 私有连接。

   对于同区域的 MSK 复制器，MSK 源集群不需要多 VPC 私有连接，并且其他客户端仍然可以使用未经身份验证的身份验证类型访问该集群。

1. 对于跨区域 MSK 复制器，您必须将基于资源的权限策略附加到源集群。这允许 MSK 连接到此集群以复制数据。您可以使用下面的 CLI 或 AWS 控制台程序执行此操作。另请参阅 [Amazon MSK 基于资源的策略](security_iam_service-with-iam.md)。对于同区域的 MSK 复制器，您不需要执行此步骤。

------
#### [ Console: create resource policy ]

使用以下 JSON 更新源集群策略。将占位符替换为源集群的 ARN。

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
    {
        "Effect": "Allow",
        "Principal": {
            "Service": [
                "kafka.amazonaws.com"
            ]
        },
        "Action": [
            "kafka:CreateVpcConnection",
            "kafka:GetBootstrapBrokers",
            "kafka:DescribeClusterV2"
        ],
        "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-5678-90ab-cdef-1234567890ab-1"
    }
  ]
}
```

使用集群详细信息页面上的**操作**菜单下的**编辑集群策略**选项。

![\[在控制台中编辑集群策略\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/images/edit-cluster-policy.png)


------
#### [ CLI: create resource policy ]

注意：如果您使用 AWS 控制台创建源集群并选择创建新 IAM 角色的选项，则会将所需的信任策略 AWS 附加到该角色。另一方面，如果您希望 MSK 使用现有 IAM 角色或您自己创建角色，请将以下信任策略附加到该角色，以便 MSK 复制器可以代入该角色。有关如何修改角色的信任关系的更多信息，请参阅[修改角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html)。

1. 使用此命令获取 MSK 集群策略的当前版本。将占位符替换为实际的集群 ARN。

   ```
   aws kafka get-cluster-policy —cluster-arn <Cluster ARN>
   {
   "CurrentVersion": "K1PA6795UKM GR7",
   "Policy": "..."
   }
   ```

1. 创建基于资源的策略，以允许 MSK 复制器访问您的源集群。使用以下语法作为模板，将占位符替换为实际的源集群 ARN。

   ```
   aws kafka put-cluster-policy --cluster-arn "<sourceClusterARN>" --policy '{
   "Version": "2012-10-17", 		 	 	 
   "Statement": [
   {
   "Effect": "Allow",
   "Principal": {
   "Service": [
   "kafka.amazonaws.com"
   ]
   },
   "Action": [
   "kafka:CreateVpcConnection",
   "kafka:GetBootstrapBrokers",
   "kafka:DescribeClusterV2"
   ],
   "Resource": "<sourceClusterARN>"
   }
   ]
   ```

------

# 准备 Amazon MSK 目标集群
<a name="msk-replicator-prepare-target-cluster"></a>

在开启了 IAM 访问控制的情况下创建 MSK 目标集群（预置或无服务器集群）。目标集群不需要开启多 VPC 私有连接。目标集群可以与源集群位于同一 AWS 区域或不同的区域。源集群和目标集群必须位于同一个 AWS 账户中。您的目标集群必须至少有三个代理。

# 教程：创建 Amazon MSK 复制器
<a name="msk-replicator-create"></a>

在设置源集群和目标集群后，您可以使用这些集群创建 Amazon MSK 复制器。在您创建 Amazon MSK 复制器之前，请确保已拥有 [创建 MSK 复制器所需的 IAM 权限](msk-replicator-requirements.md#replicator-role-permissions-successful)。

**Contents**
+ [创建 Amazon MSK 复制器的注意事项](msk-replicator-requirements.md)
  + [创建 MSK 复制器所需的 IAM 权限](msk-replicator-requirements.md#replicator-role-permissions-successful)
  + [MSK 复制器支持的集群类型和版本](msk-replicator-supported-clusters-versions.md)
  + [支持的 MSK Serverless 集群配置](msk-replicator-serverless-requirements.md)
    + [集群配置更改](msk-replicator-serverless-requirements.md#msk-replicator-config-changes)
+ [在目标集群区域使用 AWS 控制台创建复制器](msk-replicator-create-console.md)
  + [选择源集群](msk-replicator-create-console.md#msk-replicator-create-console-choose-source)
  + [选择目标集群](msk-replicator-create-console.md#msk-replicator-create-console-choose-target)
  + [配置复制器设置和权限](msk-replicator-create-console.md#msk-replicator-create-settings)

# 创建 Amazon MSK 复制器的注意事项
<a name="msk-replicator-requirements"></a>

以下各节简要介绍了使用 MSK 复制器功能的先决条件、支持的配置和最佳实践。它涵盖了必要的权限、集群兼容性和 Serverless 特定的要求，以及有关创建后如何管理复制器的指导。

## 创建 MSK 复制器所需的 IAM 权限
<a name="replicator-role-permissions-successful"></a>

以下是创建 MSK 复制器所需的 IAM policy 示例。只有在创建 MSK 复制器时提供了标签的情况下，才需要执行 `kafka:TagResource` 操作。应将复制器 IAM 策略附加到与您的客户端对应的 IAM 角色。有关创建授权策略的信息，请参阅 [Create authorization policies](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#create-iam-access-control-policies)。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "MSKReplicatorIAMPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/MSKReplicationRole",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "kafka.amazonaws.com"
        }
      }
    },
    {
      "Sid": "MSKReplicatorServiceLinkedRole",
      "Effect": "Allow",
      "Action": "iam:CreateServiceLinkedRole",
      "Resource": "arn:aws:iam::123456789012:role/aws-service-role/kafka.amazonaws.com/AWSServiceRoleForKafka*"
    },
    {
      "Sid": "MSKReplicatorEC2Actions",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeVpcs",
        "ec2:CreateNetworkInterface"
      ],
      "Resource": [
        "arn:aws:ec2:us-east-1:123456789012:subnet/subnet-0abcd1234ef56789",
        "arn:aws:ec2:us-east-1:123456789012:security-group/sg-0123abcd4567ef89",
        "arn:aws:ec2:us-east-1:123456789012:network-interface/eni-0a1b2c3d4e5f67890",
        "arn:aws:ec2:us-east-1:123456789012:vpc/vpc-0a1b2c3d4e5f67890"
      ]
    },
    {
      "Sid": "MSKReplicatorActions",
      "Effect": "Allow",
      "Action": [
        "kafka:CreateReplicator",
        "kafka:TagResource"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/abcd1234-56ef-78gh-90ij-klmnopqrstuv",
        "arn:aws:kafka:us-east-1:123456789012:replicator/myReplicator/wxyz9876-54vu-32ts-10rq-ponmlkjihgfe"
      ]
    }
  ]
}
```

------

以下是描述复制器的示例 IAM policy。需要 `kafka:DescribeReplicator` 操作或 `kafka:ListTagsForResource` 操作之一即可，而不是两者都需要。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "kafka:DescribeReplicator",
                "kafka:ListTagsForResource"
            ],
            "Resource": "*"
        }
    ]
}
```

------

# MSK 复制器支持的集群类型和版本
<a name="msk-replicator-supported-clusters-versions"></a>

这些是对支持的实例类型、Kafka 版本和网络配置的要求。
+ MSK 复制器支持 MSK 预置集群和 MSK Serverless 集群的任意组合，作为源集群和目标集群。MSK 复制器目前不支持其他类型的 Kafka 集群。
+ MSK Serverless 集群需要 IAM 访问控制，不支持 Apache Kafka ACL 复制，并且对主题配置复制的支持有限。请参阅[什么是 MSK Serverless？](serverless.md)。
+ 只有运行 Apache Kafka 2.7.0 或更高版本的集群才支持 MSK 复制器，无论您的源集群和目标集群是在相同区域还是位于不同的 AWS 区域。
+ MSK 复制器支持使用 m5.large 或更大的实例类型的集群。不支持 t3.small 集群。
+ 如果您将 MSK 复制器与 MSK 预置集群一起使用，则源集群和目标集群中至少需要三个代理。您可以在两个可用区的集群之间复制数据，但这些集群中至少需要四个代理。
+ 您的源 MSK 集群和目标 MSK 集群必须位于同一个 AWS 账户中。不支持跨不同账户的集群复制。
+ 如果源集群和目标 MSK 集群位于不同的 AWS 区域（跨区域），则 MSK Replicator 要求源集群为其 IAM 访问控制方法开启多 VPC 私有连接。

  源集群上跨 AWS 区域复制 MSK 的其他身份验证方法无需使用多 VPC。

  如果您要在同一 AWS 区域的集群之间复制数据，也不需要多 VPC。请参阅[单区域中的 Amazon MSK 多 VPC 私有连接](aws-access-mult-vpc.md)。
+ 相同主题名称复制（控制台中为**保留相同的主题名称**）需要运行 Kafka 版本 2.8.1 或更高版本的 MSK 集群。
+ 对于相同主题名称复制（控制台中为**保留相同的主题名称**）配置，为避免循环复制的风险，请不要更改 MSK 复制器创建的标头（`__mskmr`）。

# 支持的 MSK Serverless 集群配置
<a name="msk-replicator-serverless-requirements"></a>
+ MSK Serverless 支持在创建主题期间为 MSK Serverless 目标集群复制以下主题配置：`cleanup.policy`、`compression.type`、`max.message.bytes`、`retention.bytes`、`retention.ms`。
+ 在主题配置同步期间，MSK Serverless 仅支持以下主题配置：`compression.type`、`max.message.bytes`、`retention.bytes`、`retention.ms`。
+ 复制器在目标 MSK Serverless 集群上使用 83 个压缩分区。确保目标 MSK Serverless 集群有足够数量的压缩分区。请参阅[MSK Serverless 限额](limits.md#serverless-quota)。

## 集群配置更改
<a name="msk-replicator-config-changes"></a>
+ 建议您不要在创建 MSK 复制器后打开或关闭分层存储。如果您的目标集群未分层，则无论您的源集群是否分层，MSK 都不会复制分层存储配置。如果在创建复制器后在目标集群上开启分层存储，则需要重新创建复制器。如果要将数据从非分层集群复制到分层集群，则不应复制主题配置。请参阅[在现有主题上启用和禁用分层存储](https://docs.aws.amazon.com/msk/latest/developerguide/msk-enable-disable-topic-tiered-storage-cli.html)。
+ 创建 MSK 复制器后，请勿更改集群配置设置。集群配置设置将在创建 MSK 复制器期间进行验证。为避免 MSK 复制器出现问题，请勿在创建 MSK 复制器后更改以下设置。
  + 将 MSK 集群更改为 t3 实例类型。
  + 更改服务执行角色权限。
  + 禁用 MSK 多 VPC 私有连接。
  + 更改附加的集群基于资源的策略。
  + 更改集群安全组规则。

# 在目标集群区域使用 AWS 控制台创建复制器
<a name="msk-replicator-create-console"></a>

以下部分介绍了创建复制器的分步控制台工作流。

**复制器详细信息**

1. [在您的目标 MSK 集群所在的 AWS 区域，在家中打开 Amazon MSK 控制台？https://console.aws.amazon.com/msk/ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 选择**复制器**以显示账户中的复制器列表。

1. 选择**创建复制器**。

1. 在**复制器详细信息**窗格中，为新的复制器指定一个唯一的名称。

## 选择源集群
<a name="msk-replicator-create-console-choose-source"></a>

源集群包含要复制到目标 MSK 集群的数据。

1. 在**源集群**窗格中，选择源集群所在的 AWS 区域。

   您可以通过前往 **MSK 集群**并查看**集群**详情 ARN 来查找集群的区域。区域名称嵌入在 ARN 字符串中。在以下示例 ARN 中，`ap-southeast-2` 位于集群区域中。

   ```
   arn:aws:kafka:ap-southeast-2:123456789012:cluster/cluster-11/eec93c7f-4e8b-4baf-89fb-95de01ee639c-s1
   ```

1. 输入您的源集群 ARN，或浏览以选择您的源集群。

1. 为您的源集群选择子网。

   控制台显示源集群区域中可用的子网供您选择。必须至少选择两个子网。对于同区域的 MSK 复制器，您选择的用于访问源集群的子网和用于访问目标集群的子网必须位于同一个可用区中。

1. 为 MSK 复制器选择安全组以访问您的源集群。
   + 对于跨区域复制（CRR），您不需要为源集群提供安全组。
   + 对于同区域复制 (SRR)，请转到位于的 Amazon EC2 控制台， https://console.aws.amazon.com/ec2/并确保您将为 Replicator 提供的安全组具有出站规则，允许流量进入源集群的安全组。此外，确保源集群的安全组具有入站规则允许来自为源提供的复制器安全组的流量。

      

**要将入站规则添加到源集群的安全组：**

     1. 在 AWS 控制台中，选择集群**名称，进入源集群**的详细信息。

     1. 选择**属性**选项卡，然后向下滚动到**网络设置**窗格，以选择所应用的**安全组**名称。

     1. 转到入站规则，然后选择**编辑入站规则**。

     1. 选择**添加规则**。

     1. 在新规则的**类型**列中，选择**自定义 TCP**。

     1. 在**端口范围**列中，键入 `9098`。MSK 复制器使用 IAM 访问控制连接到使用端口 9098 的集群。

     1. 在**源**列中，键入您将在为源集群创建 复制器期间提供的安全组的名称（这可能与 MSK 源集群的安全组相同），然后选择**保存规则**。

      

**要将出站规则添加到为源提供的复制器安全组：**

     1. 在 Amazon EC2 的 AWS 控制台中，转到您在为源创建复制器时将提供的安全组。

     1. 转到出站规则，然后选择**编辑出站规则**。

     1. 选择**添加规则**。

     1. 在新规则的**类型**列中，选择**自定义 TCP**。

     1. 在**端口范围**列中，键入 `9098`。MSK 复制器使用 IAM 访问控制连接到使用端口 9098 的集群。

     1. 在**源**列中，键入 MSK 源集群的安全组的名称，然后选择**保存规则**。

**注意**  
或者，如果不想使用安全组限制流量，则可以添加允许所有流量的入站和出站规则。  
1. 选择**添加规则**。  
2. 选择**类型**列中的**所有流量**。  
3. 在“源”列中，键入 `0.0.0.0/0`，然后选择**保存规则**。

## 选择目标集群
<a name="msk-replicator-create-console-choose-target"></a>

目标集群是源数据复制到的 MSK 预置集群或无服务器集群。

**注意**  
MSK 复制器在目标集群中创建新主题，并在主题名称中添加自动生成的前缀。例如，MSK 复制器将“`topic`”中的数据从源集群复制到目标集群中名为 `<sourceKafkaClusterAlias>.topic` 的新主题。这是为了将包含从源集群复制的数据的主题与目标集群中的其他主题区分开来，并避免在集群之间循环复制数据。您可以使用 `DescribeReplicator` API 或 MSK 控制台上的 **Replicator 详细信息**页面在 “**sourceKafkaCluster别名**” 字段下找到将添加到目标集群中主题名称的前缀。目标集群中的前缀是 `<sourceKafkaClusterAlias>`。

1. 在**目标集群**窗格中，选择目标集群所在的 AWS 区域。

1. 输入目标集群的 ARN 或浏览以选择目标集群。

1. 为目标集群选择子网。

   控制台显示目标集群区域中可用的子网供您选择。至少选择两个子网。

1. 为 MSK 复制器选择安全组以访问您的目标集群。

   将显示目标集群区域中可用的安全组供您选择。所选安全组与每个连接相关联。有关使用安全组的更多信息，请参阅 *Amazon VPC 用户指南*中的[使用安全组控制 AWS 资源流量](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-security-groups.html)。
   + 对于跨区域复制 (CRR) 和同区域复制 (SRR)，请访问位于的 Amazon EC2 控制台，[https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)并确保您将提供给 Replicator 的安全组具有出站规则，允许流量进入目标集群的安全组。此外，请确保目标集群的安全组具有入站规则，以接受来自为目标提供的复制器安全组的流量。

      

**要将入站规则添加到目标集群的安全组：**

     1. 在 AWS 控制台中，选择集群**名称，进入目标集群**的详细信息。

     1. 选择**属性**选项卡，然后向下滚动到“网络设置”窗格，以选择所应用的**安全组**名称。

     1. 转到入站规则，然后选择**编辑入站规则**。

     1. 选择**添加规则**。

     1. 在新规则的**类型**列中，选择**自定义 TCP**。

     1. 在**端口范围**列中，键入 `9098`。MSK 复制器使用 IAM 访问控制连接到使用端口 9098 的集群。

     1. 在**源**列中，键入您将在为目标集群创建复制器期间提供的安全组的名称（这可能与 MSK 目标集群的安全组相同），然后选择**保存规则**。

      

**要将出站规则添加到为目标提供的复制器安全组：**

     1. 在 AWS 控制台中，转到您将在为目标创建 Replicator 期间提供的安全组。

     1. 选择**属性**选项卡，然后向下滚动到“网络设置”窗格，以选择所应用的**安全组**名称。

     1. 转到出站规则，然后选择**编辑出站规则**。

     1. 选择**添加规则**。

     1. 在新规则的**类型**列中，选择**自定义 TCP**。

     1. 在**端口范围**列中，键入 `9098`。MSK 复制器使用 IAM 访问控制连接到使用端口 9098 的集群。

     1. 在**源**列中，键入 MSK 目标集群的安全组的名称，然后选择**保存规则**。

**注意**  
或者，如果不想使用安全组限制流量，则可以添加允许所有流量的入站和出站规则。  
1. 选择**添加规则**。  
2. 选择**类型**列中的**所有流量**。  
3. 在“源”列中，键入 `0.0.0.0/0`，然后选择**保存规则**。

## 配置复制器设置和权限
<a name="msk-replicator-create-settings"></a>

1. 在**复制器设置**窗格中，使用允许和拒绝列表中的正则表达式指定要复制的主题。默认情况下会复制所有主题。
**注意**  
MSK 复制器仅按排序顺序复制最多 750 个主题。如果需要复制更多主题，我们建议您创建一个单独的复制器。如果您需要为每个 Replicator 提供超过 750 个主题的[支持，请前往 AWS 控制台 Support Center 并创建支持案例](https://console.aws.amazon.com/support/home#/)。您可以使用 “TopicCount” 指标监控正在复制的主题数量。请参阅[Amazon MSK 标准代理配额](limits.md#msk-provisioned-quota)。

1. 默认情况下，MSK 复制器从选定主题中的*最新*偏移量开始复制。或者，如果您想复制主题上的现有数据，则可以从选定主题中*最早*（最旧）偏移量开始复制。一旦创建了复制器，您就无法更改此设置。此设置对应于[https://docs.aws.amazon.com/msk/1.0/apireference-replicator/v1-replicators.html#CreateReplicator](https://docs.aws.amazon.com/msk/1.0/apireference-replicator/v1-replicators.html#CreateReplicator)请求和[https://docs.aws.amazon.com/msk/1.0/apireference-replicator/v1-replicators-replicatorarn.html#DescribeReplicator](https://docs.aws.amazon.com/msk/1.0/apireference-replicator/v1-replicators-replicatorarn.html#DescribeReplicator)响应中的[https://docs.aws.amazon.com/msk/1.0/apireference-replicator/v1-replicators-replicatorarn.html#v1-replicators-replicatorarn-model-replicationstartingposition](https://docs.aws.amazon.com/msk/1.0/apireference-replicator/v1-replicators-replicatorarn.html#v1-replicators-replicatorarn-model-replicationstartingposition)字段 APIs。

1. 选择主题名称配置：
   + `PREFIXED` 主题名称复制（在控制台中**为主题名称添加前缀**）：默认设置。MSK 复制器将“topic1”从源集群复制到目标集群中名为 `<sourceKafkaClusterAlias>.topic1` 的新主题。
   + 相同主题名称复制（控制台中为**保留相同的主题名称**）：来自源集群的主题在目标集群中以相同的主题名称进行复制。

   此设置对应于`CreateReplicator`请求和`DescribeReplicator`响应中的`TopicNameConfiguration`字段 APIs。请参阅[Amazon MSK 复制器的工作原理](msk-replicator-how-it-works.md)。
**注意**  
默认情况下，MSK 复制器在目标集群中创建新主题，并在主题名称中添加自动生成的前缀。这是为了将包含从源集群复制的数据的主题与目标集群中的其他主题区分开来，并避免在集群之间循环复制数据。或者，您可以创建具有相同主题名称复制（控制台中为**保留相同的主题名称**）的 MSK 复制器，以便在复制期间保留主题名称。此配置减少了您在设置期间重新配置客户端应用程序的需要，并使操作多集群流架构变得更加简单。

1. 默认情况下，MSK Replicator 会复制所有元数据，包括主题配置、访问控制列表 (ACLs) 和使用者组偏移量，以实现无缝故障转移。如果您创建的不是用于失效转移的复制器，则可以选择关闭**其他设置**部分中提供的一个或多个设置。
**注意**  
MSK Replicator 不会复制写入， ACLs 因为您的制作者不应直接写入目标集群中复制的主题。失效转移后，您的生成器应写入目标集群中的本地主题。有关详细信息，请参阅 [按计划向辅助 AWS 区域执行故障转移](msk-replicator-perform-planned-failover.md)。

1. 在**使用器组复制**窗格中，使用允许和拒绝列表中的正则表达式指定要复制的主题。默认情况下，所有使用器组都会被复制。

1. 在**压缩**窗格中，您可以选择压缩写入目标集群的数据。如果您要使用压缩，我们建议您使用与源集群中的数据相同的压缩方法。

1. 在**访问权限**窗格中，执行以下任一操作：

   1. 选择**创建或更新具有所需策略的 IAM 角色**。MSK 控制台将自动为服务执行角色附加必要的权限和信任策略，以便读取和写入您的源和目标 MSK 集群。  
![\[用于创建或更新复制器 IAM 角色的 MSK 控制台\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/images/msk-replicator-ezCRC.png)

   1. 通过选择**从 Amazon MSK 可以代入的 IAM 角色中选择**提供您自己的 IAM 角色。我们建议您将 `AWSMSKReplicatorExecutionRole` 托管的 IAM 策略附加到您的服务执行角色，而不是编写您自己的 IAM 策略。

      1. 创建 IAM 角色，复制器将使用该角色对源和目标 MSK 集群进行读取和写入操作，并使用以下 JSON 作为信任策略的一部分，并将 `AWSMSKReplicatorExecutionRole` 附加到该角色。在信任策略中，将占位符 <yourAccountID> 替换为您的实际账户 ID。

------
#### [ JSON ]

****  

        ```
        {
            "Version":"2012-10-17",		 	 	 
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "kafka.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole",
                    "Condition": {
                        "StringEquals": {
                            "aws:SourceAccount": "<yourAccountID>"
                        }
                    }
                }
            ]
        }
        ```

------

1. 在**复制器标签**窗格中，您可以选择为 MSK 复制器资源分配标签。有关更多信息，请参阅 [为 Amazon MSK 集群添加标签](msk-tagging.md)。对于跨区域 MSK 复制器，在创建复制器时，标签会自动同步到远程区域。如果在创建复制器后更改标签，则更改不会自动同步到远程区域，因此您需要手动同步本地复制器和远程复制器参考。

1. 选择**创建**。

如果想要限制 `kafka-cluster:WriteData` 权限，请参阅 [How IAM access control for Amazon MSK works](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#how-to-use-iam-access-control) 中的 *Create authorization policies* 部分。您需要为源集群和目标集群添加 `kafka-cluster:WriteDataIdempotently` 权限。

大约需要 30 分钟才能成功创建 MSK 复制器并转换到 RUNNING 状态。

如果您创建一个新的 MSK 复制器来替换已删除的复制器，则新的复制器会从最新的偏移开始复制。

如果您的 MSK 复制器已转换为 FAILED 状态，请参阅问题排查部分[排查 MSK 复制器的问题](msk-replicator-troubleshooting.md)。

# 编辑 MSK 复制器设置
<a name="msk-replicator-edit-settings"></a>

创建 MSK 复制器后，您无法更改源集群、目标集群、复制器起始位置或主题名称复制配置。您需要创建一个新的复制器才能使用相同主题名称复制配置。但是，您可以编辑其他复制器设置，例如要复制的主题和消费者组。

1. 登录并在[https://console.aws.amazon.com/msk/家中打开 Amazon MSK 控制台？ AWS 管理控制台 region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在左侧导航窗格中，选择**复制器**以显示账户中的复制器列表，然后选择要编辑的 MSK 复制器。

1. 选择**属性**选项卡。

1. 在**复制器设置**部分，选择**编辑复制器**。

1. 您可以通过更改任一设置来编辑 MSK 复制器设置。
   + 使用允许和拒绝列表中的正则表达式指定要复制的主题。默认情况下，MSK Replicator 会复制所有元数据，包括主题配置、访问控制列表 (ACLs) 和使用者组偏移量，以实现无缝故障转移。如果您创建的不是用于失效转移的复制器，则可以选择关闭**其他设置**部分中提供的一个或多个设置。
**注意**  
MSK Replicator 不会复制写入， ACLs 因为您的制作者不应直接写入目标集群中复制的主题。失效转移后，您的生成器应写入目标集群中的本地主题。有关详细信息，请参阅 [按计划向辅助 AWS 区域执行故障转移](msk-replicator-perform-planned-failover.md)。
   + 对于**使用器组复制**，您可以在允许和拒绝列表中使用正则表达式指定要复制的使用器组。默认情况下，所有使用器组都会被复制。如果允许列表和拒绝列表为空，则关闭使用器组复制。
   + 在**目标压缩类型**下，您可以选择压缩写入目标集群的数据。如果您要使用压缩，我们建议您使用与源集群中的数据相同的压缩方法。

1. 保存更改。

   大约需要 30 分钟才能成功创建 MSK 复制器并转换到 RUNNING 状态。如果您的 MSK 复制器已转换为 FAILED 状态，请参阅问题排查部分[排查 MSK 复制器问题](msk-replicator-troubleshooting.md)。

# 删除 MSK 复制器
<a name="msk-replicator-delete"></a>

如果 MSK 复制器创建失败（FAILED 状态），则可能需要将其删除。一旦创建 MSK 复制器，就无法更改分配给 MSK 复制器的源集群和目标集群。您可以删除现有 MSK 复制器并创建新的复制器。如果您创建一个新的 MSK 复制器来替换已删除的复制器，则新的复制器会从最新的偏移开始复制。

1. 在您的源集群所在的 AWS 区域，登录并在[https://console.aws.amazon.com/msk/家中打开 Amazon MSK 控制台？ AWS 管理控制台 region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 在导航窗格中，选择**复制器**。

1. 从 MSK 复制器列表中，选择要删除的复制器，然后选择**删除**。

# 监控复制
<a name="msk-replicator-monitor"></a>

您可以在目标集群区域[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)中使用来查看每个 Amazon MSK Replicator `ReplicationLatency` `MessageLag`、、的主题和汇总级别的指标。`ReplicatorThroughput`在 “AWS/Kafka” 命名空间下**ReplicatorName**方可以看到指标。您还可以查看 `ReplicatorFailure`、`AuthError` 和 `ThrottleTime` 指标来检查问题。

MSK 控制台显示每个 MSK CloudWatch 复制器的指标子集。从控制台**复制器**列表中，选择复制器的名称并选择**监控**选项卡。

## MSK 复制器指标
<a name="msk-replicator-metrics"></a>

以下指标描述了 MSK 复制器的性能或连接指标。

AuthError 指标不包括主题级别的身份验证错误。要监控 MSK Replicator 的主题级身份验证错误，请监控 Replicator 的 ReplicationLatency 指标和源集群的主题级指标。 MessagesInPerSec如果主题 ReplicationLatency 降至 0，但该主题仍有数据正在生成给它，则表示 Replicator 在该主题上存在身份验证问题。检查复制器的服务执行 IAM 角色是否有足够的权限访问该主题。


****  
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/msk-replicator-monitor.html)

# 使用复制来提高 Kafka 流应用程序跨区域的弹性
<a name="msk-replicator-increase-resiliency"></a>

您可以使用 MSK Replicator 设置主动-主动或主动-被动集群拓扑，以提高 Apache Kafka 应用程序跨区域的弹性。 AWS 在主动-主动设置中，两个 MSK 集群都积极提供读取和写入服务。在主动-被动设置中，一次只有一个 MSK 集群主动提供流媒体数据，而另一个集群处于备用状态。

## 构建多区域 Apache Kafka 应用程序的注意事项
<a name="msk-replication-multi-region-kafka-applications"></a>

您的使用器必须能够在不影响下游的情况下重新处理重复的消息。MSK Replicator 会复制可能 at-least-once导致备用集群中出现重复的数据。当您切换到辅助 AWS 区域时，您的消费者可能会多次处理相同的数据。MSK 复制器会优先处理复制数据而不是使用器偏移，以提高性能。失效转移后，使用器可能会开始从较早的偏移中读取，从而导致重复处理。

生成器和使用器还必须容忍丢失最少的数据。由于 MSK Replicator 异步复制数据，因此当主 AWS 区域开始出现故障时，无法保证所有数据都会复制到辅助区域。您可以使用复制延迟来确定未复制到二级区域的最大数据量。

## 使用主动-主动与主动-被动集群拓扑
<a name="msk-replicator-active-versus-passive"></a>

主动-主动集群拓扑提供了几乎为零的恢复时间，并且您的流媒体应用程序能够在多个 AWS 区域同时运行。当一个区域中的集群受损时，连接到另一个区域的集群的应用程序会继续处理数据。

主动-被动设置适用于一次只能在一个 AWS 区域运行的应用程序，或者当您需要更多地控制数据处理顺序时。主动-被动设置比主动-主动设置需要更多的恢复时间，因为您必须在二级区域启动整个主动-被动设置，包括您的生成器和使用器，才能在失效转移后恢复流式传输数据。

# 使用推荐的主题命名配置创建主动-被动 Kafka 集群设置
<a name="msk-replicators-active-passive-cluster-setup"></a>

对于主动-被动设置，我们建议您在两个不同的区域中使用类似的生产者、MSK 集群和消费者（使用相同的消费者组名称）设置。 AWS 两个 MSK 集群必须具有相同的读取和写入容量，以确保可靠的数据复制。您需要创建 MSK 复制器才能将数据从主集群持续复制到备用集群。您还需要将生产者配置为将数据写入同一 AWS 区域中集群的主题中。

对于主动-被动设置，请创建一个具有相同主题名称复制的新复制器（控制台中为**保留相同的主题名称**），开始将数据从主区域的 MSK 集群复制到辅助区域的集群。我们建议您在两个 AWS 区域中运行一组重复的生产者和使用者，每个生产者和使用者都使用其引导字符串连接到各自区域中的集群。这简化了失效转移过程，因为它不需要更改引导字符串。为确保消费者从中断的地方读取数据，源集群和目标集群中的消费者应具有相同的消费者组 ID。

如果您对 MSK 复制器使用相同主题名称复制（控制台中为**保留相同的主题名称**），它将复制与相应源主题同名的主题。

我们建议您为目标集群上的客户端配置集群级别的设置和权限。您无需配置主题级别的设置和文字读取， ACLs 因为如果您选择了复制访问控制列表的选项，MSK Replicator 会自动复制它们。请参阅[元数据复制](msk-replicator-how-it-works.md#msk-replicator-metadata-replication)。

# 故障转移到辅助 AWS 区域
<a name="msk-replicator-when-planned-failover"></a>

我们建议您使用 Amazon 监控次要 AWS 区域的复制延迟 CloudWatch。在主 AWS 区域发生服务事件期间，复制延迟可能会突然增加。如果延迟持续增加，请使用 S AWS ervice Health Dashboard 检查主要 AWS 区域中的服务事件。如果发生事件，您可以故障转移到辅助 AWS 区域。

# 按计划向辅助 AWS 区域执行故障转移
<a name="msk-replicator-perform-planned-failover"></a>

您可以按计划进行故障转移，以测试应用程序在包含源 MSK 集群的主 AWS 区域发生意外事件时的弹性。计划失效转移不应导致数据丢失。

如果使用相同主题名称复制配置，请按照以下步骤操作：

1. 关闭所有连接到您的源集群的生成器和使用器。

1. 创建一个新的 MSK 复制器，将数据从辅助区域中的 MSK 集群复制到主区域中的 MSK 集群，并使用相同主题名称复制（控制台中为**保留相同的主题名称**）。这是将要写入二级区域的数据复制回主区域所必需的，这样您就可以在意外事件结束后对主区域执行失效自动恢复。

1. 启动连接到辅助 AWS 区域中目标集群的生产者和消费者。

如果使用带前缀主题名称配置，请按照以下步骤进行失效转移：

1. 关闭所有连接到您的源集群的生成器和使用器。

1. 创建新的 MSK 复制器，将数据从二级区域的 MSK 集群复制到主区域中的 MSK 集群。这是将要写入二级区域的数据复制回主区域所必需的，这样您就可以在意外事件结束后对主区域执行失效自动恢复。

1. 在辅助 AWS 区域的目标集群上启动生产者。

1. 请按照以下选项卡之一的步骤操作，具体取决于应用程序的消息排序要求。

------
#### [ No message ordering ]

   如果您的应用程序不需要消息排序，则在辅助 AWS 区域启动使用通配符运算符（例如，）同时读取本地（例如主题`<sourceKafkaClusterAlias>.topic`）和复制主题（例如`.*topic`）的消费者。

------
#### [ Message ordering ]

   如果您的应用程序需要消息排序，则仅为目标集群上复制的主题（例如 `<sourceKafkaClusterAlias>.topic`）启动使用器，而不为本地主题（例如 `topic`）启动使用器。

------

1. 等待目标 MSK 集群上所有已复制主题的使用器完成所有数据的处理，这样使用器延迟为 0，而处理的记录数也为 0。然后，停止目标集群上已复制主题的使用器。此时，从源 MSK 集群复制到目标 MSK 集群的所有记录都已使用。

1. 在目标 MSK 集群上启动本地主题（例如 `topic`）的使用器。

# 对辅助 AWS 区域执行计划外故障转移
<a name="msk-replicator-perform-unplanned-failover"></a>

如果您的源 MSK 群集 AWS 所在的主区域发生服务事件，并且您想暂时将流量重定向到拥有目标 MSK 群集的辅助区域，则可以进行计划外故障转移。由于 MSK 复制器异步复制数据，计划外失效转移可能会导致一些数据丢失。您可以使用 [监控复制](msk-replicator-monitor.md) 中的指标来跟踪消息延迟。

如果使用相同主题名称复制配置（控制台中为**保留相同的主题名称**），请按照以下步骤操作：

1. 尝试关闭所有连接到主区域中源 MSK 集群的生成器和使用器。由于该区域存在损坏，此操作可能不成功。

1. 启动生产者和使用者连接到辅助 AWS 区域中的目标 MSK 集群以完成故障转移。由于 MSK Replicator 还会复制包括读取 ACLs 和使用者组偏移量在内的元数据，因此您的生产者和使用者将从故障转移前停下来的位置无缝恢复处理。

如果使用 `PREFIX` 主题名称配置，请按照以下步骤进行失效转移：

1. 尝试关闭所有连接到主区域中源 MSK 集群的生成器和使用器。由于该区域存在损坏，此操作可能不成功。

1. 启动生产者和使用者连接到辅助 AWS 区域中的目标 MSK 集群以完成故障转移。由于 MSK Replicator 还会复制包括读取 ACLs 和使用者组偏移量在内的元数据，因此您的生产者和使用者将从故障转移前停下来的位置无缝恢复处理。

1. 请按照以下选项卡之一的步骤操作，具体取决于应用程序的消息排序要求。

------
#### [ No message ordering ]

   如果您的应用程序不需要消息排序，则在目标 AWS 区域启动使用通配符运算符（例如`topic`）同时读取本地（例如`<sourceKafkaClusterAlias>.topic`）和复制主题（例如`.*topic`）的使用者。

------
#### [ Message ordering ]

   1. 仅为目标集群上复制的主题（例如 `<sourceKafkaClusterAlias>.topic`）启动使用器，而不为本地主题（例如 `topic`）启动使用器。

   1. 等待目标 MSK 集群上所有已复制主题的使用器完成所有数据的处理，这样偏移延迟为 0，而处理的记录数也为 0。然后，停止目标集群上已复制主题的使用器。此时，从源 MSK 集群复制到目标 MSK 集群的所有记录都已使用。

   1. 在目标 MSK 集群上启动本地主题（例如 `topic`）的使用器。

------

1. 一旦服务事件在主区域中结束，请创建一个新的 MSK 复制器，以将数据从辅助区域中的 MSK 集群复制到主区域中的 MSK 集群，应将复制器的起始位置设置为*最早*。这是将要写入二级区域的数据复制回主区域所必需的，这样您就可以在服务事件结束后对主区域执行失效自动恢复。如果未将复制器的起始位置设置为*最早*，则在主区域的服务事件期间向辅助区域中的集群生成的任何数据都不会被复制回主区域中的集群。

# 对主 AWS 区域执行故障恢复
<a name="msk-replicator-perform-failback"></a>

在主 AWS 区域的服务事件结束后，您可以回切到该区域。

如果使用相同主题名称复制配置，请按照以下步骤操作：

1. 创建一个新的 MSK 复制器，将辅助集群作为源，主集群作为目标，起始位置设置为*最早*，且使用相同主题名称复制（控制台中为**保留相同的主题名称**）。

   这将启动将失效转移后写入辅助群集的所有数据复制回主区域的过程。

1. 在 Amazon 中监控新复制器上的`MessageLag`指标， CloudWatch 直到达到该指标`0`，这表明所有数据都已从辅助复制到主副本。

1. 所有数据复制完成后，停止所有连接到辅助集群的生产者，并启动连接到主集群的生产者。

1. 等待连接到辅助集群的使用者的 `MaxOffsetLag` 指标变成 `0`，以确保它们已处理完所有数据。请参阅[监控消费者延迟](consumer-lag.md)。

1. 所有数据处理完毕后，停止辅助区域中的消费者并启动连接到主集群的消费者以完成失效自动恢复。

1. 删除在第一步中创建的将数据从辅助集群复制到主集群的复制器。

1. 验证您的现有 Replicator 将数据从主集群复制到辅助集群的状态是否为 “正在运行”，并且在 Amazon CloudWatch `0` 中具有`ReplicatorThroughput`指标。

   请注意，当您创建一个新的复制器并将其起始位置设为*最早*以进行失效自动恢复时，它会开始读取辅助集群主题中的所有数据。根据您的数据留存设置，您的主题可能包含来自源集群的数据。虽然 MSK 复制器会自动筛选这些消息，但是您仍将为辅助集群中的所有数据支付数据处理和传输费用。您可以使用 `ReplicatorBytesInPerSec` 跟踪复制器处理的总数据。请参阅[MSK 复制器指标](msk-replicator-monitor.md#msk-replicator-metrics)。

如果使用带前缀主题名称配置，请按照以下步骤进行操作：

只有在从辅助区域的集群复制到主区域的集群已赶上并且 Amazon 中的 MessageLag 指标接近 0 之后， CloudWatch 才应启动故障恢复步骤。计划的失效自动恢复不应导致数据丢失。

1. 关闭所有连接到二级区域中 MSK 集群的生成器和使用器。

1. 对于主动-被动拓扑，请删除正在将数据从二级区域的集群复制到主区域的复制器。对于主动-主动拓扑，您无需删除复制器。

1. 启动连接到主区域中 MSK 集群的生成器。

1. 请按照以下选项卡之一的步骤操作，具体取决于应用程序的消息排序要求。

------
#### [ No message ordering ]

   如果您的应用程序不需要消息排序，则在主 AWS 区域中启动使用通配符运算符（例如`topic`）同时读取本地（例如`<sourceKafkaClusterAlias>.topic`）和复制主题（例如`.*topic`）的使用者。本地主题（例如 topic）的使用器将从失效转移前消耗的最后一个偏移恢复。如果在失效转移之前有任何未处理的数据，则现在将对其进行处理。如果是计划内失效转移，则不应有此类记录。

------
#### [ Message ordering ]

   1. 仅为主区域上复制的主题（例如 `<sourceKafkaClusterAlias>.topic`）启动使用器，而不为本地主题（例如 `topic`）启动使用器。

   1. 等待主区域集群上所有已复制主题的使用器完成所有数据的处理，这样偏移延迟为 0，处理的记录数也为 0。然后，停止主区域集群上已复制主题的使用器。此时，失效转移后在二级区域生成的所有记录都已在主区域中使用。

   1. 在主区域的集群上启动本地主题（例如 `topic`）的使用器。

------

1. 使用 `ReplicatorThroughput` 和 latency 指标，验证从主区域中的集群到复制区域中的集群的现有复制器是否处于 RUNNING 状态并按预期运行。

# 使用 MSK 复制器创建主动-主动设置
<a name="msk-replicator-active-active"></a>

如果您想要创建一个主动-主动设置，其中两个 MSK 集群都积极地提供读写服务，我们建议您使用具有带前缀主题名称复制（控制台中为**为主题名称添加前缀**）的 MSK 复制器。但是，这将要求您重新配置消费者来读取复制的主题。

按照以下步骤在源 MSK 集群 A 和目标 MSK 集群 B 之间设置主动-主动拓扑。

1. 创建 MSK 复制器，将 MSK 集群 A 作为源，将 MSK 集群 B 作为目标。

1. 成功创建上述 MSK 复制器后，创建一个以集群 B 为源、集群 A 为目标的复制器。

1. 创建两组生成器，每组生成器将数据同时写入与生成器位于同一区域的集群中的本地主题（例如“主题”）。

1. 创建两组使用者，每组使用通配符订阅读取数据（例如”。 \$1topic”) 来自与消费者位于同一 AWS 区域的 MSK 集群。这样，您的使用器将自动从本地主题（例如 `topic`）读取在该区域本地生成的数据，以及从主题中带有 `<sourceKafkaClusterAlias>.topic` 前缀的其他区域复制的数据。这两组使用者应具有不同的使用者组， IDs 这样当 MSK Replicator 将它们复制到另一个集群时，消费者组偏移量就不会被覆盖。

如果想要避免重新配置客户端，可以使用相同主题名称复制（控制台中为**保留相同的主题名称**）创建 MSK 复制器来创建主动-主动设置，而不是使用带前缀主题名称复制（控制台中为**为主题名称添加前缀**）。但是，您将为每个复制器支付额外的数据处理和数据传输费用。这是因为每个复制器需要处理两倍于平时的数据量，一次用于复制，另一次用于防止无限循环。您可以使用 `ReplicatorBytesInPerSec` 指标跟踪每个复制器处理的总数据量。请参阅[监控复制](msk-replicator-monitor.md)。此指标包括复制到目标集群的数据以及由 MSK 复制器筛选的数据，以防止数据被复制回其来源的同一主题。

**注意**  
如果使用相同主题名称复制（控制台中为**保留相同的主题名称**）来设置主动-主动拓扑，请在删除主题后至少等待 30 秒，然后再重新创建同名主题。此等待期有助于防止重复的消息被复制回源集群。您的使用器必须能够在不影响下游的情况下重新处理重复的消息。请参阅[构建多区域 Apache Kafka 应用程序的注意事项](msk-replicator-increase-resiliency.md#msk-replication-multi-region-kafka-applications)。

# 使用 MSK 复制器在 Amazon MSK 集群之间迁移
<a name="msk-replicator-migrate-cluster"></a>

您可以使用相同主题名称复制进行集群迁移，但您的消费者必须能够处理重复消息而不会对下游造成影响。这是因为 MSK Replicator 提供了 at-least-once复制功能，这在极少数情况下可能会导致消息重复。如果您的消费者满足此要求，请按照以下步骤进行操作。

1. 创建一个用于将数据从旧集群复制到新集群的复制器，将复制器的起始位置设置为*最早*，且使用相同主题名称复制（控制台中为**保留相同的主题名称**）。

1. 在新集群上配置集群级别的设置和权限。您无需配置主题级别的设置和 “文字” 读取 ACLs，因为 MSK Replicator 会自动复制它们。

1. 在 Amazon 中监控该`MessageLag`指标， CloudWatch 直到该指标达到 0，表示所有数据都已复制。

1. 所有数据复制完成后，停止生产者将数据写入旧集群。

1. 重新配置这些生产者以连接到新的集群并启动它们。

1. 监控您的消费者从旧集群读取数据的 `MaxOffsetLag` 指标，直到它变为 `0`，这表明所有现有数据都已处理。

1. 停止连接到旧集群的消费者。

1. 重新配置消费者以连接到新集群并启动它们。

# 从自我管理 MirrorMaker 2 迁移到 MSK Replicator
<a name="msk-replicator-migrate-mirrormaker2"></a>

要从 MirrorMaker (MM2) 迁移到 MSK Replicator，请执行以下步骤：

1. 停止正在向源 Amazon MSK 集群写入的生产者。

1.  MM2 允许复制源集群主题上的所有消息。您可以监控源 MSK 集群上 MM2 消费者的使用者延迟，以确定何时复制了所有数据。

1. 创建一个新的复制器，将起始位置设置为*最新*，将主题名称配置设置为 `IDENTICAL`（控制台中为**相同主题名称复制**）。

1. 在复制器处于 RUNNING 状态后，您可以再次启动向源集群写入数据的生产者。

# 排查 MSK 复制器问题
<a name="msk-replicator-troubleshooting"></a>

以下信息可帮助您排查 MSK 复制器可能存在的问题。有关其他 Amazon MSK 功能的问题解决信息，请参阅[排查 Amazon MSK 集群的问题](troubleshooting.md)。您也可以将问题发布到 [AWS re:Post](https://repost.aws/)。

## MSK 复制器状态从 CREATING 变为 FAILED
<a name="msk-replicator-troubleshooting-failed-state"></a>

以下是 MSK 复制器创建失败的一些常见原因。

1. 请验证您在目标集群部分中为创建复制器提供的安全组具有出站规则，以允许流量进入目标集群的安全组。此外，请验证目标集群的安全组是否有入站规则，这些规则接受来自您在目标集群部分中为创建复制器提供的安全组的流量。请参阅[选择目标集群](msk-replicator-create-console.md#msk-replicator-create-console-choose-target)。

1. 如果您正在为跨区域复制创建复制器，请确认您的源集群已为 IAM 访问控制身份验证方法开启了多 VPC 连接。请参阅[单区域中的 Amazon MSK 多 VPC 私有连接](aws-access-mult-vpc.md)。此外，请验证是否已在源集群上设置集群策略，以便 MSK 复制器可以连接到源集群。请参阅[准备 Amazon MSK 源集群](msk-replicator-prepare-cluster.md)。

1. 验证您在创建 MSK 复制器期间提供的 IAM 角色是否具有读写源集群和目标集群所需的权限。此外，还要验证 IAM 角色是否具有写入主题的权限。请参阅 [配置复制器设置和权限](msk-replicator-create-console.md#msk-replicator-create-settings)。

1. 确认您的网络 ACLs 没有阻塞 MSK Replicator 与您的源集群和目标集群之间的连接。

1. 当 MSK 复制器尝试连接源集群或目标集群时，源集群或目标集群可能无法完全使用。这可能是由于过度负载、磁盘使用率或 CPU 使用率过高导致复制器无法连接到代理。修复代理的问题，然后重试创建复制器。

执行上述验证后，再次创建 MSK 复制器。

## MSK 复制器似乎停留在 CREATING 状态
<a name="msk-replicator-troubleshooting-stuck-creating"></a>

有时，MSK 复制器创建可能需要长达 30 分钟。请等待 30 分钟，然后再次检查集群的状态。

## MSK 复制器没有复制数据或只复制部分数据
<a name="msk-replicator-troubleshooting-not-replicating"></a>

请按照以下步骤排查数据复制问题。

1. 使用亚马逊 MSK Replicator 提供的 AuthError 指标，确认您的 Replicator 没有遇到任何身份验证错误。 CloudWatch如果此指标大于 0，请检查您为复制器提供的 IAM 角色的策略是否有效，并且没有为集群权限设置拒绝权限。根据 clusterAlias 维度，您可以确定源集群或目标集群是否遇到身份验证错误。

1. 请验证您的源集群和目标集群没有遇到任何问题。复制器可能无法连接到您的源集群或目标集群。这可能是由于连接过多、磁盘容量已满或 CPU 使用率过高所致。

1. 使用亚马逊中的 KafkaClusterPingSuccessCount 指标，验证您的源集群和目标集群是否可以从 MSK Replicator 访问。 CloudWatch根据 clusterAlias 维度，您可以确定源集群或目标集群是否遇到身份验证错误。如果该指标为 0 或没有数据点，则连接不正常。您应该检查 MSK 复制器用于连接到集群的网络和 IAM 角色权限。

1. 使用 Amazon 中的指标，确认您的 Replicator 没有因为缺少主题级别权限而出现故障。 ReplicatorFailure CloudWatch如果此指标大于 0，请检查您为主题级别权限提供的 IAM 角色。

1. 验证您在创建复制器时在允许列表中提供的正则表达式是否与您要复制的主题的名称相匹配。此外，请确认主题没有因为拒绝列表中的正则表达式而被排除在复制之外。

1. 请注意，复制器可能需要长达 30 秒的时间才能在目标集群上检测并创建新的主题或主题分区。如果复制器的起始位置是“最新”（默认），则在目标集群上创建主题之前向源主题生成的任何消息都不会被复制。或者，如果想要在目标集群上复制主题上的现有消息，则可以从源集群主题分区中最早的偏移量开始复制。请参阅[配置复制器设置和权限](msk-replicator-create-console.md#msk-replicator-create-settings)。

## 目标集群中的消息偏移量与源集群不同
<a name="msk-replicator-troubleshooting-different-message-offsets"></a>

作为复制数据的一部分，MSK 复制器使用来自源集群的消息并将其生成到目标集群。这可能导致消息在源集群和目标集群上具有不同的偏移量。但是，如果您在复制器创建期间启用了消费者组偏移量同步功能，MSK 复制器将在复制元数据时自动转换偏移量，以便在失效转移到目标集群后，您的消费者可以从源集群中断的地方附近恢复处理。

## MSK 复制器未同步消费组偏移量或目标集群上不存在消费者组
<a name="msk-replicator-troubleshooting-not-syncing-consumer-groups"></a>

请按照以下步骤排查元数据复制问题。

1. 验证您的数据复制是否按预期运行。如果不是，请参阅[MSK 复制器没有复制数据或只复制部分数据](#msk-replicator-troubleshooting-not-replicating)。

1. 验证您在创建复制器时在允许列表中提供的正则表达式是否与要复制的消费者组的名称相匹配。此外，请确认消费者组没有因为拒绝列表中的正则表达式而被排除在复制之外。

1. 验证 MSK 复制器是否已在目标集群上创建了该主题。复制器可能需要长达 30 秒的时间才能在目标集群上检测并创建新的主题或主题分区。如果复制器的起始位置是*最新*（默认），则在目标集群上创建主题之前向源主题生成的任何消息都不会被复制。如果源集群上的消费者组仅使用了 MSK 复制器未复制的消息，则该消费者组不会被复制到目标集群。在目标集群上成功创建主题后，MSK 复制器将开始将源集群上新写入的消息复制到目标集群。一旦您的消费者组开始从源读取这些消息，MSK 复制器就会自动将该消费者组复制到目标集群。或者，如果想要在目标集群上复制主题上的现有消息，则可以从源集群主题分区中最早的偏移量开始复制。请参阅[配置复制器设置和权限](msk-replicator-create-console.md#msk-replicator-create-settings)。

**注意**  
MSK 复制器为源集群上的消费者优化了消费者组偏移量同步，这些消费者正在从更接近主题分区末端的位置进行读取。如果您的消费者组在源集群上出现延迟，那么与源相比，您可能会看到目标上的消费者组的延迟更高。这意味着在失效转移到目标集群后，您的消费者将重新处理更多重复消息。为了减少此延迟后，源集群上的消费者需要赶上进度并从流末端（主题分区的末尾）开始消耗。当您的消费者赶上时，MSK 复制器将自动减少延迟。

## 复制延迟很高或持续增加
<a name="msk-replicator-troubleshooting-high-latency"></a>

以下是复制延迟较高的一些常见原因。

1. 验证源和目标 MSK 集群上的分区数量是否正确。分区过少或过多会影响性能。有关选择分区数量的指导，请参阅[使用 MSK 复制器的最佳实践](msk-replicator-best-practices.md)。下表显示要使用 MSK 复制器实现所需吞吐量的建议最小分区数。  
**吞吐量和建议的最小分区数**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/msk-replicator-troubleshooting.html)

1. 验证您的源和目标 MSK 集群中是否有足够的读取和写入容量来支持复制流量。MSK 复制器充当源集群（出口）的使用器，也充当目标集群（入口）的生成器。因此，除了集群上的其他流量外，您还应预置集群容量以支持复制流量。有关调整 MSK 集群大小的指导，请参阅[使用 MSK 复制器的最佳实践](msk-replicator-best-practices.md)。

1. 不同源和目标 AWS 区域对中的 MSK 集群的复制延迟可能会有所不同，具体取决于集群在地理上相隔的距离。例如，与欧洲地区（爱尔兰）和亚太地区（悉尼）区域的集群之间的复制相比，在欧洲地区（爱尔兰）和欧洲地区（伦敦）区域的集群之间进行复制时，复制延迟通常较低。

1. 验证复制器没有因为在源集群或目标集群上设置的限额过于激进而受到限制。您可以使用 Amazon 中的 MSK Replicator 提供的 ThrottleTime指标 CloudWatch 来查看集群上的代理限制请求的平均时间（以毫秒为单位）。 source/target 如果此指标大于 0，则应调整 Kafka 限额以减少节流，以便复制器能够赶上。有关管理复制器的 Kafka 限额的信息，请参阅[使用 Kafka 限额管理 MSK 复制器吞吐量](msk-replicator-best-practices.md#msk-replicator-manage-throughput-kafka-quotas)。

1. ReplicationLatency 当一个 AWS 区域退化时， MessageLag 可能会增加。使用 [AWS 服务运行状况控制面板](https://health.aws.amazon.com/health/status)查看您的主 MSK 集群所在的区域中是否有 MSK 服务事件。如果有服务事件，可以临时将应用程序的读取和写入重定向到另一个区域。

## 使用指标对 MSK 复制器故障进行故障排除 ReplicatorFailure
<a name="msk-replicator-troubleshooting-ReplicatorFailure"></a>

该 ReplicatorFailure 指标可帮助您监控和检测 MSK Replicator 中的复制问题。此指标若为非零值，通常表示存在复制失败的问题，原因可能包括如下因素：
+ 消息大小限制
+ 时间戳超出范围
+ 记录批处理大小问题

如果该 ReplicatorFailure 指标报告的值为非零，请按照以下步骤解决问题。

**注意**  
有关该指标的更多信息，请参阅[MSK 复制器指标](msk-replicator-monitor.md#msk-replicator-metrics)。

1. 配置一个能连接至目标 MSK 集群并设置了 Apache Kafka CLI 工具的客户端。有关设置客户端与 Kafka CLI 工具的信息，请参阅[连接到预置 Amazon MSK 集群](client-access.md)。

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

   然后执行以下操作：

   1. 获取 MSK Replicator 和目标 MSK 集群的。 ARNs 

   1. [获取目标 MSK 集群的代理端点](get-bootstrap-console.md)。这些端点将在后续步骤中使用。

1. 运行以下命令，导出在上一步中获得的 MSK 复制器 ARN 与代理端点。

   确保将以下示例中使用的 < *ReplicatorARN* >、< *BootstrapServerString* > 和 < *ConsumerConfigFile* > 占位符值替换为实际值。

   ```
   export TARGET_CLUSTER_SERVER_STRING=<BootstrapServerString>
   ```

   ```
   export REPLICATOR_ARN=<ReplicatorARN>
   ```

   ```
   export CONSUMER_CONFIG_FILE=<ConsumerConfigFile>
   ```

1. 在 `<path-to-your-kafka-installation>/bin` 目录中，执行以下操作：

   1. 保存以下脚本，并将其命名为 **query-replicator-failure-message.sh**。

      ```
      #!/bin/bash
      
      # Script: Query MSK Replicator Failure Message
      # Description: This script queries exceptions from AWS MSK Replicator status topics
      # It takes a replicator ARN and bootstrap server as input and searches for replicator exceptions
      # in the replicator's status topic, formatting and displaying them in a readable manner
      #
      # Required Arguments:
      #   --replicator-arn: The ARN of the AWS MSK Replicator
      #   --bootstrap-server: The Kafka bootstrap server to connect to
      #   --consumer.config: Consumer config properties file
      # Usage Example:
      #   ./query-replicator-failure-message.sh ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config>
      
      print_usage() {
        echo "USAGE: $0 ./query-replicator-failure-message.sh --replicator-arn <replicator-arn> --bootstrap-server <bootstrap-server> --consumer.config <consumer.config>"
        echo "--replicator-arn <String: MSK Replicator ARN>      REQUIRED: The ARN of AWS MSK Replicator."
        echo "--bootstrap-server <String: server to connect to>  REQUIRED: The Kafka server to connect to."
        echo "--consumer.config <String: config file>            REQUIRED: Consumer config properties file."
        exit 1
      }
      
      # Initialize variables
      replicator_arn=""
      bootstrap_server=""
      consumer_config=""
      
      # Parse arguments
      while [[ $# -gt 0 ]]; do
        case "$1" in
          --replicator-arn)
            if [ -z "$2" ]; then
              echo "Error: --replicator-arn requires an argument."
              print_usage
            fi
            replicator_arn="$2"; shift 2 ;;
          --bootstrap-server)
            if [ -z "$2" ]; then
              echo "Error: --bootstrap-server requires an argument."
              print_usage
            fi
            bootstrap_server="$2"; shift 2 ;;
          --consumer.config)
            if [ -z "$2" ]; then
              echo "Error: --consumer.config requires an argument."
              print_usage
            fi
            consumer_config="$2"; shift 2 ;;
          *) echo "Unknown option: $1"; print_usage ;;
        esac
      done
      
      # Check for required arguments
      if [ -z "$replicator_arn" ] || [ -z "$bootstrap_server" ] || [ -z "$consumer_config" ]; then
        echo "Error: --replicator-arn, --bootstrap-server, and --consumer.config are required."
        print_usage
      fi
      
      # Extract replicator name and suffix from ARN
      replicator_arn_suffix=$(echo "$replicator_arn" | awk -F'/' '{print $NF}')
      replicator_name=$(echo "$replicator_arn" | awk -F'/' '{print $(NF-1)}')
      echo "Replicator name: $replicator_name"
      
      # List topics and find the status topic
      topics=$(./kafka-topics.sh --command-config client.properties --list --bootstrap-server "$bootstrap_server")
      status_topic_name="__amazon_msk_replicator_status_${replicator_name}_${replicator_arn_suffix}"
      
      # Check if the status topic exists
      if echo "$topics" | grep -Fq "$status_topic_name"; then
        echo "Found replicator status topic: '$status_topic_name'"
        ./kafka-console-consumer.sh --bootstrap-server "$bootstrap_server" --consumer.config "$consumer_config" --topic "$status_topic_name" --from-beginning | stdbuf -oL grep "Exception" | stdbuf -oL sed -n 's/.*Exception:\(.*\) Topic: \([^,]*\), Partition: \([^\]*\).*/ReplicatorException:\1 Topic: \2, Partition: \3/p'
      else
        echo "No topic matching the pattern '$status_topic_name' found."
      fi
      ```

   1. 运行此脚本，查询 MSK 复制器失败消息。

      ```
      <path-to-your-kafka-installation>/bin/query-replicator-failure-message.sh --replicator-arn $REPLICATOR_ARN --bootstrap-server $TARGET_CLUSTER_SERVER_STRING --consumer.config $CONSUMER_CONFIG_FILE
      ```

      此脚本输出了所有错误及其异常消息，以及受影响的主题分区。可使用该异常信息来缓解故障，如[常见 MSK 复制器故障及其解决方案](#msk-replicator-ReplicatorFailure-error-mitigation)中所述。由于该主题包含了所有历史失败消息，因此应使用最后一条消息开始调查。以下是失败消息的示例。

      ```
      ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1
      ```

### 常见 MSK 复制器故障及其解决方案
<a name="msk-replicator-ReplicatorFailure-error-mitigation"></a>

以下列表描述了您可能遇到的 MSK 复制器故障，以及如何缓解这些故障。

**消息大小大于 max.request.size**  
**原因**  
当 MSK 复制器因为单个消息大小超过 10 MB 而无法复制数据时，就会发生这种故障。默认情况下，MSK 复制器会复制大小不超过 10 MB 的消息。
下面是此失败消息类型的示例。  

```
ReplicatorException: The message is 20635370 bytes when serialized which is larger than 10485760, which is the value of the max.request.size configuration. Topic: test, Partition: 1
```
**解决方案**  
减小主题中单个消息的大小。如果无法执行此操作，请按照以下说明[申请提高限额](limits.md#request-msk-quota-increase)。

**消息大小大于服务器能接受的最大消息大小**  
**原因**  
当消息大小超过目标集群的最大消息大小时，就会发生这种故障。
下面是此失败消息类型的示例。  

```
ReplicatorException: The request included a message larger than the max message size the server will accept. Topic: test, Partition: 1
```
**解决方案**  
增大目标集群或相应目标集群主题的 `max.message.bytes` 配置。将目标集群的 `max.message.bytes` 配置设置为与最大未压缩消息大小一致。有关执行此操作的信息，请参阅 [max.message.bytes](https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes)。

**时间戳超出范围**  
**原因**  
出现这种故障的原因是，单个消息的时间戳超出了目标集群的允许范围。
下面是此失败消息类型的示例。  

```
ReplicatorException: Timestamp 1730137653724 of message with offset 0 is out of range. The timestamp should be within [1730137892239, 1731347492239] Topic: test, Partition: 1
```
**解决方案**  
将目标集群的 `message.timestamp.before.max.ms` 配置更新为允许发送带较旧时间戳的消息。有关执行此操作的信息，请参阅 [message.timestamp.before.max.ms](https://kafka.apache.org/documentation/#topicconfigs_message.timestamp.before.max.ms)。

**记录批处理过大**  
**原因**  
出现这种故障的原因是，记录批处理大小超过了为目标集群上的主题设置的区段大小。MSK 复制器的最大批处理大小为 1MB。
下面是此失败消息类型的示例。  

```
ReplicatorException: The request included message batch larger than the configured segment size on the server. Topic: test, Partition: 1
```
**解决方案**  
目标集群的 segment.bytes 配置至少须与批处理大小 (1 MB) 一样大，复制器才能顺利进行操作。将目标集群的 segment.bytes 更新为至少 1048576 (1 MB)。有关此操作的信息，请参阅 [segment.bytes](https://kafka.apache.org/documentation/#topicconfigs_segment.bytes)。

**注意**  
如果在应用这些解决方案后 ReplicatorFailure 指标继续发出非零值，请重复故障排除过程，直到该指标发出零值。

# 使用 MSK 复制器的最佳实践
<a name="msk-replicator-best-practices"></a>

本节介绍了使用 Amazon MSK 复制器的常见最佳实践和实现策略。

**Contents**
+ [使用 Kafka 限额管理 MSK 复制器吞吐量](#msk-replicator-manage-throughput-kafka-quotas)
+ [设置集群保留期](#msk-replicator-retention-period)

## 使用 Kafka 限额管理 MSK 复制器吞吐量
<a name="msk-replicator-manage-throughput-kafka-quotas"></a>

由于 MSK 复制器充当源集群的使用器，复制可能会导致源集群上的其他使用器受到节流。节流量取决于源集群的读取容量和要复制的数据吞吐量。我们建议您为源集群和目标集群配置相同的容量，并在计算所需容量时考虑复制吞吐量。

您还可以在源集群和目标集群上为复制器设置 Kafka 限额，以控制 MSK 复制器可以使用的容量。建议使用网络带宽限额。网络带宽限额定义了一个或多个共享限额的客户端的字节速率阈值，定义为每秒字节数。此限额依不同代理而定义。

请按照以下步骤申请限额。

1. 检索源集群的引导服务器字符串。请参阅[获取 Amazon MSK 集群的引导代理](msk-get-bootstrap-brokers.md)。

1. 检索 MSK 复制器使用的服务执行角色（SER）。这是您用于 `CreateReplicator` 请求的 SER。您也可以从现有 Replicator 的 DescribeReplicator 响应中提取 SER。

1. 使用 Kafka CLI 工具，对源集群运行以下命令。

   ```
   ./kafka-configs.sh --bootstrap-server <source-cluster-bootstrap-server> --alter --add-config 'consumer_byte_
   rate=<quota_in_bytes_per_second>' --entity-type users --entity-name arn:aws:sts::<customer-account-id>:assumed-role/<ser-role-name>/<customer-account-id> --command-config <client-properties-for-iam-auth></programlisting>
   ```

1. 执行上述命令后，请确认该 `ReplicatorThroughput` 指标未超过您设置的限额。

请注意，如果您在多个 MSK 复制器之间重复使用服务执行角色，则它们都受此限额的约束。如果要为每个复制器保留单独的限额，请使用不同的服务执行角色。

有关使用带限额的 MSK IAM 身份验证的更多信息，请参阅 [Multi-tenancy Apache Kafka clusters in Amazon MSK with IAM access control and Kafka Quotas – Part 1](https://aws.amazon.com/blogs/big-data/multi-tenancy-apache-kafka-clusters-in-amazon-msk-with-iam-access-control-and-kafka-quotas-part-1/)。

**警告**  
设置极低的 consumer\$1byte\$1rate 可能会导致 MSK 复制器以意外的方式运行。

## 设置集群保留期
<a name="msk-replicator-retention-period"></a>

您可以为 MSK 预置的集群和无服务器集群设置日志保留期。默认的保留期为 7 天。请参阅[集群配置更改](msk-replicator-serverless-requirements.md#msk-replicator-config-changes)或[支持的 MSK Serverless 集群配置](msk-replicator-serverless-requirements.md)。