View a markdown version of this page

CreateReplicator API examples for self-managed Kafka clusters - Amazon Managed Streaming for Apache Kafka

CreateReplicator API examples for self-managed Kafka clusters

Forward replication (Self-managed Kafka to MSK Express)

Use the following AWS CLI command to create a Replicator that replicates data from your self-managed Kafka cluster to an Amazon MSK Express cluster.

aws kafka create-replicator \ --replicator-name my-selfmanaged-to-msk-replicator \ --description "Replicating from self-managed Kafka to MSK Express" \ --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \ --kafka-clusters '[ { "apacheKafkaCluster": { "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094", "apacheKafkaClusterId": "<self-managed-cluster-id>" }, "clientAuthentication": { "saslScram": { "mechanism": "SHA256", "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" } }, "encryptionInTransit": { "encryptionType": "TLS", "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" }, "vpcConfig": { "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"], "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"] } }, { "amazonMskCluster": { "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx" }, "vpcConfig": { "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"], "securityGroupIds": ["sg-yyyyyyyyy"] } }]' \ --replication-info-list '[{ "sourceKafkaClusterId": "<self-managed-cluster-id>", "targetKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx", "targetCompressionType": "NONE", "topicReplication": { "topicsToReplicate": [".*"], "topicNameConfiguration": {"type": "IDENTICAL"}, "startingPosition": {"type": "EARLIEST"}, "detectAndCopyNewTopics": true, "copyTopicConfigurations": true, "copyAccessControlListsForTopics": true }, "consumerGroupReplication": { "consumerGroupsToReplicate": [".*"], "detectAndCopyNewConsumerGroups": true, "synchroniseConsumerGroupOffsets": true }}]'

Bidirectional replication example

To set up bidirectional replication for rollback capability, both the forward and reverse Replicators must be created with consumerGroupOffsetSyncMode set to ENHANCED. This ensures consumer group offsets are synchronized in a way that supports seamless cutover in either direction.

Create the forward Replicator (self-managed Kafka to MSK Express) with ENHANCED offset sync mode:

aws kafka create-replicator \ --replicator-name my-selfmanaged-to-msk-replicator \ --description "Replicating from self-managed Kafka to MSK Express" \ --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \ --kafka-clusters '[ { "apacheKafkaCluster": { "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094", "apacheKafkaClusterId": "<self-managed-cluster-id>" }, "clientAuthentication": { "saslScram": { "mechanism": "SHA256", "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" } }, "encryptionInTransit": { "encryptionType": "TLS", "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-ca-cert" }, "vpcConfig": { "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"], "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"] } }, { "amazonMskCluster": { "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx" }, "vpcConfig": { "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"], "securityGroupIds": ["sg-yyyyyyyyy"] } }]' \ --replication-info-list '[{ "sourceKafkaClusterId": "<self-managed-cluster-id>", "targetKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx", "targetCompressionType": "NONE", "topicReplication": { "topicsToReplicate": [".*"], "topicNameConfiguration": {"type": "IDENTICAL"}, "startingPosition": {"type": "EARLIEST"}, "detectAndCopyNewTopics": true, "copyTopicConfigurations": true, "copyAccessControlListsForTopics": true }, "consumerGroupReplication": { "consumerGroupsToReplicate": [".*"], "detectAndCopyNewConsumerGroups": true, "synchroniseConsumerGroupOffsets": true, "consumerGroupOffsetSyncMode": "ENHANCED" }}]'

Then create the reverse Replicator (MSK Express to self-managed Kafka) also with ENHANCED offset sync mode:

aws kafka create-replicator \ --replicator-name my-msk-to-selfmanaged-replicator \ --description "Reverse replication for rollback" \ --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \ --kafka-clusters '[ { "amazonMskCluster": { "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx" }, "vpcConfig": { "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"], "securityGroupIds": ["sg-yyyyyyyyy"] } }, { "apacheKafkaCluster": { "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094", "apacheKafkaClusterId": "<self-managed-cluster-id>" }, "clientAuthentication": { "saslScram": { "mechanism": "SHA256", "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" } }, "encryptionInTransit": { "encryptionType": "TLS", "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-ca-cert" }, "vpcConfig": { "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"], "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"] } }]' \ --replication-info-list '[{ "sourceKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx", "targetKafkaClusterId": "<self-managed-cluster-id>", "targetCompressionType": "NONE", "topicReplication": { "topicsToReplicate": [".*"], "topicNameConfiguration": {"type": "IDENTICAL"}, "startingPosition": {"type": "LATEST"}, "detectAndCopyNewTopics": true, "copyTopicConfigurations": true, "copyAccessControlListsForTopics": true }, "consumerGroupReplication": { "consumerGroupsToReplicate": [".*"], "detectAndCopyNewConsumerGroups": true, "synchroniseConsumerGroupOffsets": true, "consumerGroupOffsetSyncMode": "ENHANCED" }}]'

Verify Replicator status

Check the status of your Replicator using the describe-replicator CLI command:

aws kafka describe-replicator \ --replicator-arn arn:aws:kafka:us-east-1:123456789012:replicator/my-replicator/xxx

The Replicator will progress through CREATINGRUNNING states. Allow approximately 30 minutes for the Replicator to reach RUNNING status.