

# Migrate from non-MSK Apache Kafka clusters to Amazon MSK Express brokers
<a name="msk-replicator-migrate-external"></a>

You can use MSK Replicator to migrate Apache Kafka workloads from self-managed environments to Amazon MSK Provisioned clusters with Express brokers. MSK Replicator supports data migration from Kafka deployments (Kafka version 2.8.1 or later) that have SASL/SCRAM authentication enabled.

**Note**  
SASL/SCRAM authentication is required only for MSK Replicator to connect to your self-managed Kafka cluster. Your client applications can continue using their existing authentication mechanisms.

**Prerequisites**  
Before you begin, ensure you have the following:

1. Source Apache Kafka cluster running version 2.8.1 or later

1. SASL/SCRAM authentication enabled on source cluster

1. SSL encryption configured on source cluster

1. Network connectivity via AWS Site-to-Site VPN or AWS Direct Connect

1. VPC subnets configured for Secrets Manager access

For detailed instructions, see [Set up prerequisites for MSK Replicator with self-managed Apache Kafka clusters](msk-replicator-external-prereqs.md).

**Step 1: Create an Amazon MSK Express cluster**  
Create an MSK Provisioned cluster with Express brokers and IAM authentication enabled. The cluster needs a minimum of three brokers across three Availability Zones. See [Prepare the target cluster](msk-replicator-prepare-clusters.md#msk-replicator-prepare-target).

**Step 2: Create an IAM execution role**  
Create an IAM role, attach the `AWSMSKReplicatorExecutionRole` and `AWSSecretsManagerClientReadOnlyAccess` managed policies to it, and configure a trust policy for `kafka.amazonaws.com`. See [Set up prerequisites for MSK Replicator with self-managed Apache Kafka clusters](msk-replicator-external-prereqs.md).

**Step 3: Configure SASL/SCRAM and SSL on self-managed cluster**  
Create a dedicated SCRAM user with the required ACL permissions, and configure SSL certificates on the brokers. See [Set up prerequisites for MSK Replicator with self-managed Apache Kafka clusters](msk-replicator-external-prereqs.md).

**Step 4: Store credentials in AWS Secrets Manager**  
Create a secret with the `username`, `password`, and `certificate` key-value pairs. See [Set up prerequisites for MSK Replicator with self-managed Apache Kafka clusters](msk-replicator-external-prereqs.md).

**Step 5: Create the Replicator**  
Use the `CreateReplicator` API with the `EARLIEST` starting position, identical topic name replication, and `synchroniseConsumerGroupOffsets` set to `true`. If you plan to set up bidirectional replication for rollback capability (Step 6), also set `consumerGroupOffsetSyncMode` to `ENHANCED` on both the forward and reverse Replicators. Allow approximately 30 minutes for the Replicator to reach `RUNNING` status. See [CreateReplicator API examples for self-managed Kafka clusters](msk-replicator-external-api-examples.md).

**Step 6: (Optional) Set up bidirectional replication**  
Create a reverse Replicator from the MSK Express cluster back to the self-managed cluster for rollback capabilities. See [CreateReplicator API examples for self-managed Kafka clusters](msk-replicator-external-api-examples.md).

**Step 7: Monitor replication progress**  
Monitor the following metrics:
+ `MessageLag` (should reach 0)
+ `ReplicationLatency`
+ `ConsumerGroupOffsetSyncFailure` (should be 0)
+ `ConsumerGroupCount`
+ `OffsetLag (MSK Cluster)` and `OffsetLag (Non-MSK Cluster)`

For more information, see [Monitor replication](msk-replicator-monitor.md).

**Step 8: Migrate applications**  
Follow these steps to migrate your applications:

1. Stop producers writing to self-managed cluster

1. Reconfigure producers to MSK Express cluster with IAM authentication

1. Monitor `MessageLag` until it reaches 0

1. Stop consumers on self-managed cluster

1. Reconfigure consumers to MSK Express cluster
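
When you reconfigure producers and consumers for the MSK Express cluster, the clients need IAM authentication settings and the cluster's IAM bootstrap brokers. The following is a minimal sketch for Java-based clients, assuming the `aws-msk-iam-auth` library is on the client classpath. The function names and the output file name are hypothetical and not part of this guide.

```shell
# Sketch: prepare client settings for IAM authentication on the MSK Express cluster.
# Assumption: Java-based clients with the aws-msk-iam-auth library on the classpath.
# Function names and file names are illustrative only.

# Write a Kafka client properties file for IAM authentication. $1 = output path
write_iam_client_properties() {
  cat > "$1" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
}

# Print the IAM bootstrap broker string of the target cluster.
# Requires AWS credentials. $1 = MSK cluster ARN
get_iam_bootstrap_brokers() {
  aws kafka get-bootstrap-brokers --cluster-arn "$1" \
    --query 'BootstrapBrokerStringSaslIam' --output text
}

# Example (not run here):
#   write_iam_client_properties client-iam.properties
#   get_iam_bootstrap_brokers "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx"
```

Pass the generated file to your client tooling and set `bootstrap.servers` to the string that `get_iam_bootstrap_brokers` prints.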

**Step 9: (Optional) Roll back to self-managed cluster**  
If you configured bidirectional replication, you can reverse the migration steps to roll back to the self-managed cluster. The reverse Replicator (MSK Express to self-managed) keeps the self-managed cluster in sync, so consumers can be redirected back without data loss.

# Set up prerequisites for MSK Replicator with self-managed Apache Kafka clusters
<a name="msk-replicator-external-prereqs"></a>

## Create an IAM execution role
<a name="msk-replicator-external-iam-role"></a>

Create an IAM role with a trust policy for `kafka.amazonaws.com`. Attach the `AWSMSKReplicatorExecutionRole` and `AWSSecretsManagerClientReadOnlyAccess` managed policies.

Example trust policy:

```
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "kafka.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }]
}
```
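
The role can also be created from the AWS CLI. The following is a minimal sketch rather than a definitive procedure. The role name `MSKReplicatorRole` matches the CLI examples later in this guide, while the file name `trust-policy.json` and the function names are hypothetical. The managed policy ARNs are looked up by name instead of being hard-coded.

```shell
# Sketch: create the MSK Replicator execution role with the AWS CLI.
# ROLE_NAME, the file name, and the function names are illustrative assumptions.
ROLE_NAME="${ROLE_NAME:-MSKReplicatorRole}"

# Write the trust policy that lets kafka.amazonaws.com assume the role. $1 = output path
write_trust_policy() {
  cat > "$1" <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "kafka.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }]
}
EOF
}

# Resolve an AWS managed policy ARN from its name. $1 = policy name
managed_policy_arn() {
  aws iam list-policies --scope AWS \
    --query "Policies[?PolicyName=='$1'].Arn" --output text
}

# Create the role and attach both managed policies. Requires AWS credentials.
create_execution_role() {
  local policy_file="trust-policy.json" name
  write_trust_policy "$policy_file"
  aws iam create-role --role-name "$ROLE_NAME" \
    --assume-role-policy-document "file://$policy_file"
  for name in AWSMSKReplicatorExecutionRole AWSSecretsManagerClientReadOnlyAccess; do
    aws iam attach-role-policy --role-name "$ROLE_NAME" \
      --policy-arn "$(managed_policy_arn "$name")"
  done
}

# Example (not run here):
#   create_execution_role
```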

## Configure SASL/SCRAM user and ACL permissions
<a name="msk-replicator-external-scram"></a>

Create a dedicated SCRAM user on your self-managed Kafka cluster. The following ACL permissions are required:

1. Read, Describe on all topics

1. Read, Describe on all consumer groups

1. Describe on cluster resource

Example `kafka-acls.sh` commands:

```
# Grant Read and Describe on all topics
kafka-acls.sh --bootstrap-server <broker>:9092 \
  --add --allow-principal User:msk-replicator \
  --operation Read --operation Describe \
  --topic '*'

# Grant Read and Describe on all consumer groups
kafka-acls.sh --bootstrap-server <broker>:9092 \
  --add --allow-principal User:msk-replicator \
  --operation Read --operation Describe \
  --group '*'

# Grant Describe on cluster
kafka-acls.sh --bootstrap-server <broker>:9092 \
  --add --allow-principal User:msk-replicator \
  --operation Describe --cluster
```
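
The ACLs above refer to a principal that must already exist as a SCRAM user. The following sketch shows one way to create it with `kafka-configs.sh`, assuming Kafka 2.7 or later so that SCRAM credentials can be managed through `--bootstrap-server`. The user name `msk-replicator` matches the ACL examples. The function names and the `REPLICATOR_PASSWORD` variable are hypothetical. If the listener you connect to requires authentication, you would also need to pass an admin client configuration with `--command-config`.

```shell
# Sketch: create the SCRAM user that the ACL examples refer to.
# Function names and REPLICATOR_PASSWORD are illustrative assumptions.

# Build the --add-config value for kafka-configs.sh.
# $1 = SCRAM mechanism (SCRAM-SHA-256 or SCRAM-SHA-512), $2 = password
scram_config() {
  case "$1" in
    SCRAM-SHA-256|SCRAM-SHA-512) printf '%s=[password=%s]' "$1" "$2" ;;
    *) echo "unsupported mechanism: $1" >&2; return 1 ;;
  esac
}

# Create or update the user. Requires the Kafka CLI tools on PATH.
# $1 = bootstrap server, $2 = password
create_scram_user() {
  kafka-configs.sh --bootstrap-server "$1" \
    --alter --entity-type users --entity-name msk-replicator \
    --add-config "$(scram_config SCRAM-SHA-256 "$2")"
}

# Example (not run here):
#   create_scram_user "<broker>:9092" "$REPLICATOR_PASSWORD"
```

Choose the mechanism that matches the `mechanism` value you plan to pass to `CreateReplicator`.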

## Configure SSL on self-managed cluster
<a name="msk-replicator-external-ssl"></a>

Configure SSL listeners on your brokers. For publicly trusted certificates, no additional configuration is required. For private or self-signed certificates, include the full CA certificate chain in the secret stored in AWS Secrets Manager.

## Store credentials in AWS Secrets Manager
<a name="msk-replicator-external-secrets"></a>

Create a secret of type *Other* (not RDS/Redshift) in AWS Secrets Manager with the following key-value pairs:

1. `username` — SCRAM username for the self-managed cluster

1. `password` — SCRAM password for the self-managed cluster

1. `certificate` — CA certificate chain (PEM format; required for private/self-signed certs)
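
The three key-value pairs can be assembled and stored from the command line. The following is a sketch under stated assumptions: the secret name `kafka-scram-creds` matches the CLI examples in this guide, while the function names and the `ca-chain.pem` file name are hypothetical. The escaping helper handles only backslashes, double quotes, and newlines, which is enough for a PEM chain and typical passwords but is not a general JSON encoder.

```shell
# Sketch: build the secret JSON and store it in AWS Secrets Manager.
# Function names and the ca-chain.pem file name are illustrative assumptions.

# Escape a string for use inside a JSON string literal.
# Handles backslashes, double quotes, and newlines only.
json_escape() {
  printf '%s' "$1" \
    | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g' \
    | awk 'NR > 1 { printf "%s", "\\n" } { printf "%s", $0 }'
}

# Print the secret document. $1 = username, $2 = password, $3 = PEM text
build_secret_json() {
  printf '{"username":"%s","password":"%s","certificate":"%s"}\n' \
    "$(json_escape "$1")" "$(json_escape "$2")" "$(json_escape "$3")"
}

# Create the secret. Requires AWS credentials.
# $1 = username, $2 = password, $3 = path to the CA chain in PEM format
create_replicator_secret() {
  aws secretsmanager create-secret --name kafka-scram-creds \
    --secret-string "$(build_secret_json "$1" "$2" "$(cat "$3")")"
}

# Example (not run here):
#   create_replicator_secret msk-replicator "$REPLICATOR_PASSWORD" ca-chain.pem
```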

## Configure network connectivity
<a name="msk-replicator-external-network"></a>

MSK Replicator requires network connectivity to your self-managed Kafka cluster. Supported options:
+ **AWS Site-to-Site VPN** — Connect on-premises networks to your VPC over the internet.
+ **AWS Direct Connect** — Establish a dedicated private network connection from your premises to AWS.

## Configure security groups
<a name="msk-replicator-external-security-groups"></a>

Ensure security groups allow traffic between MSK Replicator and the self-managed cluster on the SASL_SSL port (typically 9096). Update both inbound rules on VPC security groups and outbound rules on the self-managed cluster firewall.

# CreateReplicator API examples for self-managed Kafka clusters
<a name="msk-replicator-external-api-examples"></a>

## Forward replication (Self-managed Kafka to MSK Express)
<a name="msk-replicator-external-forward"></a>

Use the following AWS CLI command to create a Replicator that replicates data from your self-managed Kafka cluster to an Amazon MSK Express cluster.

```
aws kafka create-replicator \
  --replicator-name my-selfmanaged-to-msk-replicator \
  --description "Replicating from self-managed Kafka to MSK Express" \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \
  --kafka-clusters '[
    {
      "apacheKafkaCluster": {
        "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094",
        "apacheKafkaClusterId": "<self-managed-cluster-id>"
      },
      "clientAuthentication": {
        "saslScram": {
          "mechanism": "SHA256",
          "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds"
        }
      },
      "encryptionInTransit": {
        "encryptionType": "TLS",
        "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds"
      },
      "vpcConfig": {
        "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"],
        "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"]
      }
    },
    {
      "amazonMskCluster": {
        "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx"
      },
      "vpcConfig": {
        "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"],
        "securityGroupIds": ["sg-yyyyyyyyy"]
      }
    }]' \
  --replication-info-list '[{
    "sourceKafkaClusterId": "<self-managed-cluster-id>",
    "targetKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx",
    "targetCompressionType": "NONE",
    "topicReplication": {
      "topicsToReplicate": [".*"],
      "topicNameConfiguration": {"type": "IDENTICAL"},
      "startingPosition": {"type": "EARLIEST"},
      "detectAndCopyNewTopics": true,
      "copyTopicConfigurations": true,
      "copyAccessControlListsForTopics": true
    },
    "consumerGroupReplication": {
      "consumerGroupsToReplicate": [".*"],
      "detectAndCopyNewConsumerGroups": true,
      "synchroniseConsumerGroupOffsets": true
    }}]'
```

## Bidirectional replication example
<a name="msk-replicator-external-bidirectional"></a>

To set up bidirectional replication for rollback capability, both the forward and reverse Replicators must be created with `consumerGroupOffsetSyncMode` set to `ENHANCED`. This ensures consumer group offsets are synchronized in a way that supports seamless cutover in either direction.

Create the forward Replicator (self-managed Kafka to MSK Express) with `ENHANCED` offset sync mode:

```
aws kafka create-replicator \
  --replicator-name my-selfmanaged-to-msk-replicator \
  --description "Replicating from self-managed Kafka to MSK Express" \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \
  --kafka-clusters '[
    {
      "apacheKafkaCluster": {
        "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094",
        "apacheKafkaClusterId": "<self-managed-cluster-id>"
      },
      "clientAuthentication": {
        "saslScram": {
          "mechanism": "SHA256",
          "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds"
        }
      },
      "encryptionInTransit": {
        "encryptionType": "TLS",
        "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-ca-cert"
      },
      "vpcConfig": {
        "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"],
        "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"]
      }
    },
    {
      "amazonMskCluster": {
        "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx"
      },
      "vpcConfig": {
        "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"],
        "securityGroupIds": ["sg-yyyyyyyyy"]
      }
    }]' \
  --replication-info-list '[{
    "sourceKafkaClusterId": "<self-managed-cluster-id>",
    "targetKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx",
    "targetCompressionType": "NONE",
    "topicReplication": {
      "topicsToReplicate": [".*"],
      "topicNameConfiguration": {"type": "IDENTICAL"},
      "startingPosition": {"type": "EARLIEST"},
      "detectAndCopyNewTopics": true,
      "copyTopicConfigurations": true,
      "copyAccessControlListsForTopics": true
    },
    "consumerGroupReplication": {
      "consumerGroupsToReplicate": [".*"],
      "detectAndCopyNewConsumerGroups": true,
      "synchroniseConsumerGroupOffsets": true,
      "consumerGroupOffsetSyncMode": "ENHANCED"
    }}]'
```

Then create the reverse Replicator (MSK Express to self-managed Kafka) also with `ENHANCED` offset sync mode:

```
aws kafka create-replicator \
  --replicator-name my-msk-to-selfmanaged-replicator \
  --description "Reverse replication for rollback" \
  --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \
  --kafka-clusters '[
    {
      "amazonMskCluster": {
        "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx"
      },
      "vpcConfig": {
        "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"],
        "securityGroupIds": ["sg-yyyyyyyyy"]
      }
    },
    {
      "apacheKafkaCluster": {
        "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094",
        "apacheKafkaClusterId": "<self-managed-cluster-id>"
      },
      "clientAuthentication": {
        "saslScram": {
          "mechanism": "SHA256",
          "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds"
        }
      },
      "encryptionInTransit": {
        "encryptionType": "TLS",
        "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-ca-cert"
      },
      "vpcConfig": {
        "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"],
        "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"]
      }
    }]' \
  --replication-info-list '[{
    "sourceKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx",
    "targetKafkaClusterId": "<self-managed-cluster-id>",
    "targetCompressionType": "NONE",
    "topicReplication": {
      "topicsToReplicate": [".*"],
      "topicNameConfiguration": {"type": "IDENTICAL"},
      "startingPosition": {"type": "LATEST"},
      "detectAndCopyNewTopics": true,
      "copyTopicConfigurations": true,
      "copyAccessControlListsForTopics": true
    },
    "consumerGroupReplication": {
      "consumerGroupsToReplicate": [".*"],
      "detectAndCopyNewConsumerGroups": true,
      "synchroniseConsumerGroupOffsets": true,
      "consumerGroupOffsetSyncMode": "ENHANCED"
    }}]'
```

## Verify Replicator status
<a name="msk-replicator-external-verify"></a>

Check the status of your Replicator using the `describe-replicator` CLI command:

```
aws kafka describe-replicator \
  --replicator-arn arn:aws:kafka:us-east-1:123456789012:replicator/my-replicator/xxx
```

The Replicator progresses from the `CREATING` state to the `RUNNING` state. Allow approximately 30 minutes for the Replicator to reach `RUNNING` status.
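
Because creation takes a while, it can help to poll the state instead of checking by hand. The following is a minimal sketch that repeatedly reads the `ReplicatorState` field from `describe-replicator`. The function names, the default of 60 attempts, and the 60-second interval are assumptions chosen for illustration.

```shell
# Sketch: poll until the Replicator reaches RUNNING.
# Function names, attempt limit, and poll interval are illustrative assumptions.

# Print the current state of the Replicator. Requires AWS credentials.
# $1 = Replicator ARN
get_replicator_state() {
  aws kafka describe-replicator --replicator-arn "$1" \
    --query 'ReplicatorState' --output text
}

# Poll until RUNNING. Returns 0 on RUNNING, 1 on FAILED or when attempts run out.
# $1 = Replicator ARN, $2 = max attempts (default 60), $3 = seconds between polls (default 60)
wait_for_running() {
  local arn="$1" attempts="${2:-60}" interval="${3:-60}" state i
  i=1
  while [ "$i" -le "$attempts" ]; do
    state="$(get_replicator_state "$arn")"
    echo "attempt $i: $state"
    case "$state" in
      RUNNING) return 0 ;;
      FAILED)  return 1 ;;
    esac
    sleep "$interval"
    i=$((i + 1))
  done
  return 1
}

# Example (not run here):
#   wait_for_running "arn:aws:kafka:us-east-1:123456789012:replicator/my-replicator/xxx"
```

With the defaults, the loop gives up after roughly an hour, which leaves headroom over the approximate 30 minutes mentioned above.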