

# Getting started with MSK Connect
<a name="msk-connect-getting-started"></a>

This is a step-by-step tutorial that uses the AWS Management Console to create an MSK cluster and a sink connector that sends data from the cluster to an S3 bucket.

**Topics**
+ [Set up resources required for MSK Connect](mkc-tutorial-setup.md)
+ [Create custom plugin](mkc-create-plugin.md)
+ [Create client machine and Apache Kafka topic](mkc-create-topic.md)
+ [Create connector](mkc-create-connector.md)
+ [Send data to the MSK cluster](mkc-send-data.md)

# Set up resources required for MSK Connect
<a name="mkc-tutorial-setup"></a>

In this step you create the following resources that you need for this getting-started scenario:
+ An Amazon S3 bucket to serve as the destination that receives data from the connector.
+ An MSK cluster to which you will send data. The connector will then read the data from this cluster and send it to the destination S3 bucket.
+ An IAM policy that contains the permissions to write to the destination S3 bucket.
+ An IAM role that allows the connector to write to the destination S3 bucket. You'll add the IAM policy that you create to this role.
+ An Amazon VPC endpoint to make it possible to send data from the Amazon VPC that has the cluster and the connector to Amazon S3.

**To create the S3 bucket**

1. Sign in to the AWS Management Console and open the Amazon S3 console at [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Choose **Create bucket**.

1. For the name of the bucket, enter a descriptive name such as `amzn-s3-demo-bucket-mkc-tutorial`.

1. Scroll down and choose **Create bucket**.

1. In the list of buckets, choose the newly created bucket.

1. Choose **Create folder**.

1. Enter `tutorial` for the name of the folder, then scroll down and choose **Create folder**.
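Bucket names must be globally unique and follow the S3 naming rules (3 to 63 characters; lowercase letters, numbers, hyphens, and periods; beginning and ending with a letter or number). As a quick sanity check before the console rejects a name, here is a minimal sketch; the regex covers only these core rules, not edge cases such as names formatted like IP addresses:

```python
import re

def is_valid_bucket_name(name: str) -> bool:
    """Rough check against the core S3 general-purpose bucket naming rules."""
    if not (3 <= len(name) <= 63):
        return False
    # Lowercase letters, digits, hyphens, periods; must start and end alphanumeric.
    return re.fullmatch(r"[a-z0-9][a-z0-9.-]*[a-z0-9]", name) is not None

print(is_valid_bucket_name("amzn-s3-demo-bucket-mkc-tutorial"))  # True
print(is_valid_bucket_name("MyBucket"))                          # False (uppercase)
```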

**To create the cluster**

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the left pane, under **MSK Clusters**, choose **Clusters**.

1. Choose **Create cluster**.

1. In **Creation method**, choose **Custom create**.

1. For the cluster name enter **mkc-tutorial-cluster**.

1. In **Cluster type**, choose **Provisioned**.

1. Choose **Next**.

1. Under **Networking**, choose an Amazon VPC. Then select the Availability Zones and subnets that you want to use. Remember the IDs of the Amazon VPC and subnets that you selected because you need them later in this tutorial.

1. Choose **Next**.

1. Under **Access control methods** ensure that only **Unauthenticated access** is selected.

1. Under **Encryption** ensure that only **Plaintext** is selected.

1. Continue through the wizard and then choose **Create cluster**. This takes you to the details page for the cluster. On that page, under **Security groups applied**, find the security group ID. Remember that ID because you need it later in this tutorial.

**To create an IAM policy with permissions to write to the S3 bucket**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. On the navigation pane, choose **Policies**.

1. Choose **Create policy**.

1. In **Policy editor**, choose **JSON**, and then replace the JSON in the editor window with the following JSON.

   In the following example, replace *<amzn-s3-demo-bucket-my-tutorial>* with the name of your S3 bucket.

   ```
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```


   For instructions about how to write secure policies, see [IAM access control](iam-access-control.md).

1. Choose **Next**.

1. On the **Review and create** page, do the following:

   1. For **Policy name**, enter a descriptive name, such as **mkc-tutorial-policy**.

   1. In **Permissions defined in this policy**, review and/or edit the permissions defined in your policy.

   1. (Optional) To help identify, organize, or search for the policy, choose **Add new tag** to add tags as key-value pairs. For example, add a tag to your policy with the key-value pair of **Environment** and **Test**.

      For more information about using tags, see [Tags for AWS Identity and Access Management resources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) in the *IAM User Guide*.

1. Choose **Create policy**.
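If you prefer to script this step, the same policy document can be generated with a short helper, a sketch in which the function name is ours; you would still paste the output into the policy editor or pass it to the AWS CLI:

```python
import json

def make_s3_sink_policy(bucket: str) -> str:
    """Render the tutorial's IAM policy for a given bucket name."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowListBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                "Sid": "AllowObjectActions",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                    "s3:ListBucketMultipartUploads",
                ],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }
    return json.dumps(policy, indent=2)

print(make_s3_sink_policy("amzn-s3-demo-bucket-mkc-tutorial"))
```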

**To create the IAM role that can write to the destination bucket**

1. On the navigation pane of the IAM console, choose **Roles**, and then choose **Create role**.

1. On the **Select trusted entity** page, do the following:

   1. For **Trusted entity type**, choose **AWS service**.

   1. For **Service or use case**, choose **S3**.

   1. Under **Use case**, choose **S3**.

1. Choose **Next**.

1. On the **Add permissions** page, do the following:

   1. In the search box under **Permissions policies**, enter the name of the policy that you previously created for this tutorial. For example, **mkc-tutorial-policy**. Then, choose the box to the left of the policy name.

   1. (Optional) Set a [permissions boundary](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html). This is an advanced feature that is available for service roles, but not service-linked roles. For information about setting a permissions boundary, see [Creating roles and attaching policies (console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html) in the *IAM User Guide*.

1. Choose **Next**.

1. On the **Name, review, and create** page, do the following:

   1. For **Role name**, enter a descriptive name, such as **mkc-tutorial-role**.
**Important**  
When you name a role, note the following:  
Role names must be unique within your AWS account, and can't be made unique by case.  
For example, don't create roles named both **PRODROLE** and **prodrole**. When a role name is used in a policy or as part of an ARN, the role name is case sensitive. However, when a role name appears to customers in the console, such as during the sign-in process, the role name is case insensitive.
You can't edit the name of the role after it's created because other entities might reference the role.

   1. (Optional) For **Description**, enter a description for the role.

   1. (Optional) To edit the use cases and permissions for the role, in **Step 1: Select trusted entities** or **Step 2: Add permissions** sections, choose **Edit**.

   1. (Optional) To help identify, organize, or search for the role, choose **Add new tag** to add tags as key-value pairs. For example, add a tag to your role with the key-value pair of **ProductManager** and **John**.

      For more information about using tags, see [Tags for AWS Identity and Access Management resources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) in the *IAM User Guide*.

1. Review the role, and then choose **Create role**.

**To allow MSK Connect to assume the role**

1. In the IAM console, in the left pane, under **Access management**, choose **Roles**.

1. Find the `mkc-tutorial-role` and choose it.

1. Under the role's **Summary**, choose the **Trust relationships** tab.

1. Choose **Edit trust relationship**.

1. Replace the existing trust policy with the following JSON.

   ```
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```


1. Choose **Update Trust Policy**.
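To see what this trust policy does mechanically: it allows the MSK Connect service principal to call `sts:AssumeRole` on the role. The following sketch (the helper function is ours, not an AWS API) checks a policy document for exactly that grant:

```python
import json

TRUST_POLICY = """
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "kafkaconnect.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
"""

def allows_assume_role(policy_json: str, service: str) -> bool:
    """Return True if any Allow statement lets `service` call sts:AssumeRole."""
    policy = json.loads(policy_json)
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        # Action and Principal.Service may be a string or a list; normalize both.
        actions = stmt.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        principals = stmt.get("Principal", {}).get("Service", [])
        if isinstance(principals, str):
            principals = [principals]
        if "sts:AssumeRole" in actions and service in principals:
            return True
    return False

print(allows_assume_role(TRUST_POLICY, "kafkaconnect.amazonaws.com"))  # True
```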

**To create an Amazon VPC endpoint from the cluster's VPC to Amazon S3**

1. Open the Amazon VPC console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. In the left pane, choose **Endpoints**.

1. Choose **Create endpoint**.

1. Under **Service Name** choose the **com.amazonaws.us-east-1.s3** service and the **Gateway** type.

1. Choose the cluster's VPC and then select the box to the left of the route table that is associated with the cluster's subnets.

1. Choose **Create endpoint**.

**Next Step**

[Create custom plugin](mkc-create-plugin.md)

# Create custom plugin
<a name="mkc-create-plugin"></a>

A plugin contains the code that defines the logic of the connector. In this step you create a custom plugin that has the code for the Confluent Amazon S3 Sink Connector. In a later step, when you create the MSK connector, you specify that its code is in this custom plugin. You can use the same plugin to create multiple MSK connectors with different configurations.

**To create the custom plugin**

1. Download the [S3 connector](https://www.confluent.io/hub/confluentinc/kafka-connect-s3).

1. Upload the ZIP file to an S3 bucket to which you have access. For information on how to upload files to Amazon S3, see [Uploading objects](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) in the Amazon S3 user guide.

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. In the left pane expand **MSK Connect**, then choose **Custom plugins**.

1. Choose **Create custom plugin**.

1. Choose **Browse S3**.

1. In the list of buckets find the bucket where you uploaded the ZIP file, and choose that bucket.

1. In the list of objects in the bucket, select the radio button to the left of the ZIP file, then choose the button labeled **Choose**.

1. Enter `mkc-tutorial-plugin` for the custom plugin name, then choose **Create custom plugin**.

It might take AWS a few minutes to finish creating the custom plugin. When the creation process is complete, you see the following message in a banner at the top of the browser window.

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**Next Step**

[Create client machine and Apache Kafka topic](mkc-create-topic.md)

# Create client machine and Apache Kafka topic
<a name="mkc-create-topic"></a>

In this step you create an Amazon EC2 instance to use as an Apache Kafka client instance. You then use this instance to create a topic on the cluster.

**To create a client machine**

1. Open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Choose **Launch instances**.

1. Enter a **Name** for your client machine, such as **mkc-tutorial-client**.

1. Leave **Amazon Linux 2 AMI (HVM) - Kernel 5.10, SSD Volume Type** selected for **Amazon Machine Image (AMI) type**.

1. Choose the **t2.xlarge** instance type.

1. Under **Key pair (login)**, choose **Create a new key pair**. Enter **mkc-tutorial-key-pair** for **Key pair name**, and then choose **Download Key Pair**. Alternatively, you can use an existing key pair.

1. Choose **Launch instance**.

1. Choose **View Instances**. Then, in the **Security Groups** column, choose the security group that is associated with your new instance. Copy the ID of the security group, and save it for later.

**To allow the newly created client to send data to the cluster**

1. Open the Amazon VPC console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. In the left pane, under **SECURITY**, choose **Security Groups**. In the **Security group ID** column, find the security group of the cluster. You saved the ID of this security group when you created the cluster in [Set up resources required for MSK Connect](mkc-tutorial-setup.md). Choose this security group by selecting the box to the left of its row. Make sure no other security groups are simultaneously selected.

1. In the bottom half of the screen, choose the **Inbound rules** tab.

1. Choose **Edit inbound rules**.

1. In the bottom left of the screen, choose **Add rule**.

1. In the new rule, choose **All traffic** in the **Type** column. In the field to the right of the **Source** column, enter the ID of the security group of the client machine. This is the security group ID that you saved after you created the client machine.

1. Choose **Save rules**. Your MSK cluster will now accept all traffic from the client you created in the previous procedure.

**To create a topic**

1. Open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. In the table of instances choose `mkc-tutorial-client`.

1. Near the top of the screen, choose **Connect**, then follow the instructions to connect to the instance.

1. Install Java on the client instance by running the following command:

   ```
   sudo yum install java-1.8.0
   ```

1. Run the following command to download Apache Kafka. 

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**Note**  
If you want to use a mirror site other than the one used in this command, you can choose a different one on the [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz) website.

1. Run the following command in the directory where you downloaded the TAR file in the previous step.

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. Go to the **kafka\_2.12-2.2.1** directory.

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the left pane choose **Clusters**, then choose the name `mkc-tutorial-cluster`.

1. Choose **View client information**.

1. Copy the **Plaintext** connection string.

1. Choose **Done**.

1. Run the following command on the client instance (`mkc-tutorial-client`), replacing *bootstrapServerString* with the value that you saved when you viewed the cluster's client information.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   If the command succeeds, you see the following message: `Created topic mkc-tutorial-topic.`

**Next Step**

[Create connector](mkc-create-connector.md)

# Create connector
<a name="mkc-create-connector"></a>

This procedure describes how to create a connector using the AWS Management Console.

**To create the connector**

1. Sign in to the AWS Management Console, and open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. In the left pane, expand **MSK Connect**, then choose **Connectors**.

1. Choose **Create connector**.

1. In the list of plugins, choose `mkc-tutorial-plugin`, then choose **Next**.

1. For the connector name enter `mkc-tutorial-connector`.

1. In the list of clusters, choose `mkc-tutorial-cluster`.

1. In the **Connector network settings** section, choose one of the following for network type:
   + **IPv4** (default) - For connectivity to destinations over IPv4 only
   + **Dual-stack** - For connectivity to destinations over both IPv4 and IPv6 (only available if your subnets have IPv4 and IPv6 CIDR blocks associated with them)

1. Copy the following configuration and paste it into the connector configuration field.

   Make sure that you replace the `s3.region` value (shown as `us-east-1` in the example) with the code of the AWS Region where you're creating the connector. Also, replace *<amzn-s3-demo-bucket-my-tutorial>* with the name of your S3 bucket in the following example.

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. Under **Access permissions** choose `mkc-tutorial-role`.

1. Choose **Next**. On the **Security** page, choose **Next** again.

1. On the **Logs** page choose **Next**.

1. On the **Review and create** page, review your connector configuration and choose **Create connector**.
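The connector configuration is a set of Java-style `key=value` properties. Before pasting, you can parse such a block and confirm the keys this tutorial relies on are present; the following is a sketch, and the required-key list reflects only what the example configuration above uses (the bucket name here is illustrative):

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, ignoring blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

config = """
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=us-east-1
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=1
schema.compatibility=NONE
tasks.max=2
topics=mkc-tutorial-topic
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=amzn-s3-demo-bucket-mkc-tutorial
topics.dir=tutorial
"""

props = parse_properties(config)
missing = {"connector.class", "s3.region", "s3.bucket.name", "topics"} - props.keys()
print(missing)  # set() -- all required keys present
```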

**Next Step**

[Send data to the MSK cluster](mkc-send-data.md)

# Send data to the MSK cluster
<a name="mkc-send-data"></a>

In this step you send data to the Apache Kafka topic that you created earlier, and then look for that same data in the destination S3 bucket.

**To send data to the MSK cluster**

1. In the `bin` folder of the Apache Kafka installation on the client instance, create a text file named `client.properties` with the following contents.

   ```
   security.protocol=PLAINTEXT
   ```

1. Run the following command to create a console producer. Replace *BootstrapBrokerString* with the plaintext connection string that you copied earlier when you viewed the cluster's client information.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. Enter any message that you want, and press **Enter**. Repeat this step two or three times. Every time you enter a line and press **Enter**, that line is sent to your Apache Kafka cluster as a separate message.

1. Look in the destination Amazon S3 bucket to find the messages that you sent in the previous step.
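To know what to look for in the bucket: with the `DefaultPartitioner` and `flush.size=1` configured earlier, the sink writes one object per message under `topics.dir`. The sketch below builds the expected object key from the Confluent S3 connector's documented default naming pattern; exact names can vary with connector version and settings:

```python
def expected_s3_key(topics_dir: str, topic: str, partition: int, start_offset: int) -> str:
    """Confluent S3 sink default object naming (sketch):
    <topics.dir>/<topic>/partition=<p>/<topic>+<p>+<zero-padded start offset>.<ext>
    The default offset padding is 10 digits; JSON format gives a .json extension."""
    return (f"{topics_dir}/{topic}/partition={partition}/"
            f"{topic}+{partition}+{start_offset:010d}.json")

# First message sent to partition 0 of the tutorial topic:
print(expected_s3_key("tutorial", "mkc-tutorial-topic", 0, 0))
```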