

# Understand MSK Connect
<a name="msk-connect"></a>

MSK Connect is a feature of Amazon MSK that makes it easy for developers to stream data to and from their Apache Kafka clusters. MSK Connect uses Kafka Connect version 2.7.1 or 3.7.x. Kafka Connect is an open-source framework for connecting Apache Kafka clusters with external systems such as databases, search indexes, and file systems. With MSK Connect, you can deploy fully managed connectors built for Kafka Connect that move data into or pull data from popular data stores like Amazon S3 and Amazon OpenSearch Service. You can deploy connectors developed by third parties, such as Debezium, for streaming change logs from databases into an Apache Kafka cluster, or deploy an existing connector with no code changes. Connectors automatically scale to adjust for changes in load, and you pay only for the resources that you use.

Use source connectors to import data from external systems into your topics. With sink connectors, you can export data from your topics to external systems.

MSK Connect supports connectors for any Apache Kafka cluster with connectivity to an Amazon VPC, whether it is an MSK cluster or an independently hosted Apache Kafka cluster. 

MSK Connect continuously monitors connector health and delivery state, patches and manages the underlying hardware, and autoscales the connectors to match changes in throughput.

To get started using MSK Connect, see [Getting started with MSK Connect](msk-connect-getting-started.md). 

To learn about the AWS resources that you can create with MSK Connect, see [Understand connectors](msk-connect-connectors.md), [Create custom plugins](msk-connect-plugins.md), and [Understand MSK Connect workers](msk-connect-workers.md).

For information about the MSK Connect API, see the [Amazon MSK Connect API Reference](https://docs.aws.amazon.com/MSKC/latest/mskc/Welcome.html). 

## Benefits of using Amazon MSK Connect
<a name="msk-connect-benefits"></a>

Apache Kafka is one of the most widely adopted open source streaming platforms for ingesting and processing real-time data streams. With Apache Kafka, you can decouple and independently scale your data-producing and data-consuming applications.

Kafka Connect is an important component of building and running streaming applications with Apache Kafka. Kafka Connect provides a standardized way of moving data between Kafka and external systems. Kafka Connect is highly scalable and can handle large volumes of data. Kafka Connect also provides a powerful set of API operations and tools for configuring, deploying, and monitoring connectors that move data between Kafka topics and external systems. You can use these tools to customize and extend the functionality of Kafka Connect to meet the specific needs of your streaming application.

You might encounter challenges when you're operating Apache Kafka Connect clusters on your own, or when you're trying to migrate open source Apache Kafka Connect applications to AWS. These challenges include the time required to set up infrastructure and deploy applications, engineering obstacles when setting up self-managed Apache Kafka Connect clusters, and administrative operational overhead.

To address these challenges, we recommend using Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect) to migrate your open source Apache Kafka Connect applications to AWS. Amazon MSK Connect simplifies using Kafka Connect to stream data between Apache Kafka clusters and external systems, such as databases, search indexes, and file systems.

Here are some of the benefits of migrating to Amazon MSK Connect:
+ **Elimination of operational overhead** — Amazon MSK Connect takes away the operational burden associated with patching, provisioning, and scaling of Apache Kafka Connect clusters. Amazon MSK Connect continuously monitors the health of your Connect clusters and automates patching and version upgrades without causing any disruptions to your workloads.
+ **Automatic restarting of Connect tasks** — Amazon MSK Connect can automatically recover failed tasks to reduce production disruptions. Task failures can be caused by temporary errors, such as breaching the TCP connection limit for Kafka, and task rebalancing when new workers join the consumer group for sink connectors.
+ **Automatic horizontal and vertical scaling** — Amazon MSK Connect enables the connector application to automatically scale to support higher throughputs. Amazon MSK Connect manages scaling for you. You only need to specify the number of workers in the auto scaling group and the utilization thresholds. You can use the Amazon MSK Connect `UpdateConnector` API operation to vertically scale the connector up or down between 1 and 8 vCPUs to support variable throughput.
+ **Private network connectivity** — Amazon MSK Connect privately connects to source and sink systems by using AWS PrivateLink and private DNS names.

# Getting started with MSK Connect
<a name="msk-connect-getting-started"></a>

This is a step-by-step tutorial that uses the AWS Management Console to create an MSK cluster and a sink connector that sends data from the cluster to an S3 bucket.

**Topics**
+ [Set up resources required for MSK Connect](mkc-tutorial-setup.md)
+ [Create custom plugin](mkc-create-plugin.md)
+ [Create client machine and Apache Kafka topic](mkc-create-topic.md)
+ [Create connector](mkc-create-connector.md)
+ [Send data to the MSK cluster](mkc-send-data.md)

# Set up resources required for MSK Connect
<a name="mkc-tutorial-setup"></a>

In this step you create the following resources that you need for this getting-started scenario:
+ An Amazon S3 bucket to serve as the destination that receives data from the connector.
+ An MSK cluster to which you will send data. The connector will then read the data from this cluster and send it to the destination S3 bucket.
+ An IAM policy that contains the permissions to write to the destination S3 bucket.
+ An IAM role that allows the connector to write to the destination S3 bucket. You'll add the IAM policy that you create to this role.
+ An Amazon VPC endpoint to make it possible to send data from the Amazon VPC that has the cluster and the connector to Amazon S3.

**To create the S3 bucket**

1. Sign in to the AWS Management Console and open the Amazon S3 console at [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Choose **Create bucket**.

1. For the name of the bucket, enter a descriptive name such as `amzn-s3-demo-bucket-mkc-tutorial`.

1. Scroll down and choose **Create bucket**.

1. In the list of buckets, choose the newly created bucket.

1. Choose **Create folder**.

1. Enter `tutorial` for the name of the folder, then scroll down and choose **Create folder**.

**To create the cluster**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the left pane, under **MSK Clusters**, choose **Clusters**.

1. Choose **Create cluster**.

1. In **Creation method**, choose **Custom create**.

1. For the cluster name enter **mkc-tutorial-cluster**.

1. In **Cluster type**, choose **Provisioned**.

1. Choose **Next**.

1. Under **Networking**, choose an Amazon VPC. Then select the Availability Zones and subnets that you want to use. Remember the IDs of the Amazon VPC and subnets that you selected because you need them later in this tutorial.

1. Choose **Next**.

1. Under **Access control methods** ensure that only **Unauthenticated access** is selected.

1. Under **Encryption** ensure that only **Plaintext** is selected.

1. Continue through the wizard and then choose **Create cluster**. This takes you to the details page for the cluster. On that page, under **Security groups applied**, find the security group ID. Remember that ID because you need it later in this tutorial.

**To create an IAM policy with permissions to write to the S3 bucket**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. On the navigation pane, choose **Policies**.

1. Choose **Create policy**.

1. In **Policy editor**, choose **JSON**, and then replace the JSON in the editor window with the following JSON.

   In the following example, replace *<amzn-s3-demo-bucket-my-tutorial>* with the name of your S3 bucket.

   ```
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

   For instructions about how to write secure policies, see [IAM access control](iam-access-control.md).

1. Choose **Next**.

1. On the **Review and create** page, do the following:

   1. For **Policy name**, enter a descriptive name, such as **mkc-tutorial-policy**.

   1. In **Permissions defined in this policy**, review and/or edit the permissions defined in your policy.

   1. (Optional) To help identify, organize, or search for the policy, choose **Add new tag** to add tags as key-value pairs. For example, add a tag to your policy with the key-value pair of **Environment** and **Test**.

      For more information about using tags, see [Tags for AWS Identity and Access Management resources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) in the *IAM User Guide*.

1. Choose **Create policy**.
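
If you prefer to generate the policy document instead of editing the bucket name by hand, the following Python sketch builds the same JSON shown in the procedure above. The function name `build_s3_sink_policy` is hypothetical and exists only for this illustration; the statements and actions are copied from the tutorial policy.

```python
import json


def build_s3_sink_policy(bucket_name: str) -> dict:
    """Return the tutorial's S3 write policy for the given bucket (hypothetical helper)."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowListBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                # These actions are scoped to the bucket ARN itself.
                "Resource": bucket_arn,
            },
            {
                "Sid": "AllowObjectActions",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads",
                ],
                # The tutorial scopes these actions to every key in the bucket.
                "Resource": f"{bucket_arn}/*",
            },
        ],
    }


if __name__ == "__main__":
    # Print JSON that can be pasted into the IAM policy editor.
    print(json.dumps(build_s3_sink_policy("amzn-s3-demo-bucket-mkc-tutorial"), indent=2))
```

Generating the document from a single bucket name keeps the two `Resource` values in sync, which is the part that is easiest to get wrong when editing by hand.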

**To create the IAM role that can write to the destination bucket**

1. On the navigation pane of the IAM console, choose **Roles**, and then choose **Create role**.

1. On the **Select trusted entity** page, do the following:

   1. For **Trusted entity type**, choose **AWS service**.

   1. For **Service or use case**, choose **S3**.

   1. Under **Use case**, choose **S3**.

1. Choose **Next**.

1. On the **Add permissions** page, do the following:

   1. In the search box under **Permissions policies**, enter the name of the policy that you previously created for this tutorial. For example, **mkc-tutorial-policy**. Then, choose the box to the left of the policy name.

   1. (Optional) Set a [permissions boundary](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html). This is an advanced feature that is available for service roles, but not service-linked roles. For information about setting a permissions boundary, see [Creating roles and attaching policies (console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html) in the *IAM User Guide*.

1. Choose **Next**.

1. On the **Name, review, and create** page, do the following:

   1. For **Role name**, enter a descriptive name, such as **mkc-tutorial-role**.
**Important**  
When you name a role, note the following:  
Role names must be unique within your AWS account, and can't be made unique by case.  
For example, don't create roles named both **PRODROLE** and **prodrole**. When a role name is used in a policy or as part of an ARN, the role name is case sensitive. However, when a role name appears to customers in the console, such as during the sign-in process, the role name is case insensitive.  
You can't edit the name of the role after it's created because other entities might reference the role.

   1. (Optional) For **Description**, enter a description for the role.

   1. (Optional) To edit the use cases and permissions for the role, in **Step 1: Select trusted entities** or **Step 2: Add permissions** sections, choose **Edit**.

   1. (Optional) To help identify, organize, or search for the role, choose **Add new tag** to add tags as key-value pairs. For example, add a tag to your role with the key-value pair of **ProductManager** and **John**.

      For more information about using tags, see [Tags for AWS Identity and Access Management resources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) in the *IAM User Guide*.

1. Review the role, and then choose **Create role**.

**To allow MSK Connect to assume the role**

1. In the IAM console, in the left pane, under **Access management**, choose **Roles**.

1. Find the `mkc-tutorial-role` and choose it.

1. Under the role's **Summary**, choose the **Trust relationships** tab.

1. Choose **Edit trust relationship**.

1. Replace the existing trust policy with the following JSON.

   ```
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

1. Choose **Update Trust Policy**.
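
Before you paste a trust policy into the console, you can sanity-check it locally. The following sketch assumes the MSK Connect service principal is `kafkaconnect.amazonaws.com` and only looks for a matching `Allow` statement; it is not a full IAM policy evaluator, and `allows_msk_connect` is a hypothetical helper name.

```python
MSK_CONNECT_PRINCIPAL = "kafkaconnect.amazonaws.com"  # assumed service principal


def _as_list(value):
    """IAM allows a single string or a list; normalize to a list."""
    return [value] if isinstance(value, str) else list(value)


def allows_msk_connect(trust_policy: dict) -> bool:
    """Return True if an Allow statement lets MSK Connect call sts:AssumeRole."""
    for statement in trust_policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        principal = statement.get("Principal", {})
        if not isinstance(principal, dict):
            # Wildcard principals such as "*" are ignored by this sketch.
            continue
        actions = _as_list(statement.get("Action", []))
        services = _as_list(principal.get("Service", []))
        if "sts:AssumeRole" in actions and MSK_CONNECT_PRINCIPAL in services:
            return True
    return False
```

A role that still carries the default S3 trust policy from the previous procedure fails this check, which is the situation the steps above correct.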

**To create an Amazon VPC endpoint from the cluster's VPC to Amazon S3**

1. Open the Amazon VPC console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. In the left pane, choose **Endpoints**.

1. Choose **Create endpoint**.

1. Under **Service Name** choose the **com.amazonaws.us-east-1.s3** service and the **Gateway** type.

1. Choose the cluster's VPC and then select the box to the left of the route table that is associated with the cluster's subnets.

1. Choose **Create endpoint**.

**Next Step**

[Create custom plugin](mkc-create-plugin.md)

# Create custom plugin
<a name="mkc-create-plugin"></a>

A plugin contains the code that defines the logic of the connector. In this step you create a custom plugin that has the code for the Confluent Amazon S3 Sink Connector. In a later step, when you create the MSK connector, you specify that its code is in this custom plugin. You can use the same plugin to create multiple MSK connectors with different configurations.

**To create the custom plugin**

1. Download the [S3 connector](https://www.confluent.io/hub/confluentinc/kafka-connect-s3).

1. Upload the ZIP file to an S3 bucket to which you have access. For information on how to upload files to Amazon S3, see [Uploading objects](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) in the *Amazon S3 User Guide*.

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the left pane expand **MSK Connect**, then choose **Custom plugins**.

1. Choose **Create custom plugin**.

1. Choose **Browse S3**.

1. In the list of buckets find the bucket where you uploaded the ZIP file, and choose that bucket.

1. In the list of objects in the bucket, select the radio button to the left of the ZIP file, then choose the button labeled **Choose**.

1. Enter `mkc-tutorial-plugin` for the custom plugin name, then choose **Create custom plugin**.

It might take AWS a few minutes to finish creating the custom plugin. When the creation process is complete, you see the following message in a banner at the top of the browser window.

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**Next Step**

[Create client machine and Apache Kafka topic](mkc-create-topic.md)

# Create client machine and Apache Kafka topic
<a name="mkc-create-topic"></a>

In this step you create an Amazon EC2 instance to use as an Apache Kafka client instance. You then use this instance to create a topic on the cluster.

**To create a client machine**

1. Open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Choose **Launch instances**.

1. Enter a **Name** for your client machine, such as **mkc-tutorial-client**.

1. Leave **Amazon Linux 2 AMI (HVM) - Kernel 5.10, SSD Volume Type** selected for **Amazon Machine Image (AMI) type**.

1. Choose the **t2.xlarge** instance type.

1. Under **Key pair (login)**, choose **Create a new key pair**. Enter **mkc-tutorial-key-pair** for **Key pair name**, and then choose **Download Key Pair**. Alternatively, you can use an existing key pair.

1. Choose **Launch instance**.

1. Choose **View Instances**. Then, in the **Security Groups** column, choose the security group that is associated with your new instance. Copy the ID of the security group, and save it for later.

**To allow the newly created client to send data to the cluster**

1. Open the Amazon VPC console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. In the left pane, under **SECURITY**, choose **Security Groups**. In the **Security group ID** column, find the security group of the cluster. You saved the ID of this security group when you created the cluster in [Set up resources required for MSK Connect](mkc-tutorial-setup.md). Choose this security group by selecting the box to the left of its row. Make sure no other security groups are simultaneously selected.

1. In the bottom half of the screen, choose the **Inbound rules** tab.

1. Choose **Edit inbound rules**.

1. In the bottom left of the screen, choose **Add rule**.

1. In the new rule, choose **All traffic** in the **Type** column. In the field to the right of the **Source** column, enter the ID of the security group of the client machine. This is the security group ID that you saved after you created the client machine.

1. Choose **Save rules**. Your MSK cluster will now accept all traffic from the client you created in the previous procedure.

**To create a topic**

1. Open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. In the table of instances choose `mkc-tutorial-client`.

1. Near the top of the screen, choose **Connect**, then follow the instructions to connect to the instance.

1. Install Java on the client instance by running the following command:

   ```
   sudo yum install java-1.8.0
   ```

1. Run the following command to download Apache Kafka. 

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**Note**  
If you want to use a mirror site other than the one used in this command, you can choose a different one on the [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) website.

1. Run the following command in the directory where you downloaded the TAR file in the previous step.

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. Go to the **kafka_2.12-2.2.1** directory.

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the left pane choose **Clusters**, then choose the name `mkc-tutorial-cluster`.

1. Choose **View client information**.

1. Copy the **Plaintext** connection string.

1. Choose **Done**.

1. Run the following command on the client instance (`mkc-tutorial-client`), replacing *bootstrapServerString* with the value that you saved when you viewed the cluster's client information.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   If the command succeeds, you see the following message: `Created topic mkc-tutorial-topic.`

**Next Step**

[Create connector](mkc-create-connector.md)

# Create connector
<a name="mkc-create-connector"></a>

This procedure describes how to create a connector using the AWS Management Console.

**To create the connector**

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the left pane, expand **MSK Connect**, then choose **Connectors**.

1. Choose **Create connector**.

1. In the list of plugins, choose `mkc-tutorial-plugin`, then choose **Next**.

1. For the connector name enter `mkc-tutorial-connector`.

1. In the list of clusters, choose `mkc-tutorial-cluster`.

1. In the **Connector network settings** section, choose one of the following for network type:
   + **IPv4** (default) - For connectivity to destinations over IPv4 only
   + **Dual-stack** - For connectivity to destinations over both IPv4 and IPv6 (only available if your subnets have IPv4 and IPv6 CIDR blocks associated with them)

1. Copy the following configuration and paste it into the connector configuration field.

   Make sure that you replace `us-east-1` in the `s3.region` property with the code of the AWS Region where you're creating the connector. Also, replace *<amzn-s3-demo-bucket-my-tutorial>* in the `s3.bucket.name` property with the name of your S3 bucket.

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. Under **Access permissions** choose `mkc-tutorial-role`.

1. Choose **Next**. On the **Security** page, choose **Next** again.

1. On the **Logs** page choose **Next**.

1. On the **Review and create** page, review your connector configuration and choose **Create connector**.
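
The console accepts the connector configuration as `key=value` lines. If you later script connector creation, the same settings are typically passed as a map of string keys to string values. The following sketch converts the tutorial configuration into such a map; `parse_connector_properties` is a hypothetical helper, and the parser handles only simple `key=value` lines, blank lines, and `#` comments.

```python
TUTORIAL_CONFIG = """\
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=1
schema.compatibility=NONE
tasks.max=2
topics=mkc-tutorial-topic
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
topics.dir=tutorial
"""


def parse_connector_properties(text: str) -> dict:
    """Parse key=value lines into a dict of strings (hypothetical helper)."""
    config = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, separator, value = line.partition("=")
        if not separator:
            raise ValueError(f"Expected key=value, got: {line!r}")
        # Values stay strings, including numbers such as tasks.max.
        config[key.strip()] = value.strip()
    return config


if __name__ == "__main__":
    for key, value in parse_connector_properties(TUTORIAL_CONFIG).items():
        print(f"{key} -> {value}")
```

Keeping every value as a string mirrors how Kafka Connect reads connector properties, so numeric settings such as `flush.size` need no special handling.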

**Next Step**

[Send data to the MSK cluster](mkc-send-data.md)

# Send data to the MSK cluster
<a name="mkc-send-data"></a>

In this step you send data to the Apache Kafka topic that you created earlier, and then look for that same data in the destination S3 bucket.

**To send data to the MSK cluster**

1. In the `bin` folder of the Apache Kafka installation on the client instance, create a text file named `client.properties` with the following contents.

   ```
   security.protocol=PLAINTEXT
   ```

1. Run the following command from the `bin` folder to create a console producer. Replace *BootstrapBrokerString* with the plaintext connection string that you copied from the cluster's client information when you created the topic.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. Enter any message that you want, and press **Enter**. Repeat this step two or three times. Every time you enter a line and press **Enter**, that line is sent to your Apache Kafka cluster as a separate message.

1. Look in the destination Amazon S3 bucket to find the messages that you sent in the previous step.
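
To know where to look in the bucket, it helps to predict the object key. The following sketch assumes the object layout commonly produced by the S3 sink connector with `DefaultPartitioner` and `JsonFormat`: `<topics.dir>/<topic>/partition=<n>/<topic>+<n>+<start offset>.json`, with the offset zero-padded to 10 digits. Treat this layout as an assumption and confirm it against the objects that actually appear in your bucket. The function name is hypothetical.

```python
def expected_object_key(topics_dir: str, topic: str, partition: int, start_offset: int) -> str:
    """Build the object key under the assumed DefaultPartitioner/JsonFormat layout."""
    # Assumption: the starting offset is zero-padded to 10 digits.
    file_name = f"{topic}+{partition}+{start_offset:010d}.json"
    return f"{topics_dir}/{topic}/partition={partition}/{file_name}"


if __name__ == "__main__":
    # With flush.size=1, each message is written as its own object.
    for offset in range(3):
        print(expected_object_key("tutorial", "mkc-tutorial-topic", 0, offset))
```

Because the tutorial sets `flush.size=1` and the topic has a single partition, you would expect one object per message under the `tutorial` folder.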

# Understand connectors
<a name="msk-connect-connectors"></a>

A connector integrates external systems and Amazon services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or continuously copying data from your cluster into a data sink. A connector can also perform lightweight logic such as transformation, format conversion, or filtering data before delivering the data to a destination. Source connectors pull data from a data source and push this data into the cluster, while sink connectors pull data from the cluster and push this data into a data sink.

The following diagram shows the architecture of a connector. A worker is a Java virtual machine (JVM) process that runs the connector logic. Each worker creates a set of tasks that run in parallel threads and do the work of copying the data. Tasks don't store state, and can therefore be started, stopped, or restarted at any time in order to provide a resilient and scalable data pipeline.

![\[Diagram showing the architecture of a connector cluster.\]](http://docs.aws.amazon.com/msk/latest/developerguide/images/mkc-worker-architecture.png)


# Understand connector capacity
<a name="msk-connect-capacity"></a>

The total capacity of a connector depends on the number of workers that the connector has, as well as on the number of MSK Connect Units (MCUs) per worker. Each MCU represents 1 vCPU of compute and 4 GiB of memory. The MCU memory pertains to the total memory of a worker instance and not the heap memory in use.

MSK Connect workers consume IP addresses in the customer-provided subnets. Each worker uses one IP address from one of the customer-provided subnets. You should ensure that you have enough available IP addresses in the subnets provided to a CreateConnector request to account for their specified capacity, especially when autoscaling connectors where the number of workers can fluctuate.
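
The arithmetic above can be sketched as follows. The constants come from this section (1 vCPU and 4 GiB per MCU, one IP address per worker); the `ConnectorCapacity` type and `connector_capacity` function are hypothetical names used only for illustration.

```python
from dataclasses import dataclass

VCPU_PER_MCU = 1        # from this section
MEMORY_GIB_PER_MCU = 4  # from this section


@dataclass(frozen=True)
class ConnectorCapacity:
    vcpus: int
    memory_gib: int
    ip_addresses: int


def connector_capacity(worker_count: int, mcu_count: int) -> ConnectorCapacity:
    """Total compute, memory, and subnet IP addresses for a connector."""
    if worker_count < 1 or mcu_count < 1:
        raise ValueError("worker_count and mcu_count must be at least 1")
    total_mcus = worker_count * mcu_count
    return ConnectorCapacity(
        vcpus=total_mcus * VCPU_PER_MCU,
        memory_gib=total_mcus * MEMORY_GIB_PER_MCU,
        ip_addresses=worker_count,  # each worker uses one IP address
    )


if __name__ == "__main__":
    # For an autoscaled connector, size the subnets for the maximum worker count.
    print(connector_capacity(worker_count=4, mcu_count=2))
```

For an autoscaled connector, pass the maximum worker count so that the IP address estimate covers the largest size the connector can reach.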

To create a connector, you must choose one of the following two capacity modes.
+ *Provisioned* - Choose this mode if you know the capacity requirements for your connector. You specify two values:
  + The number of workers.
  + The number of MCUs per worker.
+ *Autoscaled* - Choose this mode if the capacity requirements for your connector are variable or if you don't know them in advance. When you use autoscaled mode, Amazon MSK Connect overrides your connector's `tasks.max` property with a value that is proportional to the number of workers running in the connector and the number of MCUs per worker. 

  You specify the following values:
  + The minimum and maximum number of workers.
  + The scale-in and scale-out percentages for CPU utilization, which is determined by the `CpuUtilization` metric. When the `CpuUtilization` metric for the connector exceeds the scale-out percentage, MSK Connect increases the number of workers that are running in the connector. When the `CpuUtilization` metric goes below the scale-in percentage, MSK Connect decreases the number of workers. The number of workers always remains within the minimum and maximum numbers that you specify when you create the connector.
  + The number of MCUs per worker.
  + (Optional) *Maximum autoscaling task count* - The maximum number of tasks allocated to the connector during autoscaling operations. This parameter allows you to set an upper limit on task creation, providing greater control over resource utilization and parallelism in relation to your Kafka topic partitions.

For more information about workers, see [Understand MSK Connect workers](msk-connect-workers.md), and for more information about maximum autoscaling task count, see [Understand maximum autoscaling task count](msk-connect-max-autoscaling-task-count.md). To learn about MSK Connect metrics, see [Monitoring Amazon MSK Connect](mkc-monitoring-overview.md).
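
The autoscaled-mode rules described above can be sketched as a small decision function. This section doesn't state how many workers MSK Connect adds or removes per scaling event, so the sketch reports only the direction; the function and parameter names are hypothetical.

```python
def scaling_direction(
    cpu_utilization: float,
    current_workers: int,
    *,
    min_workers: int,
    max_workers: int,
    scale_in_pct: float,
    scale_out_pct: float,
) -> str:
    """Return "scale_out", "scale_in", or "hold" for one CpuUtilization reading."""
    # Scale out only while there is room below the configured maximum.
    if cpu_utilization > scale_out_pct and current_workers < max_workers:
        return "scale_out"
    # Scale in only while the connector is above the configured minimum.
    if cpu_utilization < scale_in_pct and current_workers > min_workers:
        return "scale_in"
    return "hold"


if __name__ == "__main__":
    limits = dict(min_workers=1, max_workers=4, scale_in_pct=20, scale_out_pct=80)
    for cpu, workers in [(85, 2), (85, 4), (10, 2), (10, 1), (50, 2)]:
        print(cpu, workers, scaling_direction(cpu, workers, **limits))
```

The bounds checks reflect the rule that the number of workers always stays within the minimum and maximum that you specify.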

# Understand maximum autoscaling task count
<a name="msk-connect-max-autoscaling-task-count"></a>

The `maxAutoscalingTaskCount` parameter is an optional capacity field available for autoscaling connectors in Amazon MSK Connect. This parameter allows you to set an upper limit on the maximum number of tasks that can be created during connector autoscaling operations, providing greater control over resource utilization and performance.

When you use autoscaled capacity mode, Amazon MSK Connect automatically overrides your connector's `tasks.max` property with a value proportional to the number of workers and MCUs per worker. The `maxAutoscalingTaskCount` parameter provides an additional configurable option to limit the maximum number of tasks created for your connector.

This capability is particularly useful when you want to control the level of parallelism in relation to the number of topic partitions in your Kafka cluster. By setting this limit, you can optimize performance and prevent inefficient task distribution that might occur when the automatically calculated task count exceeds your workload requirements.

## Configuration requirements
<a name="msk-connect-max-autoscaling-task-count-requirements"></a>

The `maxAutoscalingTaskCount` parameter must meet the following requirement:

```
maxAutoscalingTaskCount ≥ maxWorkerCount
```

This requirement ensures efficient resource utilization by maintaining at least one task per worker. The system enforces this minimum to optimize connector functionality.

When you specify `maxAutoscalingTaskCount`, the limit is applied immediately upon connector creation and during all subsequent scaling events. As the number of workers increases or decreases during autoscaling operations, the system continues to honor this limit. The `tasks.max` value adjusts proportionally to the number of workers and MCUs per worker but never exceeds the configured `maxAutoscalingTaskCount` value.

If you don't specify this parameter, the connector uses the standard calculation without any limit: `tasks.max = workerCount × mcuCount × tasksPerMcu` (where tasksPerMcu is 2). 
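
The calculation and the requirement above can be combined in a short sketch. The formula and the value of 2 tasks per MCU come from this section; the function name `effective_tasks_max` is hypothetical, and the sketch assumes the limit is applied as a simple upper bound on the calculated value.

```python
from typing import Optional

TASKS_PER_MCU = 2  # value stated in this section


def effective_tasks_max(
    worker_count: int,
    mcu_count: int,
    max_worker_count: int,
    max_autoscaling_task_count: Optional[int] = None,
) -> int:
    """Return the tasks.max value for the current number of workers."""
    calculated = worker_count * mcu_count * TASKS_PER_MCU
    if max_autoscaling_task_count is None:
        return calculated  # standard calculation, no limit
    if max_autoscaling_task_count < max_worker_count:
        raise ValueError("maxAutoscalingTaskCount must be >= maxWorkerCount")
    # The calculated value never exceeds the configured limit.
    return min(calculated, max_autoscaling_task_count)


if __name__ == "__main__":
    print(effective_tasks_max(4, 8, max_worker_count=4))
    print(effective_tasks_max(4, 8, max_worker_count=4, max_autoscaling_task_count=15))
```

With 4 workers and 8 MCUs per worker, the first call yields the uncapped 64 tasks and the second yields 15, because the limit is lower than the calculated value.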

## When to use maxAutoscalingTaskCount
<a name="msk-connect-max-autoscaling-task-count-when-to-use"></a>

Consider using `maxAutoscalingTaskCount` in the following scenarios:
+ *Limited partition count*: When your Kafka topics have a fixed number of partitions that is lower than the automatically calculated task count, setting a limit prevents the creation of idle tasks with no work to perform.
+ *Performance optimization*: When you've identified that a specific task count provides optimal throughput for your workload, you can cap the maximum tasks to maintain consistent performance.
+ *Resource management*: When you want to control the maximum parallelism and resource consumption of your connector regardless of how many workers are running.

## Example
<a name="msk-connect-max-autoscaling-task-count-example"></a>

For a connector with the following configuration:

```
minWorkerCount: 1
maxWorkerCount: 4
mcuCount: 8
maxAutoscalingTaskCount: 15
```

Without `maxAutoscalingTaskCount`, when scaled to 4 workers, the connector would create 64 tasks (4 workers × 8 MCUs × 2 tasks per MCU). With `maxAutoscalingTaskCount` set to 15, the connector creates only 15 tasks, which may be more appropriate if your Kafka topic has 15 or fewer partitions.

# Configure dual-stack network type
<a name="msk-connect-dual-stack"></a>

Amazon MSK Connect supports dual-stack network type for new connectors. With dual-stack networking, your connectors can connect to destinations over both IPv4 and IPv6. Note that IPv6 connectivity is only available in dual-stack mode (IPv4 + IPv6). IPv6-only networking is not supported.

By default, new connectors use IPv4 network type. To create a connector with dual-stack network type, make sure you've fulfilled the prerequisites described in the following section. Note that, once you create a connector using dual-stack network type, you cannot modify its network type. To change network types, you must delete and recreate the connector.

Amazon MSK Connect also supports service API endpoint connectivity over both IPv6 and IPv4. To use IPv6 connectivity for API calls, you need to use the dual-stack endpoints. For more information about MSK Connect service endpoints, see [Amazon MSK Connect endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/msk-connect.html).

## Prerequisites for using dual-stack network type
<a name="dual-stack-prerequisites"></a>

Before you configure dual-stack network type for your connectors, make sure that all subnets you provide during connector creation have both IPv6 and IPv4 CIDR blocks assigned.
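
One way to check this prerequisite is to inspect the subnet descriptions before you create the connector. The following is a minimal sketch that assumes each subnet is a dictionary shaped like an entry in the Amazon EC2 `DescribeSubnets` response (`CidrBlock` and `Ipv6CidrBlockAssociationSet` fields). The helper names and the sample subnet IDs are hypothetical.

```python
from typing import Any, Dict, List


def is_dual_stack_subnet(subnet: Dict[str, Any]) -> bool:
    """Return True if the subnet has an IPv4 CIDR and an associated IPv6 CIDR."""
    has_ipv4 = bool(subnet.get("CidrBlock"))
    has_ipv6 = any(
        assoc.get("Ipv6CidrBlockState", {}).get("State") == "associated"
        for assoc in subnet.get("Ipv6CidrBlockAssociationSet", [])
    )
    return has_ipv4 and has_ipv6


def non_dual_stack_subnets(subnets: List[Dict[str, Any]]) -> List[str]:
    """Return the IDs of subnets that do not qualify for dual-stack."""
    return [s.get("SubnetId", "<unknown>") for s in subnets
            if not is_dual_stack_subnet(s)]


# Hypothetical sample data in the assumed DescribeSubnets shape.
subnets = [
    {"SubnetId": "subnet-aaa", "CidrBlock": "10.0.1.0/24",
     "Ipv6CidrBlockAssociationSet": [
         {"Ipv6CidrBlock": "2001:db8:1234:1a00::/64",
          "Ipv6CidrBlockState": {"State": "associated"}}]},
    {"SubnetId": "subnet-bbb", "CidrBlock": "10.0.2.0/24",
     "Ipv6CidrBlockAssociationSet": []},
]
print(non_dual_stack_subnets(subnets))  # ['subnet-bbb']
```

An empty result means that every subnet in the list has both address families assigned.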

## Considerations for using dual-stack network type
<a name="dual-stack-considerations"></a>
+ IPv6 support is currently available only in dual-stack mode (IPv4 + IPv6), not as IPv6-only
+ Connectors with dual-stack enabled can connect over both IPv4 and IPv6 to both MSK and Sink or Source data systems
+ Network type cannot be modified after connector creation - you must delete and recreate the connector to change network types
+ All subnets specified during the connector creation must support dual-stack for the connector creation to succeed with dual-stack network type
+ If using dual-stack subnets but no network type is specified, the connector will default to IPv4-only for backwards compatibility
+ Using dual-stack networking doesn't incur additional costs

# Create a connector
<a name="mkc-create-connector-intro"></a>

This procedure describes how to create a connector using the AWS Management Console.

**Creating a connector using the AWS Management Console**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the left pane, under **MSK Connect**, choose **Connectors**.

1. Choose **Create connector**.

1. You can choose between using an existing custom plugin to create the connector, or creating a new custom plugin first. For information on custom plugins and how to create them, see [Create custom plugins](msk-connect-plugins.md). In this procedure, let's assume you have a custom plugin that you want to use. In the list of custom plugins, find the one that you want to use, and select the box to its left, then choose **Next**.

1. Enter a name and, optionally, a description.

1. Choose the cluster that you want to connect to.

1. In the **Connector network settings** section, choose one of the following for network type:
   + **IPv4** (default) - For connectivity to destinations over IPv4 only
   + **Dual-stack** - For connectivity to destinations over both IPv4 and IPv6 (only available if your subnets have IPv4 and IPv6 CIDR blocks associated with them)

1. Specify the connector configuration. The configuration parameters that you need to specify depend on the type of connector that you want to create. However, some parameters are common to all connectors, for example, the `connector.class` and `tasks.max` parameters. The following is an example configuration for the [Confluent Amazon S3 Sink Connector](https://www.confluent.io/hub/confluentinc/kafka-connect-s3).

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. Next, you configure your connector capacity. You can choose between two capacity modes: provisioned and auto scaled. For information about these two options, see [Understand connector capacity](msk-connect-capacity.md).

1. (Optional) In the **Maximum Autoscaling Task Count** section, use the Maximum Autoscaling Task Count field to enter the maximum number of tasks you want to allocate to the connector during autoscaling operations. The value must be at least equal to your maximum worker count. If you don't specify a value, the connector uses the standard calculation without any limit. For more information, see [Understand maximum autoscaling task count](msk-connect-max-autoscaling-task-count.md).

1. Choose either the default worker configuration or a custom worker configuration. For information about creating custom worker configurations, see [Understand MSK Connect workers](msk-connect-workers.md).

1. Next, you specify the service execution role. This must be an IAM role that MSK Connect can assume, and that grants the connector all the permissions that it needs to access the necessary AWS resources. Those permissions depend on the logic of the connector. For information about how to create this role, see [Understand service execution role](msk-connect-service-execution-role.md).

1. Choose **Next**, review the security information, then choose **Next** again.

1. Specify the logging options that you want, then choose **Next**. For information about logging, see [Logging for MSK Connect](msk-connect-logging.md).

1. On the **Review and create** page, review your connector configuration and choose **Create connector**.

To use the MSK Connect API to create a connector, see [CreateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateConnector.html). 

You can use the `UpdateConnector` API to modify the connector's configuration. For more information, see [Update a connector](mkc-update-connector.md).
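
If you create connectors programmatically, the `key=value` configuration shown in the procedure has to be turned into a map of string keys to string values. The following sketch shows one way to do that in Python. The function names are hypothetical, and the check only looks for the two parameters that the procedure names as common to all connectors.

```python
from typing import Dict, List


def parse_connector_properties(text: str) -> Dict[str, str]:
    """Parse key=value lines into a dict, skipping blank lines and # comments."""
    config: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"Expected key=value, got: {raw_line!r}")
        config[key.strip()] = value.strip()
    return config


def missing_common_parameters(config: Dict[str, str]) -> List[str]:
    """Return whichever of connector.class and tasks.max are absent."""
    return [k for k in ("connector.class", "tasks.max") if k not in config]


example = """
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=2
topics=my-example-topic
s3.region=us-east-1
s3.bucket.name=amzn-s3-demo-bucket
"""
config = parse_connector_properties(example)
print(config["connector.class"])          # io.confluent.connect.s3.S3SinkConnector
print(missing_common_parameters(config))  # []
```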

# Update a connector
<a name="mkc-update-connector"></a>

This procedure describes how to update the configuration of an existing MSK Connect connector using the AWS Management Console.

**Updating connector configuration using the AWS Management Console**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the left pane, under **MSK Connect**, choose **Connectors**.

1. Select an existing connector.

1. Choose **Edit connector configuration**.

1. Update the connector configuration. You can't override `connector.class` using UpdateConnector. The following is an example configuration for the Confluent Amazon S3 Sink connector.

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. Choose **Submit**.

1. You can then monitor the current state of the operation in the **Operations** tab of the connector. 

To use the MSK Connect API to update the configuration of a connector, see [UpdateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_UpdateConnector.html).
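
Because `connector.class` can't be overridden in an update, it can help to compare the current and proposed configurations before you submit them. The following sketch is illustrative only: the function name is hypothetical, and it runs entirely on local dictionaries without calling the MSK Connect API.

```python
from typing import Dict, List


def changed_keys(current: Dict[str, str], proposed: Dict[str, str]) -> List[str]:
    """Return the keys whose values differ; reject a changed connector.class."""
    old_class = current.get("connector.class")
    if proposed.get("connector.class", old_class) != old_class:
        raise ValueError("connector.class can't be changed in an update")
    keys = set(current) | set(proposed)
    return sorted(k for k in keys if current.get(k) != proposed.get(k))


current = {"connector.class": "io.confluent.connect.s3.S3SinkConnector",
           "tasks.max": "2", "flush.size": "1"}
proposed = dict(current, **{"tasks.max": "4"})
print(changed_keys(current, proposed))  # ['tasks.max']
```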

# Connecting from connectors
<a name="msk-connect-from-connectors"></a>

The following best practices can improve the performance of your connectivity to Amazon MSK Connect.

## Do not overlap IPs for Amazon VPC peering or Transit Gateway
<a name="CIDR-ip-ranges"></a>

If you are using Amazon VPC peering or Transit Gateway with Amazon MSK Connect, do not configure your connector to reach peered VPC resources whose IP addresses fall in the following CIDR ranges:
+ "10.99.0.0/16"
+ "192.168.0.0/16"
+ "172.21.0.0/16"
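
To check whether a peered VPC address or CIDR block falls in one of these ranges, you can use Python's standard `ipaddress` module. This is a minimal sketch; the function name `overlapping_ranges` is hypothetical.

```python
import ipaddress
from typing import List

# The three ranges listed above.
RESERVED_RANGES = [
    ipaddress.ip_network("10.99.0.0/16"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("172.21.0.0/16"),
]


def overlapping_ranges(cidr_or_ip: str) -> List[str]:
    """Return the listed ranges that overlap the given IPv4 address or CIDR."""
    network = ipaddress.ip_network(cidr_or_ip, strict=False)
    return [str(r) for r in RESERVED_RANGES if network.overlaps(r)]


print(overlapping_ranges("10.99.12.0/24"))  # ['10.99.0.0/16']
print(overlapping_ranges("172.20.0.0/16"))  # []
```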

# Create custom plugins
<a name="msk-connect-plugins"></a>

A plugin is an AWS resource that contains the code that defines your connector logic. You upload a JAR file (or a ZIP file that contains one or more JAR files) to an S3 bucket, and specify the location of the bucket when you create the plugin. When the plugin is created, MSK Connect copies the contents of the S3 object at that point in time. It does not maintain a link to the S3 object, so any subsequent modifications to the object will not affect the plugin or its connectors. When you create a connector, you specify the plugin that you want MSK Connect to use for it. The relationship of plugins to connectors is one-to-many: you can create one or more connectors from the same plugin.

**Note**  
Custom plugins cannot be updated in place. To use a new version of your plugin code, delete all connectors that reference the plugin, delete the plugin, and then recreate it.

**Dependency packaging for custom plugins**  
We recommend that you include all required JAR files and dependencies for your plugin. Package your connector as one of the following:  
+ A ZIP file that contains all required JAR files and dependencies for the plugin.
+ A single uber JAR that contains all the class files for the plugin and its dependencies.

Not bundling your plugin dependencies may impact availability or compatibility in the runtime environment and cause unexpected errors.
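
As an illustration of the ZIP option, the following sketch collects every JAR file under a directory into a single ZIP archive using Python's standard `zipfile` module. The function name and the directory layout are assumptions; your build tool may already produce an equivalent archive.

```python
import zipfile
from pathlib import Path
from typing import List


def package_plugin(jar_dir: str, zip_path: str) -> List[str]:
    """Write all .jar files under jar_dir into zip_path; return archived names."""
    root = Path(jar_dir)
    jars = sorted(p for p in root.rglob("*.jar") if p.is_file())
    if not jars:
        raise ValueError(f"No JAR files found under {jar_dir}")
    names = [jar.relative_to(root).as_posix() for jar in jars]
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for jar, name in zip(jars, names):
            # Store paths relative to jar_dir so the archive has no absolute paths.
            archive.write(jar, arcname=name)
    return names
```

For example, `package_plugin("build/libs", "my-plugin.zip")` would archive the connector JAR and any dependency JARs found under the hypothetical `build/libs` directory.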

For information on how to develop the code for a connector, see the [Connector Development Guide](https://kafka.apache.org/documentation/#connect_development) in the Apache Kafka documentation.

**Creating a custom plugin using the AWS Management Console**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the left pane, under **MSK Connect**, choose **Custom plugins**.

1. Choose **Create custom plugin**.

1. Choose **Browse S3**.

1. In the list of S3 buckets, choose the bucket that has the JAR or ZIP file for the plugin.

1. In the list of objects, select the box to the left of the JAR or ZIP file for the plugin, then choose **Choose**.

1. Choose **Create custom plugin**.

To use the MSK Connect API to create a custom plugin, see [CreateCustomPlugin](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateCustomPlugin.html).

# Understand MSK Connect workers
<a name="msk-connect-workers"></a>

A worker is a Java virtual machine (JVM) process that runs the connector logic. Each worker creates a set of tasks that run in parallel threads and do the work of copying the data. Tasks don't store state, and can therefore be started, stopped, or restarted at any time in order to provide a resilient and scalable data pipeline. Changes to the number of workers, whether due to a scaling event or due to unexpected failures, are automatically detected by the remaining workers. They coordinate to rebalance tasks across the set of remaining workers. Connect workers use Apache Kafka's consumer groups to coordinate and rebalance.

If your connector's capacity requirements are variable or difficult to estimate, you can let MSK Connect scale the number of workers as needed between a lower limit and an upper limit that you specify. Alternatively, you can specify the exact number of workers that you want to run your connector logic. For more information, see [Understand connector capacity](msk-connect-capacity.md).

**MSK Connect workers consume IP addresses**  
MSK Connect workers consume IP addresses in the customer-provided subnets. Each worker uses one IP address from one of the customer-provided subnets. Make sure that the subnets you provide in a CreateConnector request have enough available IP addresses for the connector's specified capacity, especially for autoscaling connectors, where the number of workers can fluctuate.
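
As a rough capacity check, you can compare the free IP addresses across your subnets with the maximum worker count. The sketch below assumes that you already have the free address count for each subnet (for example, the `AvailableIpAddressCount` value that Amazon EC2 reports for a subnet). The function name is hypothetical, and the check does not model how workers are distributed across subnets.

```python
from typing import Iterable


def ip_shortfall(available_ips_per_subnet: Iterable[int], max_worker_count: int) -> int:
    """Return how many more free IPs are needed (0 if there are enough)."""
    return max(0, max_worker_count - sum(available_ips_per_subnet))


print(ip_shortfall([3, 2], 4))  # 0
print(ip_shortfall([1, 1], 4))  # 2
```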

## Default worker configuration
<a name="msk-connect-default-worker-config"></a>

MSK Connect provides the following default worker configuration:

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

# Supported worker configuration properties
<a name="msk-connect-supported-worker-config-properties"></a>

MSK Connect provides a default worker configuration. You also have the option to create a custom worker configuration to use with your connectors. The following list includes information about the worker configuration properties that Amazon MSK Connect does or does not support.
+ The `key.converter` and `value.converter` properties are required.
+ MSK Connect supports the following `producer.` configuration properties.

  ```
  producer.acks
  producer.batch.size
  producer.buffer.memory
  producer.compression.type
  producer.enable.idempotence
  producer.key.serializer
  producer.linger.ms
  producer.max.request.size
  producer.metadata.max.age.ms
  producer.metadata.max.idle.ms
  producer.partitioner.class
  producer.reconnect.backoff.max.ms
  producer.reconnect.backoff.ms
  producer.request.timeout.ms
  producer.retry.backoff.ms
  producer.value.serializer
  ```
+ MSK Connect supports the following `consumer.` configuration properties.

  ```
  consumer.allow.auto.create.topics
  consumer.auto.offset.reset
  consumer.check.crcs
  consumer.fetch.max.bytes
  consumer.fetch.max.wait.ms
  consumer.fetch.min.bytes
  consumer.heartbeat.interval.ms
  consumer.key.deserializer
  consumer.max.partition.fetch.bytes
  consumer.max.poll.interval.ms
  consumer.max.poll.records
  consumer.metadata.max.age.ms
  consumer.partition.assignment.strategy
  consumer.reconnect.backoff.max.ms
  consumer.reconnect.backoff.ms
  consumer.request.timeout.ms
  consumer.retry.backoff.ms
  consumer.session.timeout.ms
  consumer.value.deserializer
  ```
+ All other configuration properties that don't start with the `producer.` or `consumer.` prefixes are supported *except* for the following properties. 

  ```
  access.control.
  admin.
  admin.listeners.https.
  client.
  connect.
  inter.worker.
  internal.
  listeners.https.
  metrics.
  metrics.context.
  rest.
  sasl.
  security.
  socket.
  ssl.
  topic.tracking.
  worker.
  bootstrap.servers
  config.storage.topic
  connections.max.idle.ms
  connector.client.config.override.policy
  group.id
  listeners
  metric.reporters
  plugin.path
  receive.buffer.bytes
  response.http.headers.config
  scheduled.rebalance.max.delay.ms
  send.buffer.bytes
  status.storage.topic
  ```

For more information about worker configuration properties and what they represent, see [Kafka Connect Configs](https://kafka.apache.org/documentation/#connectconfigs) in the Apache Kafka documentation.
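
The rules in this section can be expressed as a small validation routine. The following Python sketch is illustrative and is not an official validator. It assumes that entries in the exclusion list that end with a period are prefixes and that the remaining entries are exact property names. The function name `find_problems` is hypothetical.

```python
from typing import Dict, List

SUPPORTED_PRODUCER = set("""acks batch.size buffer.memory compression.type
enable.idempotence key.serializer linger.ms max.request.size metadata.max.age.ms
metadata.max.idle.ms partitioner.class reconnect.backoff.max.ms
reconnect.backoff.ms request.timeout.ms retry.backoff.ms value.serializer""".split())

SUPPORTED_CONSUMER = set("""allow.auto.create.topics auto.offset.reset check.crcs
fetch.max.bytes fetch.max.wait.ms fetch.min.bytes heartbeat.interval.ms
key.deserializer max.partition.fetch.bytes max.poll.interval.ms max.poll.records
metadata.max.age.ms partition.assignment.strategy reconnect.backoff.max.ms
reconnect.backoff.ms request.timeout.ms retry.backoff.ms session.timeout.ms
value.deserializer""".split())

# Assumption: entries ending with "." in the exclusion list are prefixes.
UNSUPPORTED_PREFIXES = tuple("""access.control. admin. admin.listeners.https.
client. connect. inter.worker. internal. listeners.https. metrics.
metrics.context. rest. sasl. security. socket. ssl. topic.tracking.
worker.""".split())

UNSUPPORTED_NAMES = set("""bootstrap.servers config.storage.topic
connections.max.idle.ms connector.client.config.override.policy group.id
listeners metric.reporters plugin.path receive.buffer.bytes
response.http.headers.config scheduled.rebalance.max.delay.ms
send.buffer.bytes status.storage.topic""".split())


def find_problems(config: Dict[str, str]) -> List[str]:
    """Return human-readable problems found in a worker configuration."""
    problems = [f"missing required property: {name}"
                for name in ("key.converter", "value.converter")
                if name not in config]
    for key in config:
        if key.startswith("producer."):
            supported = key[len("producer."):] in SUPPORTED_PRODUCER
        elif key.startswith("consumer."):
            supported = key[len("consumer."):] in SUPPORTED_CONSUMER
        else:
            supported = not (key in UNSUPPORTED_NAMES
                             or key.startswith(UNSUPPORTED_PREFIXES))
        if not supported:
            problems.append(f"unsupported property: {key}")
    return problems


worker_config = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "producer.acks": "all",
    "group.id": "my-group",
}
print(find_problems(worker_config))  # ['unsupported property: group.id']
```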

# Create a custom worker configuration
<a name="msk-connect-create-custom-worker-config"></a>

This procedure describes how to create a custom worker configuration using the AWS Management Console.

**Creating a custom worker configuration using the AWS Management Console**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the left pane, under **MSK Connect**, choose **Worker configurations**.

1. Choose **Create worker configuration**.

1. Enter a name and an optional description, then add the properties and values that you want to set them to.

1. Choose **Create worker configuration**.

To use the MSK Connect API to create a worker configuration, see [CreateWorkerConfiguration](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateWorkerConfiguration.html).
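
When you call the API instead of using the console, the worker configuration is supplied as the contents of a properties file. The following sketch renders a dictionary as properties text and Base64-encodes it. It assumes that the `propertiesFileContent` parameter of `CreateWorkerConfiguration` expects Base64-encoded text; confirm this against the API reference before relying on it. The helper names are hypothetical.

```python
import base64
from typing import Dict


def render_properties(props: Dict[str, str]) -> str:
    """Render a dict as key=value lines, one property per line."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


def encode_properties(props: Dict[str, str]) -> str:
    """Return the properties text as a Base64-encoded ASCII string."""
    return base64.b64encode(render_properties(props).encode("utf-8")).decode("ascii")


props = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
}
encoded = encode_properties(props)
# Decoding the result returns the original properties text.
print(base64.b64decode(encoded).decode("utf-8"))
```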

# Manage source connector offsets using `offset.storage.topic`
<a name="msk-connect-manage-connector-offsets"></a>

This section provides information to help you manage source connector offsets using the *offset storage topic*. The offset storage topic is an internal topic that Kafka Connect uses to store connector and task configuration offsets.

## Considerations
<a name="msk-connect-manage-connector-offsets-considerations"></a>

Consider the following when you manage source connector offsets.
+ To specify an offset storage topic, provide the name of the Kafka topic where connector offsets are stored as the value for `offset.storage.topic` in your worker configuration.
+ Use caution when you make changes to a connector configuration. Changing configuration values may result in unintended connector behavior if a source connector uses values from the configuration to key offset records. We recommend that you refer to your plugin's documentation for guidance.
+ **Customize default number of partitions** – In addition to customizing the worker configuration by adding `offset.storage.topic`, you can customize the number of partitions for the offset and status storage topics. Default partitions for internal topics are as follows.
  + `config.storage.topic`: 1, not configurable, must be single partition topic
  + `offset.storage.topic`: 25, configurable by providing `offset.storage.partitions`
  + `status.storage.topic`: 5, configurable by providing `status.storage.partitions`
+ **Manually deleting topics** – Amazon MSK Connect creates new Kafka Connect internal topics (topic names start with `__amazon_msk_connect`) on every deployment of connectors. Old topics that are attached to deleted connectors are not automatically removed because internal topics, such as `offset.storage.topic`, can be reused among connectors. However, you can manually delete unused internal topics created by MSK Connect. The internal topics are named following the format `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id`.

  You can use this naming format to identify the internal topics to delete. Do not delete an internal topic that is currently in use by a running connector.
+ **Using the same name for the internal topics created by MSK Connect** – If you want to reuse the offset storage topic to consume offsets from a previously created connector, you must give the new connector the same name as the old connector. You can set the `offset.storage.topic` property in the worker configuration to assign the same topic name and reuse the topic between different connectors. This configuration is described in [Managing connector offsets](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config). MSK Connect does not allow different connectors to share `config.storage.topic` and `status.storage.topic`. Those topics are created each time you create a new connector in MSK Connect. They are automatically named following the format `__amazon_msk_connect_<status|configs>_connector_name_connector_id`, and so are different across the different connectors that you create.
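
To find the internal topics that belong to a particular connector, you can match topic names against the naming format described above. The following Python sketch is a minimal illustration. The function name is hypothetical, and the pattern treats everything after the connector name and an underscore as the connector ID, so a connector whose name is a prefix of another connector's name (followed by an underscore) could also match. Review the result before you delete anything.

```python
import re
from typing import Iterable, List


def internal_topics_for(connector_name: str, topics: Iterable[str]) -> List[str]:
    """Return topics that follow the MSK Connect internal naming format."""
    pattern = re.compile(
        r"^__amazon_msk_connect_(offsets|status|configs)_"
        + re.escape(connector_name)
        + r"_.+$"
    )
    return [topic for topic in topics if pattern.match(topic)]


topics = [
    "__consumer_offsets",
    "__amazon_msk_canary",
    "__amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2",
    "__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2",
    "__amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2",
    "my-msk-topic-1",
]
for topic in internal_topics_for("my-mskc-connector", topics):
    print(topic)
```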

# Use the default offset storage topic
<a name="msk-connect-default-offset-storage-topic"></a>

By default, Amazon MSK Connect generates a new offset storage topic on your Kafka cluster for each connector that you create. MSK constructs the default topic name using parts of the connector ARN. For example, `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`. 

# Use custom offset storage topic
<a name="msk-connect-set-offset-storage-topic"></a>

To provide offset continuity between source connectors, you can use an offset storage topic of your choice instead of the default topic. Specifying an offset storage topic helps you accomplish tasks like creating a source connector that resumes reading from the last offset of a previous connector.

To specify an offset storage topic, you supply a value for the `offset.storage.topic` property in your worker configuration before you create a connector. If you want to reuse the offset storage topic to consume offsets from a previously created connector, you must give the new connector the same name as the old connector. If you create a custom offset storage topic, you must set [https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy](https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy) to `compact` in your topic configuration.

**Note**  
If you specify an offset storage topic when you create a *sink* connector, MSK Connect creates the topic if it does not already exist. However, the topic will not be used to store connector offsets.   
Sink connector offsets are instead managed using the Kafka consumer group protocol. Each sink connector creates a group named `connect-{CONNECTOR_NAME}`. As long as the consumer group exists, any successive sink connectors that you create with the same `CONNECTOR_NAME` value will continue from the last committed offset.

**Important**  
If you want to update an existing connector configuration while maintaining offset continuity, use the UpdateConnector API. For more information, see [Update a connector](mkc-update-connector.md).

**Example: Specifying an offset storage topic when recreating a source connector**  
If you need to delete and recreate a connector while maintaining offset continuity, you can specify an offset storage topic in your worker configuration. For example, suppose you have a change data capture (CDC) connector and you want to recreate it without losing your place in the CDC stream. The following steps demonstrate how to accomplish this task.  

1. On your client machine, run the following command to find the name of your connector's offset storage topic. Replace `<bootstrapBrokerString>` with your cluster's bootstrap broker string. For instructions on getting your bootstrap broker string, see [Get the bootstrap brokers for an Amazon MSK cluster](msk-get-bootstrap-brokers.md).

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
   ```

   The following output shows a list of all cluster topics, including any default internal connector topics. In this example, the existing CDC connector uses the [default offset storage topic](msk-connect-default-offset-storage-topic.md) created by MSK Connect. This is why the offset storage topic is called `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`.

   ```
   __consumer_offsets
   __amazon_msk_canary
   __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   my-msk-topic-1
   my-msk-topic-2
   ```

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk).

1. Choose your connector from the **Connectors** list. Copy and save the contents of the **Connector configuration** field so that you can modify it and use it to create the new connector.

1. Choose **Delete** to delete the connector. Then enter the connector name in the text input field to confirm deletion.

1. Create a custom worker configuration with values that fit your scenario. For instructions, see [Create a custom worker configuration](msk-connect-create-custom-worker-config.md).

   In your worker configuration, you must specify the name of the offset storage topic that you previously retrieved as the value for `offset.storage.topic` like in the following configuration. 

   ```
   config.providers.secretManager.param.aws.region=eu-west-3
   key.converter=<org.apache.kafka.connect.storage.StringConverter>
   value.converter=<org.apache.kafka.connect.storage.StringConverter>
   config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
   config.providers=secretManager
   offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   ```

1. Create a new connector using the worker configuration that you set up in the previous step. For instructions, see [Create a connector](mkc-create-connector-intro.md).

   **Important**  
   You must give your new connector the same name as the old connector.

# Tutorial: Externalizing sensitive information using config providers
<a name="msk-connect-config-provider"></a>

This example shows how to externalize sensitive information for Amazon MSK Connect using an open source configuration provider. A configuration provider lets you specify variables instead of plaintext in a connector or worker configuration, and workers running in your connector resolve these variables at runtime. This prevents credentials and other secrets from being stored in plaintext. The configuration provider in the example supports retrieving configuration parameters from AWS Secrets Manager, Amazon S3, and Systems Manager (SSM). In [Step 2](#msk-connect-config-providers), you can see how to set up storage and retrieval of sensitive information for the service that you want to configure.

## Considerations
<a name="msk-connect-config-providers-considerations"></a>

Consider the following while using the MSK config provider with Amazon MSK Connect:
+ When you use config providers, assign the appropriate permissions to the IAM service execution role.
+ Define the config providers in worker configurations and their implementation in the connector configuration.
+ Sensitive configuration values can appear in connector logs if a plugin does not define those values as secret. Kafka Connect treats undefined configuration values the same as any other plaintext value. To learn more, see [Preventing secrets from appearing in connector logs](msk-connect-logging.md#msk-connect-logging-secrets).
+ By default, MSK Connect frequently restarts a connector when the connector uses a configuration provider. To turn off this restart behavior, you can set the `config.action.reload` value to `none` in your connector configuration.

## Create a custom plugin and upload to S3
<a name="msk-connect-config-providers-create-custom-plugin"></a>

To create a custom plugin, create a ZIP file that contains the connector and the msk-config-provider by running the following commands on your local machine.

**To create a custom plugin using a terminal window and Debezium as the connector**

Use the AWS CLI to run commands as a superuser with credentials that allow you to access your Amazon S3 bucket. For information on installing and setting up the AWS CLI, see [Getting started with the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html) in the *AWS Command Line Interface User Guide*. For information on using the AWS CLI with Amazon S3, see [Using Amazon S3 with the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-services-s3.html) in the *AWS Command Line Interface User Guide*.

1. In a terminal window, create a folder named `custom-plugin` in your workspace using the following command.

   ```
   mkdir custom-plugin && cd custom-plugin
   ```

1. Download the latest stable release of the **MySQL Connector Plug-in** from the [Debezium site](https://debezium.io/releases/) using the following command.

   ```
   wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.2.0.Final/debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

   Extract the downloaded gzip file in the `custom-plugin` folder using the following command.

   ```
   tar xzf debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

1. Download the [MSK config provider zip file](https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip) using the following command.

   ```
   wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip
   ```

   Extract the downloaded zip file in the `custom-plugin` folder using the following command.

   ```
   unzip msk-config-providers-0.4.0-with-dependencies.zip
   ```

1. Zip the contents of the MSK config provider from the above step and the custom connector into a single file named `custom-plugin.zip`.

   ```
   zip -r ../custom-plugin.zip * 
   ```

1. Upload the file to S3 to be referenced later.

   ```
   aws s3 cp ../custom-plugin.zip s3:<S3_URI_BUCKET_LOCATION>
   ```

1. On the Amazon MSK console, under the **MSK Connect** section, choose **Custom Plugin**, then choose **Create custom plugin** and browse the **s3:<*S3\_URI\_BUCKET\_LOCATION*>** S3 bucket to select the custom plugin ZIP file you just uploaded.  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/msk/latest/developerguide/images/s3-object-browser.png)

1. Enter **debezium-custom-plugin** for the plugin name. Optionally, enter a description and choose **Create Custom Plugin**.  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/msk/latest/developerguide/images/create-custom-plugin.png)

## Configure parameters and permissions for different providers
<a name="msk-connect-config-providers"></a>

You can configure parameter values in these three services:
+ Secrets Manager
+ Systems Manager Parameter Store
+ Amazon Simple Storage Service (Amazon S3)

Choose one of the tabs below for instructions on setting up parameters and relevant permissions for that service.

------
#### [ Configure in Secrets Manager ]

**To configure parameter values in Secrets Manager**

1. Open the [Secrets Manager console](https://console.aws.amazon.com/secretsmanager/).

1. Create a new secret to store your credentials or secrets. For instructions, see [Create an AWS Secrets Manager secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html) in the *AWS Secrets Manager User Guide*.

1. Copy your secret's ARN.

1. Add the Secrets Manager permissions from the following example policy to your [Service execution role](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). Replace the example ARN, `arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234`, with the ARN of your secret.

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "secretsmanager:GetResourcePolicy",
                   "secretsmanager:GetSecretValue",
                   "secretsmanager:DescribeSecret",
                   "secretsmanager:ListSecretVersionIds"
               ],
               "Resource": [
                   "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
               ]
           }
       ]
   }
   ```

1. Add the worker configuration and the connector configuration, as described in the following steps.

1. To use the Secrets Manager configuration provider, copy the following lines into the worker configuration text box in Step 3:

   ```
   # define name of config provider:
   
   config.providers = secretsmanager
   
   # provide implementation classes for secrets manager:
   
   config.providers.secretsmanager.class = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.secretsmanager.param.region = us-east-1
   ```

1. For the Secrets Manager configuration provider, copy the following lines into the connector configuration in Step 4.

   ```
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   ```

You may also use the above step with more configuration providers.

------
#### [ Configure in Systems Manager Parameter Store ]

**To configure parameter values in Systems Manager Parameter Store**

1. Open the [Systems Manager console](https://console.aws.amazon.com/systems-manager/).

1. In the navigation pane, choose **Parameter Store**.

1. Create a new parameter to store in Systems Manager. For instructions, see [Create a Systems Manager parameter (console)](https://docs.aws.amazon.com/systems-manager/latest/userguide/parameter-create-console.html) in the *AWS Systems Manager User Guide*.

1. Copy your parameter's ARN.

1. Add the Systems Manager permissions from the following example policy to your [Service execution role](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). Replace *<arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName>* with the ARN of your parameter.  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "VisualEditor0",
               "Effect": "Allow",
               "Action": [
                   "ssm:GetParameterHistory",
                   "ssm:GetParametersByPath",
                   "ssm:GetParameters",
                   "ssm:GetParameter"
               ],
               "Resource": "arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName"
           }
       ]
   }
   ```

1. To use the Parameter Store configuration provider, copy the following lines to the worker configuration text box in Step 3:

   ```
   # define name of config provider:
   
   config.providers = ssm
   
   # provide implementation classes for parameter store:
   
   config.providers.ssm.class = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.ssm.param.region = us-east-1
   ```

1. To reference Parameter Store values, copy the following lines to the connector configuration in Step 4:

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   ```

   You may also bundle the above two steps with more configuration providers.

------
#### [ Configure in Amazon S3 ]

**To configure objects/files in Amazon S3**

1. Open the [Amazon S3 console](https://console.aws.amazon.com/s3/).

1. Upload your object to a bucket in S3. For instructions, see [Uploading objects](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html).

1. Copy your object's ARN.

1. Add the Amazon S3 Object Read permissions from the following example policy to your [Service execution role](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). Replace the example ARN, `arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>`, with the ARN of your object.  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "VisualEditor0",
               "Effect": "Allow",
               "Action": "s3:GetObject",
               "Resource": "arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>"
           }
       ]
   }
   ```

1. To use the Amazon S3 configuration provider, copy the following lines to the worker configuration text box in Step 3:

   ```
   # define name of config provider:
   
   config.providers = s3import
   # provide implementation classes for S3:
   
   config.providers.s3import.class = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   ```

1. For the Amazon S3 configuration provider, copy the following lines of code to the connector configuration in Step 4.

   ```
   #Example implementation for S3 object
   
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

   You may also bundle the above two steps with more configuration providers.

------

## Create a custom worker configuration with information about your configuration provider
<a name="msk-connect-config-providers-create-custom-config"></a>

1. Select **Worker configurations** under the **Amazon MSK Connect** section.

1. Select **Create worker configuration**.

1. Enter `SourceDebeziumCustomConfig` in the Worker Configuration Name textbox. The Description is optional.

1. Copy the relevant configuration code based on the providers desired, and paste it in the **Worker configuration** textbox.

1. The following is an example worker configuration that uses all three providers:

   ```
   key.converter=org.apache.kafka.connect.storage.StringConverter
   key.converter.schemas.enable=false
   value.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter.schemas.enable=false
   offset.storage.topic=offsets_my_debezium_source_connector
   
   # define names of config providers:
   
   config.providers=secretsmanager,ssm,s3import
   
   # provide implementation classes for each provider:
   
   config.providers.secretsmanager.class    = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   config.providers.ssm.class               = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   config.providers.s3import.class          = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   
   config.providers.secretsmanager.param.region = us-east-1
   config.providers.ssm.param.region = us-east-1
   ```

1. Choose **Create worker configuration**.
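
The provider lines in the worker configuration follow a regular pattern, so they can be generated from the list of providers you plan to use. The following is a minimal sketch, not an official tool: the class names and the `param.region` setting are copied from the example above, while the helper `build_provider_lines` and the `REGION_AWARE` set are hypothetical names introduced for illustration.

```python
# Class names are copied from the worker configuration example above.
PROVIDER_CLASSES = {
    "secretsmanager": "com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider",
    "ssm": "com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider",
    "s3import": "com.amazonaws.kafka.config.providers.S3ImportConfigProvider",
}

# Providers that are given a region parameter in the example above.
REGION_AWARE = {"secretsmanager", "ssm"}


def build_provider_lines(providers, region):
    """Return worker configuration lines for the chosen config providers."""
    unknown = [name for name in providers if name not in PROVIDER_CLASSES]
    if unknown:
        raise ValueError(f"unknown config provider(s): {unknown}")

    # One line that names the providers, then one class line per provider.
    lines = [f"config.providers={','.join(providers)}"]
    for name in providers:
        lines.append(f"config.providers.{name}.class={PROVIDER_CLASSES[name]}")

    # Region parameters only for the providers that use one in the example.
    for name in providers:
        if name in REGION_AWARE:
            lines.append(f"config.providers.{name}.param.region={region}")
    return lines


if __name__ == "__main__":
    chosen = ["secretsmanager", "ssm", "s3import"]
    print("\n".join(build_provider_lines(chosen, "us-east-1")))
```

Paste the generated lines below your converter and offset settings in the **Worker configuration** text box.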

## Create the connector
<a name="msk-connect-config-providers-create-connector"></a>

1. Create a new connector using the instructions in [Create a new connector](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html).

1. Choose the `custom-plugin.zip` file that you uploaded to your S3 bucket in [Create a custom plugin and upload to S3](#msk-connect-config-providers-create-custom-plugin) as the source for the custom plugin.

1. Copy the relevant configuration code for the providers that you want to use, and paste it in the **Connector configuration** field.

1. The following is an example connector configuration that uses all three providers:

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   
   #Example implementation for Amazon S3 file/object
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

1. Select **Use a custom configuration** and choose **SourceDebeziumCustomConfig** from the **Worker Configuration** dropdown.

1. Follow the remaining steps in [Create connector](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html).
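
Each placeholder in the connector configuration has the shape `${provider:path:key}`, where the middle part can be empty (as in `${ssm::MSKBootstrapServerAddress}`). The following sketch splits placeholders into those three parts so that you can check which providers a connector configuration references before you create the connector. The regular expression is an assumption modeled on the placeholder syntax used in the examples above, and `find_placeholders` is a hypothetical helper, not part of MSK Connect.

```python
import re

# Matches ${provider:path:key}; the "path:" part is optional.
PLACEHOLDER = re.compile(r"\$\{([^}]*?):(([^}]*?):)?([^}]*?)\}")


def find_placeholders(config_text):
    """Return (provider, path, key) tuples for every placeholder in the text."""
    found = []
    for match in PLACEHOLDER.finditer(config_text):
        provider, _, path, key = match.groups()
        # path is None when the placeholder has only provider and key.
        found.append((provider, path or "", key))
    return found


if __name__ == "__main__":
    sample = "\n".join([
        "schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}",
        "database.user=${secretsmanager:MSKAuroraDBCredentials:username}",
        "database.ssl.truststore.location = "
        "${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}",
    ])
    for provider, path, key in find_placeholders(sample):
        print(f"provider={provider!r} path={path!r} key={key!r}")
```

Every provider name that this reports must also appear in the `config.providers` line of the worker configuration that the connector uses.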

# IAM roles and policies for MSK Connect
<a name="msk-connect-iam"></a>

This section helps you set up the IAM policies and roles that you need to securely deploy and manage Amazon MSK Connect within your AWS environment. The following topics explain the service execution role that must be used with MSK Connect, including the required trust policy and the additional permissions needed when connecting to an IAM-authenticated MSK cluster. This section also provides example IAM policies that grant full access to MSK Connect functionality, and describes the AWS managed policies available for the service.

**Topics**
+ [Understand service execution role](msk-connect-service-execution-role.md)
+ [Example of IAM policy for MSK Connect](mkc-iam-policy-examples.md)
+ [Prevent cross-service confused deputy problem](cross-service-confused-deputy-prevention.md)
+ [AWS managed policies for MSK Connect](mkc-security-iam-awsmanpol.md)
+ [Use service-linked roles for MSK Connect](mkc-using-service-linked-roles.md)

# Understand service execution role
<a name="msk-connect-service-execution-role"></a>

**Note**  
Amazon MSK Connect does not support using the [Service-linked role](mkc-using-service-linked-roles.md) as the service execution role. You must create a separate service execution role. For instructions on how to create a custom IAM role, see [Creating a role to delegate permissions to an AWS service](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html) in the *IAM User Guide*.

When you create a connector with MSK Connect, you're required to specify an AWS Identity and Access Management (IAM) role to use with it. Your service execution role must have the following trust policy so that MSK Connect can assume it. For information about the condition context keys in this policy, see [Prevent cross-service confused deputy problem](cross-service-confused-deputy-prevention.md).

------
#### [ JSON ]


```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/myConnector/abc12345-abcd-4444-a8b9-123456f513ed-2"
        }
      }
    }   
  ]
}
```

------
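
If you create roles from scripts, you can generate the trust policy instead of editing JSON by hand. The following is a minimal sketch that reproduces the structure of the policy above; `build_trust_policy` is a hypothetical helper name. Because a connector's ARN is not known until the connector exists, the usage example passes a wildcard ARN for all connectors in the account, which is an assumption you should narrow once you know the full ARN.

```python
import json


def build_trust_policy(account_id, connector_arn):
    """Build a trust policy that lets MSK Connect assume the service execution role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "kafkaconnect.amazonaws.com"},
                "Action": "sts:AssumeRole",
                "Condition": {
                    # Restrict the role to requests made for this account and connector.
                    "StringEquals": {"aws:SourceAccount": account_id},
                    "ArnLike": {"aws:SourceArn": connector_arn},
                },
            }
        ],
    }


if __name__ == "__main__":
    policy = build_trust_policy(
        "123456789012",
        "arn:aws:kafkaconnect:us-east-1:123456789012:connector/*",
    )
    print(json.dumps(policy, indent=2))
```

You can save the printed document to a file and supply it as the trust policy when you create the role.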

If the Amazon MSK cluster that you want to use with your connector is a cluster that uses IAM authentication, then you must add the following permissions policy to the connector's service execution role. For information about how to find your cluster's UUID and how to construct topic ARNs, see [Authorization policy resources](kafka-actions.md#msk-iam-resources).

------
#### [ JSON ]


```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/300d0000-0000-0005-000f-00000000000b-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_read"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:WriteData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_write"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/myCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_*",
                "arn:aws:kafka:us-east-1:123456789012:group/myCluster/300d0000-0000-0005-000f-00000000000b-1/connect-*"
            ]
        }
    ]
}
```

------
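
The topic and group ARNs in this policy share the cluster's Region, account ID, name, and UUID, so you can derive them from the cluster ARN. The following sketch follows the ARN shapes used in the policy above; `parse_cluster_arn` and `resource_arn` are hypothetical helper names, and the cluster ARN in the usage example is a placeholder.

```python
def parse_cluster_arn(cluster_arn):
    """Split an MSK cluster ARN into partition, region, account, name, and UUID."""
    prefix, partition, service, region, account, resource = cluster_arn.split(":", 5)
    kind, name, uuid = resource.split("/", 2)
    if prefix != "arn" or service != "kafka" or kind != "cluster":
        raise ValueError(f"not an MSK cluster ARN: {cluster_arn}")
    return {
        "partition": partition,
        "region": region,
        "account": account,
        "name": name,
        "uuid": uuid,
    }


def resource_arn(cluster_arn, kind, resource_name):
    """Build a topic or group ARN that belongs to the given cluster."""
    if kind not in ("topic", "group"):
        raise ValueError("kind must be 'topic' or 'group'")
    c = parse_cluster_arn(cluster_arn)
    return (
        f"arn:{c['partition']}:kafka:{c['region']}:{c['account']}:"
        f"{kind}/{c['name']}/{c['uuid']}/{resource_name}"
    )


if __name__ == "__main__":
    cluster = (
        "arn:aws:kafka:us-east-1:123456789012:"
        "cluster/myCluster/300d0000-0000-0005-000f-00000000000b-1"
    )
    print(resource_arn(cluster, "topic", "__amazon_msk_connect_*"))
    print(resource_arn(cluster, "group", "connect-*"))
```

Deriving every `Resource` entry from one cluster ARN keeps the account ID, cluster name, and UUID consistent across all statements in the policy.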

Depending on the kind of connector, you might also need to attach to the service execution role a permissions policy that allows it to access AWS resources. For example, if your connector needs to send data to an S3 bucket, then the service execution role must have a permissions policy that grants permission to write to that bucket. For testing purposes, you can use one of the pre-built IAM policies that give full access, like `arn:aws:iam::aws:policy/AmazonS3FullAccess`. However, for security purposes, we recommend that you use the most restrictive policy that allows your connector to read from the AWS source or write to the AWS sink.

# Example of IAM policy for MSK Connect
<a name="mkc-iam-policy-examples"></a>

To give a non-admin user full access to all MSK Connect functionality, attach a policy like the following one to the user's IAM role.

------
#### [ JSON ]


```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "MSKConnectFullAccess",
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:CreateConnector",
        "kafkaconnect:DeleteConnector",
        "kafkaconnect:DescribeConnector",
        "kafkaconnect:ListConnectors",
        "kafkaconnect:UpdateConnector",
        "kafkaconnect:CreateCustomPlugin",
        "kafkaconnect:DeleteCustomPlugin",
        "kafkaconnect:DescribeCustomPlugin",
        "kafkaconnect:ListCustomPlugins",
        "kafkaconnect:CreateWorkerConfiguration",
        "kafkaconnect:DeleteWorkerConfiguration",
        "kafkaconnect:DescribeWorkerConfiguration",
        "kafkaconnect:ListWorkerConfigurations"
      ],
      "Resource": "*"
    },
    {
      "Sid": "IAMPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/MSKConnectServiceRole",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EC2NetworkAccess",
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeVpcs",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    },
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/*"
    },
    {
      "Sid": "MSKLogGroupAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:123456789012:log-group:/aws/msk-connect/*"
      ]
    },
    {
      "Sid": "S3PluginAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins",
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins/*"
      ]
    }
  ]
}
```

------

# Prevent cross-service confused deputy problem
<a name="cross-service-confused-deputy-prevention"></a>

The confused deputy problem is a security issue where an entity that doesn't have permission to perform an action can coerce a more-privileged entity to perform the action. In AWS, cross-service impersonation can result in the confused deputy problem. Cross-service impersonation can occur when one service (the *calling service*) calls another service (the *called service*). The calling service can be manipulated to use its permissions to act on another customer's resources in a way it should not otherwise have permission to access. To prevent this, AWS provides tools that help you protect your data for all services with service principals that have been given access to resources in your account. 

We recommend using the [`aws:SourceArn`](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn) and [`aws:SourceAccount`](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount) global condition context keys in resource policies to limit the permissions that MSK Connect gives another service to the resource. If the `aws:SourceArn` value does not contain the account ID (for example, an Amazon S3 bucket ARN doesn't contain the account ID), you must use both global condition context keys to limit permissions. If you use both global condition context keys and the `aws:SourceArn` value contains the account ID, the `aws:SourceAccount` value and the account in the `aws:SourceArn` value must use the same account ID when used in the same policy statement. Use `aws:SourceArn` if you want only one resource to be associated with the cross-service access. Use `aws:SourceAccount` if you want to allow any resource in that account to be associated with the cross-service use.

In the case of MSK Connect, the value of `aws:SourceArn` must be an MSK connector.

The most effective way to protect against the confused deputy problem is to use the `aws:SourceArn` global condition context key with the full ARN of the resource. If you don't know the full ARN of the resource or if you are specifying multiple resources, use the `aws:SourceArn` global condition context key with wildcards (`*`) for the unknown portions of the ARN. For example, `arn:aws:kafkaconnect:us-east-1:123456789012:connector/*` represents all connectors that belong to the account with ID 123456789012 in the US East (N. Virginia) Region.

The following example shows how you can use the `aws:SourceArn` and `aws:SourceAccount` global condition context keys in MSK Connect to prevent the confused deputy problem. Replace *123456789012* and arn:aws:kafkaconnect:*us-east-1*:*123456789012*:connector/*my-S3-Sink-Connector*/*abcd1234-5678-90ab-cdef-1234567890ab* with your AWS account and connector information.

------
#### [ JSON ]


```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-S3-Sink-Connector/abcd1234-5678-90ab-cdef-1234567890ab"
        }
      }
    }   
  ]
}
```

------
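
When a statement uses both keys, the account ID in `aws:SourceArn` must match the `aws:SourceAccount` value. The following sketch checks that rule for one policy statement. It is illustrative only: `account_in_arn` and `source_keys_consistent` are hypothetical helper names, and the check assumes that both condition values are single strings under `StringEquals` and `ArnLike`, as in the example above.

```python
def account_in_arn(arn):
    """Return the account ID field of an ARN (it can be empty for some services)."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn}")
    return parts[4]


def source_keys_consistent(statement):
    """Check that aws:SourceAccount matches the account embedded in aws:SourceArn."""
    condition = statement.get("Condition", {})
    account = condition.get("StringEquals", {}).get("aws:SourceAccount")
    source_arn = condition.get("ArnLike", {}).get("aws:SourceArn")
    if account is None or source_arn is None:
        # This sketch expects both keys to be present.
        return False
    return account_in_arn(source_arn) == account


if __name__ == "__main__":
    statement = {
        "Effect": "Allow",
        "Principal": {"Service": "kafkaconnect.amazonaws.com"},
        "Action": "sts:AssumeRole",
        "Condition": {
            "StringEquals": {"aws:SourceAccount": "123456789012"},
            "ArnLike": {
                "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/*"
            },
        },
    }
    print(source_keys_consistent(statement))
```

A check like this can run before you attach a trust policy, to catch a mismatched account ID early.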

# AWS managed policies for MSK Connect
<a name="mkc-security-iam-awsmanpol"></a>

An AWS managed policy is a standalone policy that is created and administered by AWS. AWS managed policies are designed to provide permissions for many common use cases so that you can start assigning permissions to users, groups, and roles.

Keep in mind that AWS managed policies might not grant least-privilege permissions for your specific use cases because they're available for all AWS customers to use. We recommend that you reduce permissions further by defining [customer managed policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#customer-managed-policies) that are specific to your use cases.

You cannot change the permissions defined in AWS managed policies. If AWS updates the permissions defined in an AWS managed policy, the update affects all principal identities (users, groups, and roles) that the policy is attached to. AWS is most likely to update an AWS managed policy when a new AWS service is launched or new API operations become available for existing services.

For more information, see [AWS managed policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies) in the *IAM User Guide*.

## AWS managed policy: AmazonMSKConnectReadOnlyAccess
<a name="security-iam-awsmanpol-AmazonMSKConnectReadOnlyAccess"></a>

This policy grants the user the permissions that are needed to list and describe MSK Connect resources.

You can attach the `AmazonMSKConnectReadOnlyAccess` policy to your IAM identities.

------
#### [ JSON ]


```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:ListConnectors",
                "kafkaconnect:ListCustomPlugins",
                "kafkaconnect:ListWorkerConfigurations"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeConnector"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:connector/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeCustomPlugin"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:custom-plugin/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeWorkerConfiguration"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:worker-configuration/*"
            ]
        }
    ]
}
```

------

## AWS managed policy: KafkaConnectServiceRolePolicy
<a name="security-iam-awsmanpol-KafkaConnectServiceRolePolicy"></a>

This policy grants the MSK Connect service the permissions that are needed to create and manage network interfaces that have the tag `AmazonMSKConnectManaged:true`. These network interfaces give MSK Connect network access to resources in your Amazon VPC, such as an Apache Kafka cluster or a source or a sink.

You can't attach KafkaConnectServiceRolePolicy to your IAM entities. This policy is attached to a service-linked role that allows MSK Connect to perform actions on your behalf.

------
#### [ JSON ]


```
{
	"Version":"2012-10-17",		 	 	 
	"Statement": [
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"aws:RequestTag/AmazonMSKConnectManaged": "true"
				},
				"ForAllValues:StringEquals": {
					"aws:TagKeys": "AmazonMSKConnectManaged"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": [
				"arn:aws:ec2:*:*:subnet/*",
				"arn:aws:ec2:*:*:security-group/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateTags"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:CreateAction": "CreateNetworkInterface"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeNetworkInterfaces",
				"ec2:CreateNetworkInterfacePermission",
				"ec2:AttachNetworkInterface",
				"ec2:DetachNetworkInterface",
				"ec2:DeleteNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		}
	]
}
```

------

## MSK Connect updates to AWS managed policies
<a name="security-iam-awsmanpol-updates"></a>

View details about updates to AWS managed policies for MSK Connect since this service began tracking these changes.


| Change | Description | Date | 
| --- | --- | --- | 
|  MSK Connect updated read-only policy  |  MSK Connect updated the AmazonMSKConnectReadOnlyAccess policy to remove the restrictions on listing operations.  | October 13, 2021 | 
|  MSK Connect started tracking changes  |  MSK Connect started tracking changes for its AWS managed policies.  | September 14, 2021 | 

# Use service-linked roles for MSK Connect
<a name="mkc-using-service-linked-roles"></a>

Amazon MSK Connect uses AWS Identity and Access Management (IAM) [service-linked roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role). A service-linked role is a unique type of IAM role that is linked directly to MSK Connect. Service-linked roles are predefined by MSK Connect and include all the permissions that the service requires to call other AWS services on your behalf.

A service-linked role makes setting up MSK Connect easier because you don't have to manually add the necessary permissions. MSK Connect defines the permissions of its service-linked roles, and unless defined otherwise, only MSK Connect can assume its roles. The defined permissions include the trust policy and the permissions policy, and that permissions policy cannot be attached to any other IAM entity.

For information about other services that support service-linked roles, see [AWS Services That Work with IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html) and look for the services that have **Yes** in the **Service-Linked Role** column. Choose a **Yes** with a link to view the service-linked role documentation for that service.

## Service-linked role permissions for MSK Connect
<a name="slr-permissions"></a>

MSK Connect uses the service-linked role named **AWSServiceRoleForKafkaConnect**, which allows Amazon MSK Connect to access Amazon resources on your behalf.

The AWSServiceRoleForKafkaConnect service-linked role trusts the `kafkaconnect.amazonaws.com` service to assume the role.

For information about the permissions policy that the role uses, see [AWS managed policy: KafkaConnectServiceRolePolicy](mkc-security-iam-awsmanpol.md#security-iam-awsmanpol-KafkaConnectServiceRolePolicy).

You must configure permissions to allow an IAM entity (such as a user, group, or role) to create, edit, or delete a service-linked role. For more information, see [Service-Linked Role Permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#service-linked-role-permissions) in the *IAM User Guide*.

## Creating a service-linked role for MSK Connect
<a name="create-slr"></a>

You don't need to manually create a service-linked role. When you create a connector in the AWS Management Console, the AWS CLI, or the AWS API, MSK Connect creates the service-linked role for you. 

If you delete this service-linked role, and then need to create it again, you can use the same process to recreate the role in your account. When you create a connector, MSK Connect creates the service-linked role for you again. 

## Editing a service-linked role for MSK Connect
<a name="edit-slr"></a>

MSK Connect does not allow you to edit the AWSServiceRoleForKafkaConnect service-linked role. After you create a service-linked role, you can't change the name of the role because various entities might reference the role. However, you can edit the description of the role using IAM. For more information, see [Editing a Service-Linked Role](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#edit-service-linked-role) in the *IAM User Guide*.

## Deleting a service-linked role for MSK Connect
<a name="delete-slr"></a>

You can use the IAM console, the AWS CLI, or the AWS API to manually delete the service-linked role. To do this, you must first delete all of your MSK Connect connectors, and then you can delete the role. For more information, see [Deleting a Service-Linked Role](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#delete-service-linked-role) in the *IAM User Guide*.

## Supported Regions for MSK Connect service-linked roles
<a name="slr-regions"></a>

MSK Connect supports using service-linked roles in all of the Regions where the service is available. For more information, see [AWS Regions and Endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html).

# Enable internet access for Amazon MSK Connect
<a name="msk-connect-internet-access"></a>

If your connector for Amazon MSK Connect needs access to the internet, we recommend that you use the following Amazon Virtual Private Cloud (VPC) settings to enable that access.
+ Configure your connector with private subnets.
+ Create a public [NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) or [NAT instance](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_NAT_Instance.html) for your VPC in a public subnet. For more information, see the [Connect subnets to the internet or other VPCs using NAT devices](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) page in the *Amazon Virtual Private Cloud User Guide*.
+ Allow outbound traffic from your private subnets to your NAT gateway or instance.

# Set up a NAT gateway for Amazon MSK Connect
<a name="msk-connect-internet-access-private-subnets-example"></a>

The following steps show you how to set up a NAT gateway to enable internet access for a connector. You must complete these steps before you create a connector in a private subnet.

## Complete prerequisites for setting up a NAT gateway
<a name="msk-connect-internet-access-private-subnets-prereq"></a>

Make sure you have the following items.
+ The ID of the Amazon Virtual Private Cloud (VPC) associated with your cluster. For example, *vpc-123456ab*.
+ The IDs of the private subnets in your VPC. For example, *subnet-a1b2c3de*, *subnet-f4g5h6ij*, etc. You must configure your connector with private subnets.

## Steps to enable internet access for your connector
<a name="msk-connect-internet-access-private-subnets-steps"></a>

**To enable internet access for your connector**

1. Open the Amazon Virtual Private Cloud console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Create a public subnet for your NAT gateway with a descriptive name, and note the subnet ID. For detailed instructions, see [Create a subnet in your VPC](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet).

1. Create an internet gateway so that your VPC can communicate with the internet, and note the gateway ID. Attach the internet gateway to your VPC. For instructions, see [Create and attach an internet gateway](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway).

1. Provision a public NAT gateway so that hosts in your private subnets can reach your public subnet. When you create the NAT gateway, select the public subnet that you created earlier. For instructions, see [Create a NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating).

1. Configure your route tables. You must have two route tables in total to complete this setup. You should already have a main route table that was automatically created at the same time as your VPC. In this step you create an additional route table for your public subnet.

   1. Use the following settings to modify your VPC's main route table so that your private subnets route traffic to your NAT gateway. For instructions, see [Work with route tables](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) in the *Amazon Virtual Private Cloud* *User Guide*.  
**Private MSKC route table**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

   1. Follow the instructions in [Create a custom route table](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing) to create a route table for your public subnet. When you create the table, enter a descriptive name in the **Name tag** field to help you identify which subnet the table is associated with. For example, **Public MSKC**.

   1. Configure your **Public MSKC** route table using the following settings.  
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

# Understand private DNS hostnames
<a name="msk-connect-dns"></a>

With Private DNS hostname support in MSK Connect, you can configure connectors to reference public or private domain names. Support depends on the DNS servers specified in the VPC *DHCP option set*.

A DHCP option set is a group of network configurations that EC2 instances use in a VPC to communicate over the VPC network. Each VPC has a default DHCP option set, but you can create a custom DHCP option set if you want instances in a VPC to use a different DNS server for domain name resolution, instead of the Amazon-provided DNS server. See [DHCP option sets in Amazon VPC](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html).

Before MSK Connect supported private DNS resolution, connectors used the service VPC DNS resolvers for DNS queries from a customer connector. Connectors did not use the DNS servers defined in the customer VPC DHCP option sets for DNS resolution.

Connectors could only reference hostnames in customer connector configurations or plugins that were publicly resolvable. They couldn't resolve private hostnames defined in a private hosted zone or use DNS servers in another customer network.

Without Private DNS, customers who made their databases, data warehouses, and systems like Secrets Manager in their own VPC inaccessible to the internet couldn't work with MSK connectors. Customers often use private DNS hostnames to comply with their corporate security posture.

# Configure a VPC DHCP option set for your connector
<a name="msk-connect-dns-config-dhcp"></a>

Connectors automatically use the DNS servers defined in their VPC DHCP option set when the connector is created. Before you create a connector, make sure that you configure the VPC DHCP option set for your connector's DNS hostname resolution requirements.

Connectors that you created before the Private DNS hostname feature was available in MSK Connect continue to use the previous DNS resolution configuration with no modification required.

If you need only publicly resolvable DNS hostname resolution in your connector, for easier setup we recommend using the default VPC of your account when you create the connector. See [Amazon DNS Server](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#AmazonDNS) in the *Amazon VPC User Guide* for more information on the Amazon-provided DNS server or Amazon Route 53 Resolver.

If you need to resolve private DNS hostnames, make sure that the VPC that is passed during connector creation has its DHCP option set correctly configured. For more information, see [Work with DHCP option sets](https://docs.aws.amazon.com/vpc/latest/userguide/DHCPOptionSet.html) in the *Amazon VPC User Guide*.

When you configure a DHCP option set for private DNS hostname resolution, ensure that the connector can reach the custom DNS servers that you configure in the DHCP option set. Otherwise, your connector creation will fail.

After you customize the VPC DHCP option set, connectors subsequently created in that VPC use the DNS servers that you specified in the option set. If you change the option set after you create a connector, the connector adopts the settings in the new option set within a couple of minutes.
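As a minimal sketch of what a custom DHCP option set for private DNS resolution contains, the following Python snippet builds a list of key/value settings. The `Key`/`Values` shape is assumed to mirror the `DhcpConfigurations` parameter of the Amazon EC2 `CreateDhcpOptions` operation. The function name, IP addresses, and domain name are hypothetical, and the snippet doesn't call AWS.

```python
import ipaddress

AMAZON_DNS = "AmazonProvidedDNS"  # keyword for the Amazon-provided DNS server


def dhcp_configurations(dns_servers, domain_name=None):
    """Build DHCP option settings for a list of DNS servers (hypothetical helper)."""
    if not dns_servers:
        raise ValueError("at least one DNS server is required")
    for server in dns_servers:
        if server != AMAZON_DNS:
            # Raises ValueError if the entry is not a valid IP address.
            ipaddress.ip_address(server)
    configurations = [{"Key": "domain-name-servers", "Values": list(dns_servers)}]
    if domain_name:
        configurations.append({"Key": "domain-name", "Values": [domain_name]})
    return configurations


# Example values only; replace them with the DNS servers in your network.
print(dhcp_configurations(["10.0.0.2", "10.0.1.2"], "corp.example.internal"))
```

Validating the addresses up front doesn't prove that the connector can reach those servers. It only catches malformed entries before you create the option set.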

# Configure DNS attributes for your VPC
<a name="msk-connect-dns-attributes"></a>

Make sure you have the VPC DNS attributes correctly configured as described in [DNS attributes in your VPC](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-support) and [DNS hostnames](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-hostnames) in the *Amazon VPC User Guide*.

See [Resolving DNS queries between VPCs and your network](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver.html) in the *Amazon Route 53 Developer Guide* for information on using inbound and outbound resolver endpoints to connect other networks to your VPC to work with your connector.

# Handle connector creation failures
<a name="msk-connect-dns-failure-handling"></a>

This section describes possible connector creation failures associated with DNS resolution and suggested actions to resolve the issues.


| Failure | Suggested action | 
| --- | --- | 
|  Connector creation fails if a DNS resolution query fails, or if DNS servers are unreachable from the connector.  |  You can see connector creation failures due to unsuccessful DNS resolution queries in your CloudWatch logs, if you've configured these logs for your connector. Check the DNS server configurations and ensure network connectivity to the DNS servers from the connector.  | 
|  If you change the DNS server configuration in your VPC DHCP option set while a connector is running, DNS resolution queries from the connector can fail. If the DNS resolution fails, some of the connector tasks can enter a failed state.  |  You can see failures due to unsuccessful DNS resolution queries in your CloudWatch logs, if you've configured these logs for your connector. The failed tasks should automatically restart to bring the connector back up. If that doesn't happen, you can contact support to restart the failed tasks for your connector, or you can recreate the connector.  | 
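When you check DNS server configuration, it can help to test name resolution from a host that uses the same DHCP option set as the connector. The following Python sketch is one way to do that with the standard library. The function name is hypothetical, and the `resolve` parameter exists only so that the lookup can be swapped out, for example in a test.

```python
import socket


def unresolved_hostnames(hostnames, resolve=socket.getaddrinfo):
    """Return a dict of hostname -> error message for names that fail to resolve."""
    failures = {}
    for host in hostnames:
        try:
            resolve(host, None)
        except OSError as error:  # socket.gaierror is a subclass of OSError
            failures[host] = str(error)
    return failures
```

For example, calling `unresolved_hostnames(["database.example.internal"])` on such a host returns an empty dictionary when the name resolves. The hostname here is a placeholder. A successful lookup from another host is only an indication, because the connector's own network path to the DNS servers can still differ.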

# Security for MSK Connect
<a name="msk-connect-security"></a>

You can use an interface VPC endpoint, powered by AWS PrivateLink, to prevent traffic between your Amazon VPC and Amazon MSK Connect-compatible APIs from leaving the Amazon network. Interface VPC endpoints don't require an internet gateway, NAT device, VPN connection, or AWS Direct Connect connection. For more information, see [Use Amazon MSK APIs with Interface VPC Endpoints](privatelink-vpc-endpoints.md).

# Logging for MSK Connect
<a name="msk-connect-logging"></a>

MSK Connect can write log events that you can use to debug your connector. When you create a connector, you can specify zero or more of the following log destinations:
+ Amazon CloudWatch Logs: You specify the log group to which you want MSK Connect to send your connector's log events. For information on how to create a log group, see [Create a log group](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html#Create-Log-Group) in the *CloudWatch Logs User Guide*.
+ Amazon S3: You specify the S3 bucket to which you want MSK Connect to send your connector's log events. For information on how to create an S3 bucket, see [Creating a bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) in the *Amazon S3 User Guide*.
+ Amazon Data Firehose: You specify the delivery stream to which you want MSK Connect to send your connector's log events. For information on how to create a delivery stream, see [Creating an Amazon Data Firehose delivery stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) in the *Firehose User Guide*.

To learn more about setting up logging, see [Enabling logging from certain AWS services](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-and-resource-policy.html) in the *Amazon CloudWatch Logs User Guide*.
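The following Python sketch shows how the three log destinations could be combined into one log delivery setting for a connector. The field names reflect our understanding of the `logDelivery` structure in the MSK Connect `CreateConnector` request, so verify them against the API reference before use. The function name and the resource names are hypothetical.

```python
import json


def log_delivery(log_group=None, s3_bucket=None, s3_prefix=None, delivery_stream=None):
    """Build a log delivery setting from zero or more destinations (hypothetical helper)."""
    worker = {}
    if log_group:
        worker["cloudWatchLogs"] = {"enabled": True, "logGroup": log_group}
    if s3_bucket:
        worker["s3"] = {"enabled": True, "bucket": s3_bucket}
        if s3_prefix:
            worker["s3"]["prefix"] = s3_prefix
    if delivery_stream:
        worker["firehose"] = {"enabled": True, "deliveryStream": delivery_stream}
    # With no destination, return None so the caller can omit the setting.
    return {"workerLogDelivery": worker} if worker else None


# Example names only.
print(json.dumps(log_delivery(log_group="/msk-connect/example",
                              s3_bucket="amzn-s3-demo-bucket",
                              s3_prefix="connector-logs/"), indent=2))
```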

MSK Connect emits the following types of log events:


| Level | Description | 
| --- | --- | 
| INFO | Runtime events of interest at startup and shutdown. | 
| WARN | Runtime situations that aren't errors but are undesirable or unexpected. | 
| ERROR | Unexpected conditions and runtime errors that aren't fatal. | 
| FATAL | Severe errors that cause premature termination. | 

The following is an example of a log event sent to CloudWatch Logs:

```
[Worker-0bb8afa0b01391c41] [2021-09-06 16:02:54,151] WARN [Producer clientId=producer-1] Connection to node 1 (b-1.my-test-cluster.twwhtj.c2.kafka.us-east-1.amazonaws.com/INTERNAL_IP) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:782)
```
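If you process these log events yourself, a small parser can split each line into its parts. The following Python sketch assumes that every event follows the layout of the preceding example: a worker ID in brackets, a timestamp in brackets, a level, and then the message. The regular expression and the function name are illustrative and aren't part of MSK Connect.

```python
import re
from datetime import datetime

# Layout assumed from the example: [Worker-<id>] [<timestamp>] <LEVEL> <message>
LOG_LINE = re.compile(
    r"^\[(?P<worker>Worker-\w+)\] \[(?P<timestamp>[^\]]+)\] (?P<level>[A-Z]+) (?P<message>.*)$"
)

SAMPLE = ("[Worker-0bb8afa0b01391c41] [2021-09-06 16:02:54,151] WARN "
          "[Producer clientId=producer-1] Broker may not be available.")


def parse_log_event(line):
    """Return the parts of a log event as a dict, or None if the line doesn't match."""
    match = LOG_LINE.match(line.strip())
    if match is None:
        return None
    event = match.groupdict()
    event["timestamp"] = datetime.strptime(event["timestamp"], "%Y-%m-%d %H:%M:%S,%f")
    return event


event = parse_log_event(SAMPLE)
print(event["worker"], event["level"], event["timestamp"].isoformat())
```

Lines that continue a multi-line event, such as configuration dumps, don't carry their own timestamp and level, so this parser returns `None` for them.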

## Preventing secrets from appearing in connector logs
<a name="msk-connect-logging-secrets"></a>

**Note**  
Sensitive configuration values can appear in connector logs if a plugin does not define those values as secret. Kafka Connect treats undefined configuration values the same as any other plaintext value.

If your plugin defines a property as secret, Kafka Connect redacts the property's value from connector logs. For example, the following connector logs demonstrate that if a plugin defines `aws.secret.key` as a `PASSWORD` type, then its value is replaced with `[hidden]`.

```
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] [2022-01-11 15:18:55,150] INFO SecretsManagerConfigProviderConfig values:
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.access.key = my_access_key
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.region = us-east-1
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.secret.key = [hidden]
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.prefix =
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.ttl.ms = 300000
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] (com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProviderConfig:361)
```

To prevent secrets from appearing in connector log files, a plugin developer must use the Kafka Connect enum constant [`ConfigDef.Type.PASSWORD`](https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD) to define sensitive properties. When a property is of type `ConfigDef.Type.PASSWORD`, Kafka Connect excludes its value from connector logs even if the value is sent as plaintext.

# Monitoring Amazon MSK Connect
<a name="mkc-monitoring-overview"></a>

Monitoring is an important part of maintaining the reliability, availability, and performance of MSK Connect and your other AWS solutions. Amazon CloudWatch monitors your AWS resources and the applications that you run on AWS in real time. You can collect and track metrics, create customized dashboards, and set alarms that notify you or take actions when a specified metric reaches a threshold that you specify. For example, you can have CloudWatch track CPU usage or other metrics of your connector, so that you can increase its capacity if needed. For more information, see the [Amazon CloudWatch User Guide](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).

You can use the following API operations:
+ `DescribeConnectorOperation`: Monitor the status of connector update operations.
+ `ListConnectorOperations`: Track previous updates run on your connector.

The following table shows the metrics that MSK Connect sends to CloudWatch under the `ConnectorName` dimension. MSK Connect delivers these metrics by default and at no additional cost. CloudWatch keeps these metrics for 15 months, so that you can access historical information and gain a better perspective on how your connectors are performing. You can also set alarms that watch for certain thresholds, and send notifications or take actions when those thresholds are met. For more information, see the [Amazon CloudWatch User Guide](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).


| Metric name | Description | 
| --- | --- | 
| CpuUtilization | The percentage of CPU consumption by system and user. | 
| ErroredTaskCount | The number of tasks that have errored out. | 
| MemoryUtilization | The percentage of the total memory on a worker instance that is in use, not just the Java virtual machine (JVM) heap memory currently in use. The JVM doesn't typically release memory back to the operating system. As a result, JVM heap size (MemoryUtilization) usually starts at a minimum heap size and incrementally increases to a stable maximum of about 80-90%. JVM heap usage might increase or decrease as the connector's actual memory usage changes. | 
| RebalanceCompletedTotal | The total number of rebalances completed by this connector. | 
| RebalanceTimeAvg | The average time in milliseconds spent by the connector on rebalancing.  | 
| RebalanceTimeMax | The maximum time in milliseconds spent by the connector on rebalancing. | 
| RebalanceTimeSinceLast |  The time in milliseconds since this connector completed the most recent rebalance.  | 
| RunningTaskCount | The number of tasks that are running in the connector. | 
| SinkConsumerByteRate | The average number of bytes consumed per second by the Kafka Connect framework's Sink consumer before any transformations are applied to data. | 
| SinkRecordReadRate | The average per-second number of records read from the Apache Kafka or Amazon MSK cluster. | 
| SinkRecordSendRate | The average per-second number of records that are output from the transformations and sent to the destination. This number doesn't include filtered records. | 
| SourceRecordPollRate | The average per-second number of records produced or polled. | 
| SourceProducerByteRate | The average number of bytes produced per second by the Kafka Connect framework's Source producer after any transformations are applied to data. | 
| SourceRecordWriteRate | The average per-second number of records output from the transformations and written to the Apache Kafka or Amazon MSK cluster. | 
| TaskStartupAttemptsTotal | The total number of task startups that the connector has attempted. You can use this metric to identify anomalies in task startup attempts. | 
| TaskStartupSuccessPercentage | The average percentage of successful task starts for the connector. You can use this metric to identify anomalies in task startup attempts. | 
| WorkerCount | The number of workers that are running in the connector. | 
| BytesInPerSec | Metadata bytes transferred to Kafka Connect framework for communication between workers. | 
| BytesOutPerSec | Metadata bytes transferred from Kafka Connect framework for communication between workers. | 
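As an illustration of setting an alarm on one of these metrics, the following Python sketch builds the parameters for a CloudWatch alarm on `ErroredTaskCount`. The keys follow the CloudWatch `PutMetricAlarm` operation. The namespace `AWS/KafkaConnect` is an assumption that you should confirm in the CloudWatch console, and the function name, alarm name, threshold, and period are hypothetical choices.

```python
def errored_task_alarm(connector_name, threshold=1, period_seconds=300):
    """Build alarm parameters for errored tasks on one connector (hypothetical helper)."""
    return {
        "AlarmName": f"{connector_name}-errored-tasks",
        "Namespace": "AWS/KafkaConnect",  # assumption: verify the namespace for your account
        "MetricName": "ErroredTaskCount",
        "Dimensions": [{"Name": "ConnectorName", "Value": connector_name}],
        "Statistic": "Maximum",
        "Period": period_seconds,
        "EvaluationPeriods": 1,
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
    }


print(errored_task_alarm("example-S3-sink-connector"))
```

You could pass the resulting dictionary as keyword arguments to an AWS SDK client that implements `PutMetricAlarm`. The snippet itself only builds the parameters.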

# Examples to set up Amazon MSK Connect resources
<a name="msk-connect-examples"></a>

This section includes examples to help you set up Amazon MSK Connect resources such as common third-party connectors and configuration providers.

**Topics**
+ [Set up Amazon S3 sink connector](mkc-S3sink-connector-example.md)
+ [Set up EventBridge Kafka sink connector for MSK Connect](mkc-eventbridge-kafka-connector.md)
+ [Use Debezium source connector with configuration provider](mkc-debeziumsource-connector-example.md)

# Set up Amazon S3 sink connector
<a name="mkc-S3sink-connector-example"></a>

This example shows how to use the Confluent [Amazon S3 sink connector](https://www.confluent.io/hub/confluentinc/kafka-connect-s3) and the AWS CLI to create an Amazon S3 sink connector in MSK Connect.

1. Copy the following JSON and paste it in a new file. Replace the placeholder strings with values that correspond to your Amazon MSK cluster's bootstrap servers connection string and the cluster's subnet and security group IDs. For information about how to set up a service execution role, see [IAM roles and policies for MSK Connect](msk-connect-iam.md).

   ```
   {
       "connectorConfiguration": {
           "connector.class": "io.confluent.connect.s3.S3SinkConnector",
           "s3.region": "us-east-1",
           "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
           "flush.size": "1",
           "schema.compatibility": "NONE",
           "topics": "my-test-topic",
           "tasks.max": "2",
           "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
           "storage.class": "io.confluent.connect.s3.storage.S3Storage",
           "s3.bucket.name": "amzn-s3-demo-bucket"
       },
       "connectorName": "example-S3-sink-connector",
       "kafkaCluster": {
           "apacheKafkaCluster": {
               "bootstrapServers": "<cluster-bootstrap-servers-string>",
               "vpc": {
                   "subnets": [
                       "<cluster-subnet-1>",
                       "<cluster-subnet-2>",
                       "<cluster-subnet-3>"
                   ],
                   "securityGroups": ["<cluster-security-group-id>"]
               }
           }
       },
       "capacity": {
           "provisionedCapacity": {
               "mcuCount": 2,
               "workerCount": 4
           }
       },
       "kafkaConnectVersion": "2.7.1",
       "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>",
       "plugins": [
           {
               "customPlugin": {
                   "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>",
                   "revision": 1
               }
           }
       ],
       "kafkaClusterEncryptionInTransit": {"encryptionType": "PLAINTEXT"},
       "kafkaClusterClientAuthentication": {"authenticationType": "NONE"}
   }
   ```

1. Run the following AWS CLI command in the folder where you saved the JSON file in the previous step. The command assumes that you named the file `connector-info.json`.

   ```
   aws kafkaconnect create-connector --cli-input-json file://connector-info.json
   ```

   The following is an example of the output that you get when you run the command successfully.

   ```
   {
       "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-S3-sink-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
       "ConnectorState": "CREATING", 
       "ConnectorName": "example-S3-sink-connector"
   }
   ```

# Set up EventBridge Kafka sink connector for MSK Connect
<a name="mkc-eventbridge-kafka-connector"></a>

This topic shows you how to set up the [EventBridge Kafka sink connector](https://github.com/awslabs/eventbridge-kafka-connector) for MSK Connect. This connector lets you send events from your MSK cluster to EventBridge [event buses](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html). This topic describes the process for creating the required resources and configuring the connector to enable seamless data flow between Kafka and EventBridge. 

**Topics**
+ [Prerequisites](#mkc-eb-kafka-prerequisites)
+ [Set up the resources required for MSK Connect](#mkc-eb-kafka-set-up-resources)
+ [Create the connector](#mkc-eb-kafka-create-connector)
+ [Send messages to Kafka](#mkc-eb-kafka-send-json-encoded-messages)

## Prerequisites
<a name="mkc-eb-kafka-prerequisites"></a>

Before deploying the connector, make sure that you have the following resources:
+ **Amazon MSK cluster**: An active MSK cluster to produce and consume Kafka messages.
+ **Amazon EventBridge event bus**: An EventBridge event bus to receive events from the Kafka topics.
+ **IAM roles**: Create IAM roles with the necessary permissions for MSK Connect and the EventBridge connector.
+ [Access to the public internet](msk-connect-internet-access.md) from MSK Connect, or a [VPC interface endpoint](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html) for EventBridge created in the VPC and subnet of your MSK cluster. With a VPC interface endpoint, traffic doesn't traverse the public internet and you don't need NAT gateways.
+ A [client machine](create-serverless-cluster-client.md), such as an Amazon EC2 instance or [AWS CloudShell](https://aws.amazon.com/cloudshell/), to create topics and send records to Kafka.

## Set up the resources required for MSK Connect
<a name="mkc-eb-kafka-set-up-resources"></a>

You create an IAM role for the connector, and then you create the connector. You also create an EventBridge rule to filter Kafka events sent to the EventBridge event bus.

**Topics**
+ [IAM role for the connector](#mkc-eb-kafka-iam-role-connector)
+ [An EventBridge rule for incoming events](#mkc-eb-kafka-create-rule)

### IAM role for the connector
<a name="mkc-eb-kafka-iam-role-connector"></a>

The IAM role that you associate with the connector must have the [PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html) permission to allow sending events to EventBridge. The following IAM policy example grants the permission to send events to an event bus named `example-event-bus`. Replace the resource ARN in the example with the ARN of your event bus.

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```


In addition, you must make sure that your IAM role for the connector contains the following trust policy.

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

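If you generate these two policy documents for several event buses, a small helper can keep them consistent. The following Python sketch builds the permissions policy and the trust policy from an event bus ARN. The function name and the ARN format check are hypothetical conveniences, and the service principal is written as `kafkaconnect.amazonaws.com`.

```python
import json
import re

# Loose format check for an event bus ARN (illustrative, not exhaustive).
EVENT_BUS_ARN = re.compile(r"^arn:[^:]+:events:[^:]+:\d{12}:event-bus/.+$")


def connector_role_policies(event_bus_arn):
    """Return (permissions_policy, trust_policy) for the connector role."""
    if not EVENT_BUS_ARN.match(event_bus_arn):
        raise ValueError(f"not an event bus ARN: {event_bus_arn}")
    permissions = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["events:PutEvents"],
            "Resource": event_bus_arn,
        }],
    }
    trust = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "kafkaconnect.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }
    return permissions, trust


permissions, trust = connector_role_policies(
    "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus")
print(json.dumps(permissions, indent=2))
print(json.dumps(trust, indent=2))
```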

### An EventBridge rule for incoming events
<a name="mkc-eb-kafka-create-rule"></a>

You create [rules](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html) that match incoming events with event data criteria, known as [event patterns](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html). With an event pattern, you can define the criteria to filter incoming events, and determine which events should trigger a particular rule and subsequently be routed to a designated [target](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html). The following example of an event pattern matches Kafka events sent to the EventBridge event bus.

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

The following is an example of an event sent from Kafka to EventBridge using the Kafka sink connector.

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```
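To see why the example pattern matches the example event, the following Python sketch implements a deliberately simplified matcher. It handles only nested objects and lists of exact values, which is all that the example pattern uses. It isn't EventBridge's matching logic and it ignores operators such as prefix or numeric matching. The function name is hypothetical.

```python
def matches(pattern, event):
    """Return True if every field in the pattern matches the event (exact values only)."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: descend into the corresponding nested object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        else:
            # A list in a pattern means "any of these values".
            candidates = actual if isinstance(actual, list) else [actual]
            if not any(value in expected for value in candidates):
                return False
    return True


pattern = {"detail": {"topic": ["msk-eventbridge-tutorial"]}}
event = {
    "detail-type": "msk-eventbridge-tutorial",
    "source": "kafka-connect.msk-eventbridge-tutorial",
    "detail": {"topic": "msk-eventbridge-tutorial", "partition": 0, "offset": 0},
}
print(matches(pattern, event))
```

The pattern names only `detail.topic`, so the other fields of the event, such as `partition` and `offset`, don't affect the result.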

In the EventBridge console, [create a rule](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html) on the event bus using this example pattern and specify a target, such as a CloudWatch Logs group. The EventBridge console will automatically configure the necessary access policy for the CloudWatch Logs group.

## Create the connector
<a name="mkc-eb-kafka-create-connector"></a>

In the following section, you create and deploy the [EventBridge Kafka sink connector](https://github.com/awslabs/eventbridge-kafka-connector) using the AWS Management Console.

**Topics**
+ [Step 1: Download the connector](#mkc-eb-kafka-download-connector)
+ [Step 2: Create an Amazon S3 bucket](#mkc-eb-kafka-s3-bucket-create)
+ [Step 3: Create a plugin in MSK Connect](#mkc-eb-kafka-create-plugin)
+ [Step 4: Create the connector](#mkc-eb-kafka-create-connector)

### Step 1: Download the connector
<a name="mkc-eb-kafka-download-connector"></a>

Download the latest EventBridge sink connector JAR from the [GitHub releases page](https://github.com/awslabs/eventbridge-kafka-connector/releases) for the EventBridge Kafka connector. For example, to download version v1.4.1, choose the JAR file link, `kafka-eventbridge-sink-with-dependencies.jar`. Then, save the file to a preferred location on your machine.

### Step 2: Create an Amazon S3 bucket
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. To store the JAR file in Amazon S3 for use with MSK Connect, open the AWS Management Console, and then choose Amazon S3.

1. In the Amazon S3 console, choose **Create bucket**, and enter a unique bucket name. For example, **amzn-s3-demo-bucket1-eb-connector**.

1. Choose an appropriate Region for your Amazon S3 bucket. Make sure that it matches the Region where your MSK cluster is deployed.

1. For **Bucket settings**, keep the default selections or adjust as needed.

1. Choose **Create bucket**.

1. Upload the JAR file to the Amazon S3 bucket.

### Step 3: Create a plugin in MSK Connect
<a name="mkc-eb-kafka-create-plugin"></a>

1. Open the AWS Management Console, and then navigate to **MSK Connect**.

1. In the left navigation pane, choose **Custom plugins**.

1. Choose **Create plugin**, and then enter a **Plugin name**. For example, **eventbridge-sink-plugin**.

1. For **Custom plugin location**, paste the **S3 object URL** of the JAR file that you uploaded.

1. Add an optional description for the plugin.

1. Choose **Create plugin**.

After the plugin is created, you can use it to configure and deploy the EventBridge Kafka connector in MSK Connect.

### Step 4: Create the connector
<a name="mkc-eb-kafka-create-connector"></a>

Before you create the connector, we recommend that you create the required Kafka topic to avoid connector errors. To create the topic, use your client machine.

1. In the left pane of the MSK console, choose **Connectors**, and then choose **Create connector**.

1. In the list of plugins, choose **eventbridge-sink-plugin**, then choose **Next**.

1. For the connector name, enter **EventBridgeSink**.

1. In the list of clusters, choose your MSK cluster.

1. <a name="connector-ex"></a>Copy the following configuration for the connector and paste it into the **Connector configuration** field.

   Replace the placeholders in the following configuration, as required.
   + Remove `aws.eventbridge.endpoint.uri` if your MSK cluster has public internet access.
   + If you use PrivateLink to securely connect from MSK to EventBridge, replace the DNS part after `https://` with the correct private DNS name of the (optional) VPC interface endpoint for EventBridge that you created earlier.
   + Replace the EventBridge event bus ARN in the following configuration with the ARN of your event bus.
   + Update any Region-specific values.

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   For more information about connector configuration, see [eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector).

   If needed, change the settings for workers and autoscaling. We recommend that you use the latest available (recommended) Apache Kafka Connect version from the dropdown. Under **Access permissions**, use the role created earlier. We also recommend that you enable logging to CloudWatch for observability and troubleshooting. Adjust the other optional settings, such as tags, based on your needs. Then, deploy the connector and wait for its status to change to Running.
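The placeholder replacements described in this step can also be scripted. The following Python sketch builds the connector configuration from a topic, an event bus ARN, and a Region, and includes `aws.eventbridge.endpoint.uri` only when you pass an endpoint. The property names come from the configuration in this step. The function name and its parameters are hypothetical.

```python
import json


def eventbridge_sink_config(connector_id, topic, event_bus_arn, region, endpoint_uri=None):
    """Build the connector configuration shown in this step (hypothetical helper)."""
    config = {
        "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
        "aws.eventbridge.connector.id": connector_id,
        "topics": topic,
        "tasks.max": "1",
        "aws.eventbridge.eventbus.arn": event_bus_arn,
        "aws.eventbridge.region": region,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "auto.offset.reset": "earliest",
    }
    if endpoint_uri:
        # Only needed when you route traffic through a VPC interface endpoint.
        config["aws.eventbridge.endpoint.uri"] = endpoint_uri
    return config


print(json.dumps(eventbridge_sink_config(
    "msk-eventbridge-tutorial",
    "msk-eventbridge-tutorial",
    "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
    "us-east-1",
), indent=2))
```

You can paste the printed JSON into the **Connector configuration** field.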

## Send messages to Kafka
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

You can configure message encodings, such as Apache Avro and JSON, by specifying different converters using the `value.converter` and, optionally, `key.converter` settings available in Kafka Connect.

The [connector example](#connector-ex) in this topic is configured to work with JSON-encoded messages, as indicated by the use of `org.apache.kafka.connect.json.JsonConverter` for `value.converter`. When the connector is in the Running state, send records to the `msk-eventbridge-tutorial` Kafka topic from your client machine.
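As a rough illustration of what a JSON-encoded record for this connector can look like, the following Python sketch serializes a string key and a JSON value as bytes. The field names follow the example event shown earlier in this topic. The function name is hypothetical, and sending the bytes with a Kafka producer of your choice is left out.

```python
import json
from datetime import datetime, timezone


def order_record(order_id, items, created=None):
    """Return (key_bytes, value_bytes) for one JSON-encoded order record."""
    created = (created or datetime.now(timezone.utc)).astimezone(timezone.utc)
    value = {
        "orderItems": list(items),
        "orderCreatedTime": created.strftime("%a %b %d %H:%M:%S UTC %Y"),
    }
    # The key is a plain string and the value is schemaless JSON, which fits the
    # StringConverter and JsonConverter settings in the connector example.
    return order_id.encode("utf-8"), json.dumps(value).encode("utf-8")


key, value = order_record(
    "order-1", ["item-1", "item-2"],
    datetime(2025, 3, 26, 10, 15, tzinfo=timezone.utc))
print(key, value)
```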

# Use Debezium source connector with configuration provider
<a name="mkc-debeziumsource-connector-example"></a>

This example shows how to use the Debezium MySQL connector plugin with a MySQL-compatible [Amazon Aurora](https://aws.amazon.com/rds/aurora/) database as the source. In this example, we also set up the open-source [AWS Secrets Manager Config Provider](https://github.com/jcustenborder/kafka-config-provider-aws) to externalize database credentials in AWS Secrets Manager. To learn more about configuration providers, see [Tutorial: Externalizing sensitive information using config providers](msk-connect-config-provider.md).

**Important**  
The Debezium MySQL connector plugin [supports only one task](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-tasks-max) and does not work with autoscaled capacity mode for Amazon MSK Connect. You should instead use provisioned capacity mode and set `workerCount` equal to one in your connector configuration. To learn more about the capacity modes for MSK Connect, see [Understand connector capacity](msk-connect-capacity.md).
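The following Python sketch shows one way to check a connector's capacity setting against this requirement before you create the connector. It assumes the `capacity` structure of the create-connector request, with either an `autoScaling` or a `provisionedCapacity` entry. The function name and the `mcuCount` value in the usage line are hypothetical.

```python
def check_debezium_capacity(capacity):
    """Raise ValueError unless capacity is provisioned with exactly one worker."""
    if "autoScaling" in capacity:
        raise ValueError("use provisioned capacity, not autoscaled capacity")
    provisioned = capacity.get("provisionedCapacity")
    if not provisioned or provisioned.get("workerCount") != 1:
        raise ValueError("set workerCount to 1 in provisionedCapacity")
    return capacity


# Example value only: one worker, with an MCU count that you choose for your load.
print(check_debezium_capacity({"provisionedCapacity": {"mcuCount": 2, "workerCount": 1}}))
```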

# Complete prerequisites to use Debezium source connector
<a name="mkc-debeziumsource-connector-example-prereqs"></a>

Your connector must be able to access the internet so that it can interact with services such as AWS Secrets Manager that are outside of your Amazon Virtual Private Cloud. The steps in this section help you complete the following tasks to enable internet access.
+ Set up a public subnet that hosts a NAT gateway and routes traffic to an internet gateway in your VPC.
+ Create a default route that directs your private subnet traffic to your NAT gateway.

For more information, see [Enable internet access for Amazon MSK Connect](msk-connect-internet-access.md).

**Prerequisites**

Before you can enable internet access, you need the following items:
+ The ID of the Amazon Virtual Private Cloud (VPC) associated with your cluster. For example, *vpc-123456ab*.
+ The IDs of the private subnets in your VPC. For example, *subnet-a1b2c3de*, *subnet-f4g5h6ij*, etc. You must configure your connector with private subnets.

**To enable internet access for your connector**

1. Open the Amazon Virtual Private Cloud console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Create a public subnet for your NAT gateway with a descriptive name, and note the subnet ID. For detailed instructions, see [Create a subnet in your VPC](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet).

1. Create an internet gateway so that your VPC can communicate with the internet, and note the gateway ID. Attach the internet gateway to your VPC. For instructions, see [Create and attach an internet gateway](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway).

1. Provision a public NAT gateway so that hosts in your private subnets can reach your public subnet. When you create the NAT gateway, select the public subnet that you created earlier. For instructions, see [Create a NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating).

1. Configure your route tables. You must have two route tables in total to complete this setup. You should already have a main route table that was automatically created at the same time as your VPC. In this step you create an additional route table for your public subnet.

   1. Use the following settings to modify your VPC's main route table so that your private subnets route traffic to your NAT gateway. For instructions, see [Work with route tables](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) in the *Amazon Virtual Private Cloud* *User Guide*.  
**Private MSKC route table**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

   1. Follow the instructions in [Create a custom route table](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing) to create a route table for your public subnet. When you create the table, enter a descriptive name in the **Name tag** field to help you identify which subnet the table is associated with. For example, **Public MSKC**.

   1. Configure your **Public MSKC** route table using the following settings.  
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)
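
The two default routes described in this procedure can also be created from the AWS CLI. The following is a minimal sketch, assuming you already know the IDs of your route tables, NAT gateway, and internet gateway. The function names are hypothetical; only the `aws ec2 create-route` command and its options come from the AWS CLI.

```shell
# Hypothetical helpers. The IDs you pass in are placeholders for your own resources.

# Private subnets: send internet-bound traffic to the NAT gateway.
add_private_default_route() {
    aws ec2 create-route --route-table-id "$1" \
        --destination-cidr-block 0.0.0.0/0 --nat-gateway-id "$2"
}

# Public subnet: send internet-bound traffic to the internet gateway.
add_public_default_route() {
    aws ec2 create-route --route-table-id "$1" \
        --destination-cidr-block 0.0.0.0/0 --gateway-id "$2"
}
```

For example, `add_private_default_route <main-route-table-id> <nat-gateway-id>` adds the route for the private subnets, and `add_public_default_route <public-route-table-id> <internet-gateway-id>` adds the route for the public subnet.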

Now that you have enabled internet access for Amazon MSK Connect, you are ready to create a connector.

# Create a Debezium source connector
<a name="msk-connect-debeziumsource-connector-example-steps"></a>

This procedure describes how to create a Debezium source connector.

1. **Create a custom plugin**

   1. Download the MySQL connector plugin for the latest stable release from the [Debezium](https://debezium.io/releases/) site. Make a note of the Debezium release version you download (version 2.x, or the older series 1.x). Later in this procedure, you'll create a connector based on your Debezium version.

   1. Download and extract the [AWS Secrets Manager Config Provider](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws).

   1. Place the following folders into the same directory:
      + The `debezium-connector-mysql` folder
      + The `jcustenborder-kafka-config-provider-aws-0.1.1` folder

   1. Compress the directory that you created in the previous step into a ZIP file and then upload the ZIP file to an S3 bucket. For instructions, see [Uploading objects](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) in the *Amazon S3 User Guide.*

   1. Copy the following JSON and paste it in a file. For example, `debezium-source-custom-plugin.json`. Replace *<example-custom-plugin-name>* with the name that you want the plugin to have, *<amzn-s3-demo-bucket-arn>* with the ARN of the Amazon S3 bucket where you uploaded the ZIP file, and `<file-key-of-ZIP-object>` with the file key of the ZIP object that you uploaded to S3.

      ```
      {
          "name": "<example-custom-plugin-name>",
          "contentType": "ZIP",
          "location": {
              "s3Location": {
                  "bucketArn": "<amzn-s3-demo-bucket-arn>",
                  "fileKey": "<file-key-of-ZIP-object>"
              }
          }
      }
      ```

   1. Run the following AWS CLI command from the folder where you saved the JSON file to create a plugin.

      ```
      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>
      ```

      You should see output similar to the following example.

      ```
      {
          "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1",
          "CustomPluginState": "CREATING",
          "Name": "example-custom-plugin-name",
          "Revision": 1
      }
      ```

   1. Run the following command to check the plugin state. The state should change from `CREATING` to `ACTIVE`. Replace the ARN placeholder with the ARN that you got in the output of the previous command.

      ```
      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
      ```

1. **Configure AWS Secrets Manager and create a secret for your database credentials**

   1. Open the Secrets Manager console at [https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/).

   1. Create a new secret to store your database sign-in credentials. For instructions, see [Create a secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_create-basic-secret.html) in the *AWS Secrets Manager User Guide*.

   1. Copy your secret's ARN.

   1. Add the Secrets Manager permissions from the following example policy to your service execution role. For more information about this role, see [Understand service execution role](msk-connect-service-execution-role.md). In the policy, replace *arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234* with the ARN of your secret.


      ```
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "secretsmanager:GetResourcePolicy",
              "secretsmanager:GetSecretValue",
              "secretsmanager:DescribeSecret",
              "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
              "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
            ]
          }
        ]
      }
      ```


      For instructions on how to add IAM permissions, see [Adding and removing IAM identity permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *IAM User Guide*.

1. **Create a custom worker configuration with information about your configuration provider**

   1. Copy the following worker configuration properties into a file, replacing the placeholder strings with values that correspond to your scenario. To learn more about the configuration properties for the AWS Secrets Manager Config Provider, see [SecretsManagerConfigProvider](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-config-provider-aws/configProviders/SecretsManagerConfigProvider.html) in the plugin's documentation.

      ```
      key.converter=<org.apache.kafka.connect.storage.StringConverter>
      value.converter=<org.apache.kafka.connect.storage.StringConverter>
      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=<us-east-1>
      ```

   1. Run the following AWS CLI command to create your custom worker configuration. 

      Replace the following values:
      + *<my-worker-config-name>* - a descriptive name for your custom worker configuration
      + *<encoded-properties-file-content-string>* - a base64-encoded version of the plaintext properties that you copied in the previous step

      ```
      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
      ```

1. **Create a connector**

   1. Copy the following JSON that corresponds to your Debezium version (2.x or 1.x) and paste it in a new file. Replace the `<placeholder>` strings with values that correspond to your scenario. For information about how to set up a service execution role, see [IAM roles and policies for MSK Connect](msk-connect-iam.md).

      Note that the configuration uses variables like `${secretManager:MySecret-1234:dbusername}` instead of plaintext to specify database credentials. Replace `MySecret-1234` with the name of your secret and then include the name of the key that you want to retrieve. You must also replace `<arn-of-config-provider-worker-configuration>` with the ARN of your custom worker configuration.

------
#### [ Debezium 2.x ]

      For Debezium 2.x versions, copy the following JSON and paste it in a new file. Replace the *<placeholder>* strings with values that correspond to your scenario.

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"topic.prefix": "<logical-name-of-database-server>",
      		"schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"schema.history.internal.consumer.security.protocol": "SASL_SSL",
      		"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"schema.history.internal.producer.security.protocol": "SASL_SSL",
      		"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------
#### [ Debezium 1.x ]

      For Debezium 1.x versions, copy the following JSON and paste it in a new file. Replace the *<placeholder>* strings with values that correspond to your scenario.

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.server.name": "<logical-name-of-database-server>",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"database.history.consumer.security.protocol": "SASL_SSL",
      		"database.history.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"database.history.producer.security.protocol": "SASL_SSL",
      		"database.history.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------

   1. Run the following AWS CLI command in the folder where you saved the JSON file in the previous step. This example assumes that you named the file `connector-info.json`.

      ```
      aws kafkaconnect create-connector --cli-input-json file://connector-info.json
      ```

      The following is an example of the output that you get when you run the command successfully.

      ```
      {
          "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
          "ConnectorState": "CREATING", 
          "ConnectorName": "example-Debezium-source-connector"
      }
      ```
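
The worker configuration step in this procedure expects the properties as a base64-encoded string. The following is a minimal sketch of one way to produce that string with standard tools. The function name `encode_worker_properties` and the file name `worker.properties` are assumptions for illustration and are not part of MSK Connect.

```shell
# Hypothetical helper: prints the base64 encoding of a properties file on one line.
encode_worker_properties() {
    # Some base64 implementations wrap their output every 76 characters,
    # so strip newlines to get a single-line string.
    base64 < "$1" | tr -d '\n'
}

# Example usage (assumes the properties are saved as worker.properties
# and that AWS credentials are configured):
#
#   aws kafkaconnect create-worker-configuration \
#       --name <my-worker-config-name> \
#       --properties-file-content "$(encode_worker_properties worker.properties)"
```

Encoding from the file, rather than pasting the text into an online encoder, keeps the exact line breaks of the properties file and avoids copying configuration to a third party.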

# Update a Debezium connector configuration
<a name="mkc-debeziumsource-connector-update"></a>

To update the configuration of the Debezium connector, follow these steps: 

1. Copy the following JSON and paste it to a new file. Replace the `<placeholder>` strings with values that correspond to your scenario.

   ```
   {
      "connectorArn": "<connector_arn>",
      "connectorConfiguration": <new_configuration_in_json>,
      "currentVersion": "<current_version>"
   }
   ```

1. Run the following AWS CLI command in the folder where you saved the JSON file in the previous step. This example assumes that you named the file `connector-info.json`.

   ```
   aws kafkaconnect update-connector --cli-input-json file://connector-info.json
   ```

   The following is an example of the output when you run the command successfully.

   ```
   {
       "connectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2",
       "connectorOperationArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector-operation/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2/41b6ad56-3184-479b-850a-a8bedd5a02f3",
       "connectorState": "UPDATING"
   }
   ```

1. You can now run the following command to monitor the current state of the operation:

   ```
   aws kafkaconnect describe-connector-operation --connector-operation-arn <operation_arn>
   ```
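
The update request needs the connector's current version. The following is a minimal sketch of how you might look it up, assuming that the `describe-connector` response exposes the value in a `currentVersion` field. The function name is hypothetical.

```shell
# Hypothetical helper: prints the current version string of a connector.
# Assumes the describe-connector response contains a currentVersion field.
get_connector_version() {
    aws kafkaconnect describe-connector \
        --connector-arn "$1" \
        --query currentVersion \
        --output text
}
```

You could then copy the printed value into the `currentVersion` field of the JSON file before you run `update-connector`.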

For a Debezium connector example with detailed steps, see [Introducing Amazon MSK Connect - Stream Data to and from Your Apache Kafka Clusters Using Managed Connectors](https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/).

# Migrate to Amazon MSK Connect
<a name="msk-connect-migrating"></a>

This section describes how to migrate your Apache Kafka connector application to Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect). To learn more about the benefits of migrating to Amazon MSK Connect, see [Benefits of using Amazon MSK Connect](msk-connect.md#msk-connect-benefits).

This section also describes the state management topics used by Kafka Connect and Amazon MSK Connect and covers procedures for migrating source and sink connectors.

# Understand internal topics used by Kafka Connect
<a name="msk-connect-kafka-connect-topics"></a>

An Apache Kafka Connect application that’s running in distributed mode stores its state by using internal topics in the Kafka cluster and group membership. The following are the configuration values that correspond to the internal topics that are used for Kafka Connect applications:
+ Configuration topic, specified through `config.storage.topic`

  In the configuration topic, Kafka Connect stores the configuration of all the connectors and tasks that have been started by users. Each time users update the configuration of a connector or when a connector requests a reconfiguration (for example, the connector detects that it can start more tasks), a record is emitted to this topic. This topic is compaction enabled, so it always keeps the last state for each entity.
+ Offsets topic, specified through `offset.storage.topic`

  In the offsets topic, Kafka Connect stores the offsets of the source connectors. Like the configuration topic, the offsets topic is compaction enabled. This topic is used to write the source positions only for source connectors that produce data to Kafka from external systems. Sink connectors, which read data from Kafka and send it to external systems, store their consumer offsets by using regular Kafka consumer groups.
+ Status topic, specified through `status.storage.topic`

  In the status topic, Kafka Connect stores the current state of connectors and tasks. This topic is used as the central place for the data that is queried by users of the REST API. This topic allows users to query any worker and still get the status of all running plugins. Like the configuration and offsets topics, the status topic is also compaction enabled.

In addition to these topics, Kafka Connect makes extensive use of Kafka’s group membership API. For each sink connector, the Connect runtime runs a regular consumer group that extracts records from Kafka. The groups are named after the connector. For example, for a connector named `file-sink`, the group is named `connect-file-sink`. Each consumer in the group provides records to a single task. You can retrieve these groups and their offsets by using regular consumer group tools, such as `kafka-consumer-groups.sh`.
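
The naming rule for sink connector consumer groups can be sketched as follows. The helper name `sink_group_name` is hypothetical. The commented command assumes that the Apache Kafka command line tools are on your `PATH`.

```shell
# Hypothetical helper: derives the consumer group name for a sink connector.
sink_group_name() {
    printf 'connect-%s\n' "$1"
}

# Example usage (replace <bootstrap-servers> with your broker list):
#
#   kafka-consumer-groups.sh --bootstrap-server <bootstrap-servers> \
#       --describe --group "$(sink_group_name file-sink)"
```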

# State management of Amazon MSK Connect applications
<a name="msk-connect-state-management"></a>

By default, Amazon MSK Connect creates three separate topics in the Kafka cluster for each Amazon MSK Connector to store the connector’s configuration, offset, and status. The default topic names are structured as follows:
+ \_\_msk\_connect\_configs\_*connector-name*\_*connector-id*
+ \_\_msk\_connect\_status\_*connector-name*\_*connector-id*
+ \_\_msk\_connect\_offsets\_*connector-name*\_*connector-id*
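
As an illustration of this naming pattern, the following sketch prints the three default topic names for a given connector name and connector ID. The function name and the example arguments are hypothetical.

```shell
# Hypothetical helper: prints the default internal topic names for a connector.
# $1 is the connector name, $2 is the connector ID.
default_internal_topics() {
    name=$1
    id=$2
    for kind in configs status offsets; do
        printf '__msk_connect_%s_%s_%s\n' "$kind" "$name" "$id"
    done
}
```

For example, `default_internal_topics my-connector <connector-id>` prints one topic name per line, which you can compare against the topics listed on your cluster.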

**Note**  
To provide offset continuity between source connectors, you can use an offset storage topic of your choice instead of the default topic. Specifying an offset storage topic helps you accomplish tasks like creating a source connector that resumes reading from the last offset of a previous connector. To specify an offset storage topic, supply a value for the [`offset.storage.topic`](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets) property in the Amazon MSK Connect worker configuration before creating the connector.

# Migrate source connectors to Amazon MSK Connect
<a name="msk-connect-migrate-source-connectors"></a>

Source connectors are Apache Kafka Connect applications that import records from external systems into Kafka. This section describes the process for migrating Apache Kafka Connect source connector applications to Amazon MSK Connect, whether they run on premises or on self-managed Kafka Connect clusters on AWS.

The Kafka Connect source connector application stores offsets in a topic that’s named with the value that’s set for the config property `offset.storage.topic`. The following are sample offset messages for a JDBC connector that’s running two tasks that import data from two different tables named `movies` and `shows`. The most recent row imported from the `movies` table has a primary ID of `18343`. The most recent row imported from the `shows` table has a primary ID of `732`.

```
["jdbcsource",{"protocol":"1","table":"sample.movies"}] {"incrementing":18343}
["jdbcsource",{"protocol":"1","table":"sample.shows"}] {"incrementing":732}
```
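
Because the offsets topic is compacted, a dump of the topic can contain several records for the same key until compaction runs, and only the last record per key is the current position. The following is a minimal sketch that reduces such a dump to the latest record per key. It assumes one record per line, with the key and value separated by whitespace and no whitespace inside the key, as in the sample above. The function name is hypothetical.

```shell
# Hypothetical helper: reads "key value" offset records on standard input and
# prints only the last value seen for each key, in first-seen key order.
latest_offsets() {
    awk '{
        key = $1
        $1 = ""                      # drop the key, leaving the value
        sub(/^[ \t]+/, "")           # trim the separator left behind
        if (!(key in last)) order[++n] = key
        last[key] = $0
    }
    END {
        for (i = 1; i <= n; i++) print order[i], last[order[i]]
    }'
}
```

You might feed this function the output of a console consumer that prints keys, or a file that contains the dumped records.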

To migrate source connectors to Amazon MSK Connect, do the following:

1. Create an Amazon MSK Connect [custom plugin](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html) by pulling connector libraries from your on-premises or self-managed Kafka Connect cluster.

1. Create Amazon MSK Connect [worker properties](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config) and set the properties `key.converter`, `value.converter`, and `offset.storage.topic` to the same values that are set for the Kafka connector that’s running in your existing Kafka Connect cluster.

1. Pause the connector application on the existing cluster by making a `PUT /connectors/connector-name/pause` request on the existing Kafka Connect cluster.

1. Make sure that all of the connector application’s tasks are completely stopped. You can verify this either by making a `GET /connectors/connector-name/status` request on the existing Kafka Connect cluster or by consuming the messages from the topic name that’s set for the property `status.storage.topic`.

1. Get the connector configuration from the existing cluster. You can get the connector configuration either by making a `GET /connectors/connector-name/config` request on the existing cluster or by consuming the messages from the topic name that’s set for the property `config.storage.topic`.

1. Create a new [Amazon MSK Connector](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html) with the same name as the connector in the existing cluster. Create this connector by using the connector custom plugin that you created in step 1, the worker properties that you created in step 2, and the connector configuration that you extracted in step 5.

1. When the Amazon MSK Connector status is `active`, view the logs to verify that the connector has started importing data from the source system.

1. Delete the connector in the existing cluster by making a `DELETE /connectors/connector-name` request.
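
The REST requests in the steps above can be sketched with `curl` as follows. The function names and the `CONNECT_URL` variable are hypothetical, and the default URL assumes that the REST API of your existing Kafka Connect cluster is reachable on port 8083 of the local host.

```shell
# Base URL of the existing Kafka Connect REST API (an assumption; adjust as needed).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Step 3: pause the connector.
pause_connector() { curl -fsS -X PUT "$CONNECT_URL/connectors/$1/pause"; }

# Step 4: read the connector and task states.
connector_status() { curl -fsS "$CONNECT_URL/connectors/$1/status"; }

# Step 5: read the connector configuration.
connector_config() { curl -fsS "$CONNECT_URL/connectors/$1/config"; }

# Step 8: delete the connector from the existing cluster.
delete_connector() { curl -fsS -X DELETE "$CONNECT_URL/connectors/$1"; }
```

For example, `connector_config jdbcsource > jdbcsource-config.json` saves the configuration to a file that you can reuse when you create the Amazon MSK Connector. Run `delete_connector` only after you have verified the new connector.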

# Migrate sink connectors to Amazon MSK Connect
<a name="msk-connect-migrate-sink-connectors"></a>

Sink connectors are Apache Kafka Connect applications that export data from Kafka to external systems. This section describes the process for migrating Apache Kafka Connect sink connector applications to Amazon MSK Connect, whether they run on premises or on self-managed Kafka Connect clusters on AWS.

Kafka Connect sink connectors use the Kafka group membership API and store offsets in the same `__consumer_offsets` topic as a typical consumer application. This behavior simplifies migration of the sink connector from a self-managed cluster to Amazon MSK Connect.

To migrate sink connectors to Amazon MSK Connect, do the following:

1. Create an Amazon MSK Connect [custom plugin](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html) by pulling connector libraries from your on-premises or self-managed Kafka Connect cluster.

1. Create Amazon MSK Connect [worker properties](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config) and set the properties `key.converter` and `value.converter` to the same values that are set for the Kafka connector that’s running in your existing Kafka Connect cluster.

1. Pause the connector application on your existing cluster by making a `PUT /connectors/connector-name/pause` request on the existing Kafka Connect cluster.

1. Make sure that all of the connector application’s tasks are completely stopped. You can verify this either by making a `GET /connectors/connector-name/status` request on the existing Kafka Connect cluster, or by consuming the messages from the topic name that’s set for the property `status.storage.topic`.

1. Get the connector configuration from the existing cluster. You can get the connector configuration either by making a `GET /connectors/connector-name/config` request on the existing cluster, or by consuming the messages from the topic name that’s set for the property `config.storage.topic`.

1. Create a new [Amazon MSK Connector](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html) with the same name as the connector in the existing cluster. Create this connector by using the connector custom plugin that you created in step 1, the worker properties that you created in step 2, and the connector configuration that you extracted in step 5.

1. When the Amazon MSK Connector status is `active`, view the logs to verify that the connector has started exporting data to the target system.

1. Delete the connector in the existing cluster by making a `DELETE /connectors/connector-name` request.
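
The check in step 4 can be sketched as a small filter over the status response. This sketch assumes the JSON shape that the Kafka Connect REST API returns for `GET /connectors/connector-name/status`, in which the connector and each task carry a `state` field. The function name is hypothetical.

```shell
# Hypothetical helper: reads a Kafka Connect status response on standard input
# and succeeds only if neither the connector nor any task reports RUNNING.
no_running_tasks() {
    ! grep -q '"state"[[:space:]]*:[[:space:]]*"RUNNING"'
}

# Example usage (assumes the REST API is reachable at localhost:8083):
#
#   curl -fsS http://localhost:8083/connectors/<connector-name>/status \
#       | no_running_tasks && echo "No tasks are running"
```

This is a plain text match rather than a JSON parser, so it is conservative: any `RUNNING` state anywhere in the response makes the check fail.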

# Troubleshoot issues in Amazon MSK Connect
<a name="msk-connect-troubleshooting"></a>

The following information can help you troubleshoot problems that you might have while using MSK Connect. You can also post your issue to [AWS re:Post](https://repost.aws/).

**Connector is unable to access resources hosted on the public internet**  
See [Enabling internet access for Amazon MSK Connect](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access.html).

**Connector's number of running tasks is not equal to the number of tasks specified in tasks.max**  
Here are some reasons a connector may use fewer tasks than the specified `tasks.max` configuration:
+ Some connector implementations limit the number of tasks that can be used. For example, the Debezium connector for MySQL is limited to using a single task.
+ When using autoscaled capacity mode, Amazon MSK Connect overrides a connector's `tasks.max` property with a value that is proportional to the number of workers running in the connector and the number of MCUs per worker. If you have configured the optional `maxAutoscalingTaskCount` parameter, the `tasks.max` value will not exceed this limit. For more information, see [Understand maximum autoscaling task count](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#msk-connect-max-autoscaling-task-count).
+ For sink connectors, the level of parallelism (number of tasks) cannot be more than the number of topic partitions. While you can set `tasks.max` to a larger value, a single partition is never processed by more than a single task at a time.
+ In Kafka Connect 2.7.x, the default consumer partition assignor is `RangeAssignor`. The behavior of this assignor is to give the first partition of every topic to a single consumer, the second partition of every topic to a single consumer, etc. This means that the maximum number of active tasks for a sink connector using `RangeAssignor` is equal to the maximum number of partitions in any single topic being consumed. If this doesn't work for your use case, you should [create a Worker Configuration](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config) in which the `consumer.partition.assignment.strategy` property is set to a more suitable consumer partition assignor. See [Kafka 2.7 Interface ConsumerPartitionAssignor: *All Known Implementing Classes*](https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html).
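
The limit described for `RangeAssignor` comes down to simple arithmetic. The following sketch computes that upper bound for a sink connector from `tasks.max` and the partition counts of the consumed topics. It only illustrates the rule stated above and is not how MSK Connect or Kafka Connect schedules tasks. The function name is hypothetical.

```shell
# Hypothetical helper: prints the upper bound on active sink tasks under RangeAssignor.
# $1 is tasks.max; the remaining arguments are the partition counts of each consumed topic.
max_active_sink_tasks() {
    tasks_max=$1
    shift
    largest=0
    # Find the largest partition count of any single topic.
    for partitions in "$@"; do
        if [ "$partitions" -gt "$largest" ]; then
            largest=$partitions
        fi
    done
    # The bound is the smaller of tasks.max and that partition count.
    if [ "$tasks_max" -lt "$largest" ]; then
        echo "$tasks_max"
    else
        echo "$largest"
    fi
}
```

For example, with `tasks.max` set to 10 and three topics that have 3, 6, and 2 partitions, `max_active_sink_tasks 10 3 6 2` prints `6`, even though the topics have 11 partitions in total.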