

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Saiba mais sobre o MSK Connect
<a name="msk-connect"></a>

O MSK Connect é um recurso do Amazon MSK que facilita o streaming de dados de e para os clusters do Apache Kafka. O MSK Connect usa o Kafka Connect 2.7.1 ou 3.7.x, que são estruturas de código aberto para conectar clusters do Apache Kafka a sistemas externos, como bancos de dados, índices de pesquisa e sistemas de arquivos. Com o MSK Connect, você pode implantar conectores totalmente gerenciados criados para o Kafka Connect que movem dados para ou extraem dados de datastores populares, como Amazon S3 e Amazon Service. OpenSearch Você pode implantar conectores desenvolvidos por terceiros, como o Debezium, para transmitir logs de alterações de bancos de dados para um cluster do Apache Kafka ou implantar um conector existente sem alterações no código. Os conectores escalam automaticamente para se ajustar às mudanças na carga, e você paga apenas pelos recursos que usa.

Use conectores de origem para importar dados de sistemas externos para seus tópicos. Com conectores de coletor, você pode exportar dados de seus tópicos para sistemas externos.

O MSK Connect é compatível com conectores para qualquer cluster do Apache Kafka com conectividade com uma Amazon VPC, seja um cluster do MSK ou um cluster do Apache Kafka hospedado de maneira independente. 

O MSK Connect monitora continuamente a integridade e o estado de entrega dos conectores, corrige e gerencia o hardware subjacente e dimensiona automaticamente a escala dos conectores para corresponder às mudanças no throughput.

Para começar a usar o MSK Connect, consulte [Conceitos básicos sobre o MSK Connect](msk-connect-getting-started.md). 

Para saber mais sobre os AWS recursos que você pode criar com o MSK Connect[Saiba mais sobre conectores](msk-connect-connectors.md), consulte[Criar plug-ins personalizados](msk-connect-plugins.md), e. [Saiba mais sobre os operadores do MSK Connect](msk-connect-workers.md)

Para obter informações sobre a API do MSK Connect, consulte a [Referência de API do Amazon MSK Connect](https://docs.aws.amazon.com/MSKC/latest/mskc/Welcome.html). 

## Benefícios de usar o Amazon MSK Connect
<a name="msk-connect-benefits"></a>

O Apache Kafka é uma das plataformas de streaming de código aberto mais amplamente adotadas para ingerir e processar fluxos de dados em tempo real. Com o Apache Kafka, você pode desacoplar e escalar de forma independente as aplicações que produzem e consomem dados.

O Kafka Connect é um componente importante da criação e execução de aplicações de streaming com o Apache Kafka. O Kafka Connect fornece uma maneira padronizada de mover dados entre o Kafka e sistemas externos. O Kafka Connect é altamente escalável e pode lidar com grandes volumes de dados. Ele fornece um conjunto avançado de operações e ferramentas de API para configurar, implantar e monitorar conectores que movem dados entre tópicos do Kafka e sistemas externos. Você pode usar essas ferramentas para personalizar e ampliar a funcionalidade do Kafka Connect para atender às necessidades específicas da aplicação de streaming.

Você pode enfrentar desafios ao operar clusters do Apache Kafka Connect por conta própria, ou ao tentar migrar aplicações de código aberto do Apache Kafka Connect para a AWS. Esses desafios incluem o tempo necessário para configurar a infraestrutura e implantar aplicações, obstáculos de engenharia ao configurar clusters autogerenciados do Apache Kafka Connect e sobrecarga operacional administrativa.

Para enfrentar esses desafios, recomendamos usar o Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect) para migrar as aplicações do Apache Kafka Connect de código aberto para a AWS. O Amazon MSK Connect simplifica o uso do Kafka Connect para transmitir dados de e para entre clusters do Apache Kafka e sistemas externos, como bancos de dados, índices de pesquisa e sistemas de arquivos.

Veja abaixo alguns benefícios de migrar para o Amazon MSK Connect:
+ **Eliminação da sobrecarga operacional**: o Amazon MSK Connect elimina a carga operacional associada à aplicação de patches, provisionamento e escalabilidade dos clusters do Apache Kafka Connect. O Amazon MSK Connect monitora continuamente a integridade dos clusters do Connect e automatiza a aplicação de patches e as atualizações de versão sem causar interrupções nas workloads.
+ **Reinício automático das tarefas do Connect**: o Amazon MSK Connect pode recuperar automaticamente tarefas com falha para reduzir as interrupções na produção. As falhas nas tarefas podem ser causadas por erros temporários, como a violação do limite de conexão TCP do Kafka e o rebalanceamento de tarefas quando novos operadores se juntam ao grupo de consumidores para conectores de coletor.
+ **Escalabilidade horizontal e vertical automática**: o Amazon MSK Connect permite que a aplicação de conectores seja escalada automaticamente para ser compatível com maiores taxas de transferência. O Amazon MSK Connect gerencia a escalabilidade para você. Você só precisa especificar o número de operadores no grupo do Auto Scaling e os limites de utilização.o Você pode usar a operação da `UpdateConnector` API Amazon MSK Connect para aumentar ou reduzir verticalmente o v CPUs entre 1 e 8 v CPUs para suportar a taxa de transferência variável.
+ **Conectividade de rede privada** — O Amazon MSK Connect se conecta de forma privada aos sistemas de origem e coletor usando nomes AWS PrivateLink DNS privados.

# Conceitos básicos sobre o MSK Connect
<a name="msk-connect-getting-started"></a>

Este é um step-by-step tutorial que usa o Console de gerenciamento da AWS para criar um cluster MSK e um conector de coletor que envia dados do cluster para um bucket S3.

**Topics**
+ [Configurar os recursos necessários para o MSK Connect](mkc-tutorial-setup.md)
+ [Criar plug-in personalizado](mkc-create-plugin.md)
+ [Criar a máquina cliente e o tópico do Apache Kafka](mkc-create-topic.md)
+ [Criar um conector](mkc-create-connector.md)
+ [Enviar dados para o cluster do MSK](mkc-send-data.md)

# Configurar os recursos necessários para o MSK Connect
<a name="mkc-tutorial-setup"></a>

Nesta etapa, você cria os seguintes recursos necessários para esse cenário inicial:
+ Um bucket do Amazon S3 para servir como destino que recebe dados do conector.
+ Um cluster do MSK para o qual você enviará dados. Em seguida, o conector lerá os dados desse cluster e os enviará para o bucket S3 de destino.
+ Uma política do IAM que contém as permissões para gravar no bucket do S3 de destino.
+ Um perfil do IAM que permite ao conector gravar no bucket do S3 de destino. Você adicionará a política do IAM criada a esse perfil.
+ Um endpoint da Amazon VPC para possibilitar o envio de dados da Amazon VPC que tem o cluster e o conector para o Amazon S3.

**Para criar um bucket do S3**

1. Faça login no Console de gerenciamento da AWS e abra o console do Amazon S3 em. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Selecione **Criar bucket**.

1. Para o nome do bucket, insira um nome descritivo, como `amzn-s3-demo-bucket-mkc-tutorial`.

1. Role para baixo e escolha **Criar bucket**.

1. Na lista de buckets, escolha o bucket recém-criado.

1. Selecione **Criar pasta**.

1. Digite `tutorial` para o nome da pasta, depois role para baixo e escolha **Criar pasta**.

**Para criar um cluster**

1. Abra o console Amazon MSK em [https://console.aws.amazon.com/msk/casa? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. No painel esquerdo, em **Clusters do MSK**, escolha **Clusters**.

1. Selecione **Criar cluster**.

1. Em **Método de criação**, escolha **Criação personalizada**.

1. Insira **mkc-tutorial-cluster** para o nome do cluster.

1. Em **Tipo de cluster**, escolha **Provisionado**.

1. Escolha **Próximo**.

1. Em **Rede**, escolha uma Amazon VPC. Em seguida, selecione as zonas de disponibilidade e as sub-redes que deseja usar. Lembre-se IDs da Amazon VPC e das sub-redes que você selecionou porque precisa delas posteriormente neste tutorial.

1. Escolha **Próximo**.

1. Em **Métodos de controle de acesso**, verifique se somente o **Acesso não autenticado** está selecionado.

1. Em **Criptografia**, certifique-se de que somente **Texto simples** esteja selecionado.

1. Continue com o assistente e escolha **Criar cluster**. Você será redirecionado para a página detalhes do cluster. Nessa página, em **Grupos de segurança aplicados**, encontre o ID do grupo de segurança. Lembre-se desse ID porque você precisará dele posteriormente neste tutorial.

**Para criar uma política do IAM com permissões para gravar no bucket do S3.**

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. No painel de navegação, escolha **Políticas**.

1. Selecione **Criar política**.

1. Em **Editor de política**, escolha a guia **JSON** e substitua o JSON na janela do editor pelo JSON a seguir.

   No exemplo a seguir, *<amzn-s3-demo-bucket-my-tutorial>* substitua pelo nome do seu bucket do S3.

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   Para obter instruções sobre como gravar políticas seguras, consulte [Controle de acesso do IAM](iam-access-control.md).

1. Escolha **Próximo**.

1. Na página **Revisar e criar**, faça o seguinte:

   1. Em **Nome da política**, insira um nome descritivo, como **mkc-tutorial-policy**.

   1. Em **Permissões definidas nesta política**, revise e and/or edite as permissões definidas em sua política.

   1. (Opcional) Para ajudar a identificar, organizar ou pesquisar a política, escolha **Adicionar nova tag** para adicioná-la como pares de chave-valor. Por exemplo, adicione uma tag à sua política com o par de valores-chave **Environment** e **Test**.

      Para obter mais informações sobre o uso de tags, consulte [Tags para AWS Identity and Access Management recursos](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) no *Guia do usuário do IAM*.

1. Selecione **Criar política**.

**Para criar o perfil do IAM capaz de gravar no bucket de destino**

1. No painel de navegação do console do IAM, escolha **Perfis** e, em seguida, **Criar perfil**.

1. Na página **Selecionar entidade confiável**, faça o seguinte:

   1. Em **Tipo de entidade confiável**, escolha **AWS service (Serviço da AWS)**.

   1. Em **Serviço ou caso de uso**, escolha **S3**.

   1. Em **Caso de uso**, escolha **S3**.

1. Escolha **Próximo**.

1. Na página **Adicionar permissões**, faça o seguinte:

   1. Na caixa de pesquisa sob **Políticas de permissão**, insira o nome da política que você criou anteriormente para este tutorial. Por exemplo, .**mkc-tutorial-policy** Depois, à esquerda do nome da política, marque a caixa de seleção.

   1. (Opcional) Defina um [limite de permissões](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html). Esse é um atributo avançado que está disponível para perfis de serviço, mas não para perfis vinculados ao serviço. Para obter informações sobre como definir um limite de permissões, consulte [Como criar perfis e anexar políticas (console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html) no *Guia do usuário do IAM*.

1. Escolha **Próximo**.

1. Na página **Nomear, revisar e criar**, faça o seguinte:

   1. Em **Nome do perfil**, insira um nome descritivo, como **mkc-tutorial-role**.
**Importante**  
Quando nomear um perfil, observe o seguinte:  
Os nomes das funções devem ser exclusivos dentro de você Conta da AWS e não podem ser diferenciados por maiúsculas e minúsculas.  
Por exemplo, não crie dois perfis denominados **PRODROLE** e **prodrole**. Quando usado em uma política ou como parte de um ARN, o nome de perfil diferencia maiúsculas de minúsculas. No entanto, quando exibido para os clientes no console, como durante o processo de login, o nome de perfil diferencia maiúsculas de minúsculas.
Não é possível editar o nome do perfil depois de criá-lo porque outras entidades podem referenciar o perfil.

   1. (Opcional) Em **Descrição**, insira uma descrição para o perfil.

   1. (Opcional) Para editar os casos de uso e as permissões do perfil, escolha **Etapa 1: Selecionar entidades confiáveis** ou **Etapa 2: Adicionar permissões**, depois **Editar**.

   1. (Opcional) Para ajudar a identificar, organizar ou pesquisar o perfil, escolha **Adicionar nova tag** para adicioná-las como pares de chave-valor. Por exemplo, adicione uma tag ao seu perfil com o par de valores-chave **ProductManager** e **John**.

      Para obter mais informações sobre o uso de tags, consulte [Tags para AWS Identity and Access Management recursos](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) no *Guia do usuário do IAM*.

1. Reveja a função e escolha **Criar função**.

**Para permitir que o MSK Connect assuma o perfil**

1. No console do IAM, em **Gerenciamento de acesso** no painel esquerdo, escolha **Perfis**.

1. Encontre e escolha o `mkc-tutorial-role`.

1. Na página **Resumo** do perfil, escolha a guia **Relações de confiança**.

1. Selecione **Editar relação de confiança**.

1. Substitua a política de confiança existente pelo seguinte JSON.

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. Selecione **Atualizar política de confiança**.

**Para criar um endpoint da Amazon VPC da VPC do cluster para o Amazon S3**

1. Abra o console da Amazon VPC em [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. No painel esquerdo, escolha **Endpoints**.

1. Escolha **Criar endpoint**.

1. Em **Nome do serviço**, escolha o serviço **com.amazonaws.us-east-1.s3** e o tipo **Gateway**.

1. Escolha a VPC do cluster e, em seguida, selecione a caixa à esquerda da tabela de rotas associada às sub-redes do cluster.

1. Escolha **Criar endpoint**.

**Próxima etapa**

[Criar plug-in personalizado](mkc-create-plugin.md)

# Criar plug-in personalizado
<a name="mkc-create-plugin"></a>

Um plug-in contém o código que define a lógica do conector. Nesta etapa, você criará um plug-in personalizado contendo o código para o Lenses Amazon S3 Sink Connector. Em uma etapa posterior, ao criar o conector do MSK, você especificará que seu código está nesse plug-in personalizado. Você pode usar o mesmo plug-in para criar vários conectores do MSK com configurações diferentes.

**Para criar o plug-in personalizado**

1. Baixe o [conector do S3](https://www.confluent.io/hub/confluentinc/kafka-connect-s3).

1. Faça upload do arquivo ZIP para um bucket do S3 ao qual você tenha acesso. Para obter informações sobre como fazer upload de arquivos para o Amazon S3, consulte [Carregar objetos](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) no Guia do usuário do Amazon S3.

1. Abra o console do Amazon MSK em [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. No painel esquerdo, expanda **MSK Connect** e escolha **Plug-ins personalizados**.

1. Escolha **Criar plug-in personalizado**.

1. Selecione **Navegar no S3**.

1. Na lista de buckets, encontre o bucket no qual você fez o upload do arquivo ZIP e escolha-o.

1. Na lista de objetos no bucket, marque o botão de seleção à esquerda do arquivo ZIP e selecione o botão **Escolher**.

1. Insira `mkc-tutorial-plugin` para o nome do plug-in personalizado e escolha **Criar plug-in personalizado**.

Pode levar AWS alguns minutos para concluir a criação do plug-in personalizado. Quando o processo de criação estiver concluído, você verá a seguinte mensagem em um banner na parte superior da janela do navegador.

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**Próxima etapa**

[Criar a máquina cliente e o tópico do Apache Kafka](mkc-create-topic.md)

# Criar a máquina cliente e o tópico do Apache Kafka
<a name="mkc-create-topic"></a>

Nesta etapa, você vai criar uma instância do Amazon EC2 para usar como uma instância do cliente do Apache Kafka. Em seguida, você usará essa instância para criar um tópico no cluster.

**Como criar uma máquina cliente**

1. Abra o console do Amazon EC2 em [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Selecione **Iniciar instâncias**.

1. Insira um **Nome** para sua máquina cliente, como **mkc-tutorial-client**.

1. Deixe a opção **AMI do Amazon Linux 2 (HVM) – Kernel 5.10, tipo de volume SSD** selecionada para **Tipo de imagem de máquina da Amazon (AMI)**.

1. Escolha o tipo de instância **t2.xlarge**.

1. Na seção **Par de chaves**, escolha **Criar um novo par de chaves**. Digite **mkc-tutorial-key-pair** em **Nome do par de chaves** e, em seguida, escolha **Baixar par de chaves**. Se preferir, use um par de chaves existente.

1. Escolha **Iniciar instância**.

1. Escolha **Exibir instâncias**. Na coluna **Grupos de segurança**, escolha o grupo de segurança que está associado à sua nova instância. Copie o ID do grupo de segurança e salve-o para usar posteriormente.

**Para permitir que o cliente recém-criado envie dados para o cluster**

1. Abra o console da Amazon VPC em [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. No painel esquerdo, em **SEGURANÇA**, escolha **Grupos de segurança)**. Na coluna **ID do grupo de segurança**, localize o grupo de segurança do cluster. Você salvou o ID desse grupo de segurança ao criar o cluster em [Configurar os recursos necessários para o MSK Connect](mkc-tutorial-setup.md). Escolha esse grupo de segurança marcando a caixa à esquerda de sua linha. Certifique-se de que nenhum outro grupo de segurança seja selecionado simultaneamente.

1. Na metade inferior da tela, escolha a guia **Regras de entrada**.

1. Escolha **Editar regras de entrada**.

1. Na parte inferior esquerda da tela, escolha **Adicionar regra**.

1. Na nova regra, escolha **Todo o tráfego** na coluna **Tipo**. No campo à direita da coluna **Origem**, insira o ID do grupo de segurança da máquina cliente. Trata-se do ID do grupo de segurança que você salvou após criar a máquina cliente.

1. Selecione **Salvar regras**. Agora, seu cluster do MSK aceitará todo o tráfego do cliente criado no procedimento anterior.

**Para criar um tópico**

1. Abra o console do Amazon EC2 em [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Na tabela de instâncias, escolha `mkc-tutorial-client`.

1. Na parte superior da tela, escolha **Connect** e siga as instruções para se conectar à instância.

1. Instale o Java na instância do cliente executando o seguinte comando:

   ```
   sudo yum install java-1.8.0
   ```

1. Execute o comando a seguir para fazer download do Apache Kafka. 

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**nota**  
Se quiser usar um local de espelhamento diferente do usado neste comando, você poderá escolher um local diferente no site do [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz).

1. Execute o comando a seguir no diretório onde você fez download do arquivo TAR na etapa anterior.

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. Acesse o diretório **kafka\$12.12-2.2.1**.

1. Abra o console Amazon MSK em [https://console.aws.amazon.com/msk/casa? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. No painel esquerdo, escolha **Clusters** e, em seguida, escolha o nome `mkc-tutorial-cluster`.

1. Escolha **Exibir informações do cliente**.

1. Copie a string de conexão em **texto simples**.

1. Selecione **Concluído**.

1. Execute o comando a seguir na instância do cliente (`mkc-tutorial-client`), *bootstrapServerString* substituindo-o pelo valor que você salvou ao visualizar as informações do cliente do cluster.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   Se o comando tiver êxito, a seguinte mensagem será exibida: `Created topic mkc-tutorial-topic.`

**Próxima etapa**

[Criar um conector](mkc-create-connector.md)

# Criar um conector
<a name="mkc-create-connector"></a>

Este procedimento descreve como criar um conector usando o Console de gerenciamento da AWS.

**Para criar o conector**

1. Faça login no Console de gerenciamento da AWS e abra o console do Amazon MSK em [https://console.aws.amazon.com/msk/casa? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. No painel esquerdo, expanda **MSK Connect** e escolha **Conectores**.

1. Escolha **Criar conector**.

1. Na lista de plug-ins, escolha `mkc-tutorial-plugin` e escolha **Próximo**.

1. Para o nome do conector, insira `mkc-tutorial-connector`.

1. Na lista de clusters, escolha `mkc-tutorial-cluster`.

1. Na seção **Configurações de rede do conector**, escolha uma das seguintes opções para o tipo de rede:
   + **IPv4**(padrão) - Para conectividade IPv4 somente com destinos
   + **Dual-stack** - Para conectividade com destinos em ambos IPv4 e IPv6 (disponível somente se suas sub-redes tiverem blocos IPv6 CIDR associados a IPv4 elas)

1. Copie a seguinte configuração e cole no campo de configuração do conector.

   Certifique-se de substituir a região pelo código de Região da AWS onde você está criando o conector. Além disso, substitua o nome do bucket do *<amzn-s3-demo-bucket-my-tutorial>* Amazon S3 pelo nome do seu bucket no exemplo a seguir.

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. Em **Permissões de acesso**, escolha `mkc-tutorial-role`.

1. Escolha **Próximo**. Na página **Segurança**, escolha **Próximo** novamente.

1. Na página **Logs**, escolha **Próximo**.

1. Na página **Revisar e criar**, revise a configuração do conector e escolha **Criar conector**.

**Próxima etapa**

[Enviar dados para o cluster do MSK](mkc-send-data.md)

# Enviar dados para o cluster do MSK
<a name="mkc-send-data"></a>

Nesta etapa, você envia dados para o tópico do Apache Kafka que você criou anteriormente e, em seguida, procura esses mesmos dados no bucket do S3 de destino.

**Para enviar dados para o cluster do MSK**

1. Na pasta `bin` da instalação do Apache Kafka na instância do cliente, crie um arquivo de texto chamado `client.properties` com o conteúdo a seguir.

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. Execute o comando a seguir para criar um produtor de console. *BootstrapBrokerString*Substitua pelo valor obtido ao executar o comando anterior.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. Insira a mensagem que desejar e pressione **Enter**. Repita esta etapa duas ou três vezes. Toda vez que você inserir uma linha e pressionar **Enter**, essa linha será enviada para o cluster do Apache Kafka como uma mensagem separada.

1. Verifique o bucket do Amazon S3 de destino para encontrar as mensagens que você enviou na etapa anterior.

# Saiba mais sobre conectores
<a name="msk-connect-connectors"></a>

Um conector integra sistemas externos e serviços da Amazon ao Apache Kafka, copiando continuamente dados de streaming de uma fonte de dados para o cluster do Apache Kafka ou copiando continuamente os dados do cluster para um coletor de dados. Antes de entregar os dados a um destino, um conector também pode executar uma lógica leve, como transformação, conversão de formato ou filtragem de dados. Os conectores de origem extraem dados de uma fonte de dados e os enviam para o cluster, enquanto os conectores coletam dados do cluster e os enviam para um coletor de dados.

O diagrama a seguir mostra a arquitetura de um conector. Um operador é um processo de máquina virtual Java (JVM) que executa a lógica do conector. Cada operador cria um conjunto de tarefas que são executadas em threads paralelos e fazem o trabalho de copiar os dados. As tarefas não armazenam o estado e, portanto, podem ser iniciadas, interrompidas ou reiniciadas a qualquer momento para fornecer um pipeline de dados resiliente e escalável.

![\[Diagrama mostrando a arquitetura de um cluster de conectores.\]](http://docs.aws.amazon.com/pt_br/msk/latest/developerguide/images/mkc-worker-architecture.png)


# Saiba mais sobre a capacidade de conectores
<a name="msk-connect-capacity"></a>

A capacidade total de um conector depende do número de trabalhadores que o conector tem, bem como do número de MSK Connect Units (MCUs) por trabalhador. Cada MCU representa 1 vCPU de computação e 4 GiB de memória. A memória da MCU pertence à memória total de uma instância de trabalho e não à memória de pilha em uso.

Os operadores do MSK Connect consomem endereços IP nas sub-redes fornecidas pelo cliente. Cada operador usa um endereço IP de uma das sub-redes fornecidas pelo cliente. Você deve garantir que tenha endereços IP disponíveis suficientes nas sub-redes fornecidas a uma CreateConnector solicitação para considerar a capacidade especificada, especialmente ao escalar automaticamente conectores em que o número de trabalhadores pode flutuar.

Para criar um conector, você deve escolher entre um dos dois modos de capacidade a seguir.
+ *Provisionado*: escolha esse modo se você conhecer os requisitos de capacidade do seu conector. Você especifica dois valores:
  + O número de operadores.
  + O número de MCUs por trabalhador.
+ *Escalonamento automático*: escolha esse modo se os requisitos de capacidade do seu conector forem variáveis ou se você não os conhecer com antecedência. Quando você usa o modo de escalabilidade automática, o Amazon MSK Connect substitui a propriedade `tasks.max` do seu conector por um valor proporcional ao número de trabalhadores em execução no conector e ao número de trabalhadores por trabalhador. MCUs 

  Você especifica três conjuntos de valores:
  + O número mínimo e máximo de operadores.
  + Os percentuais de expansão e de redução da utilização da CPU, que são determinados pela métrica. `CpuUtilization` Quando a métrica `CpuUtilization` do conector excede o percentual de expansão, o MSK Connect aumenta o número de operadores em execução no conector. Quando a métrica `CpuUtilization` fica abaixo do percentual de expansão, o MSK Connect diminui o número de operadores. O número de operadores sempre permanece dentro dos números mínimo e máximo que você especifica ao criar o conector.
  + O número de MCUs por trabalhador.
  + (Opcional) *Contagem máxima de tarefas de escalonamento automático* - O número máximo de tarefas alocadas ao conector durante as operações de escalonamento automático. Esse parâmetro permite que você defina um limite superior na criação de tarefas, fornecendo maior controle sobre a utilização de recursos e o paralelismo em relação às partições de tópicos do Kafka.

Para obter mais informações sobre trabalhadores, consulte e[Saiba mais sobre os operadores do MSK Connect](msk-connect-workers.md), para obter mais informações sobre a contagem máxima de tarefas de escalonamento automático, consulte. [Entenda a contagem máxima de tarefas de escalonamento automático](msk-connect-max-autoscaling-task-count.md) Para saber mais sobre as métricas do MSK Connect, consulte [Monitoramento do Amazon MSK Connect](mkc-monitoring-overview.md).

# Entenda a contagem máxima de tarefas de escalonamento automático
<a name="msk-connect-max-autoscaling-task-count"></a>

O `maxAutoscalingTaskCount` parâmetro é um campo de capacidade opcional disponível para conectores de escalabilidade automática no Amazon MSK Connect. Esse parâmetro permite definir um limite máximo para o número máximo de tarefas que podem ser criadas durante as operações de escalonamento automático do conector, fornecendo maior controle sobre a utilização e o desempenho dos recursos.

Quando você usa o modo de capacidade com escalabilidade automática, o Amazon MSK Connect substitui automaticamente a propriedade `tasks.max` do seu conector com um valor proporcional ao número de trabalhadores e por trabalhador. MCUs O `maxAutoscalingTaskCount` parâmetro fornece uma opção adicional configurável para limitar o número máximo de tarefas criadas para seu conector.

Esse recurso é particularmente útil quando você deseja controlar o nível de paralelismo em relação ao número de partições de tópicos em seu cluster Kafka. Ao definir esse limite, você pode otimizar o desempenho e evitar a distribuição ineficiente de tarefas que pode ocorrer quando a contagem de tarefas calculada automaticamente excede seus requisitos de carga de trabalho.

## Requisitos de configuração
<a name="msk-connect-max-autoscaling-task-count-requirements"></a>

O `maxAutoscalingTaskCount` parâmetro deve atender aos seguintes requisitos:

```
maxAutoscalingTaskCount ≥ maxWorkerCount
```

Esse requisito garante a utilização eficiente dos recursos mantendo pelo menos uma tarefa por trabalhador. O sistema impõe esse mínimo para otimizar a funcionalidade do conector.

Quando você especifica`maxAutoscalingTaskCount`, o limite é aplicado imediatamente após a criação do conector e durante todos os eventos de escalabilidade subsequentes. À medida que o número de trabalhadores aumenta ou diminui durante as operações de escalonamento automático, o sistema continua cumprindo esse limite. O `tasks.max` valor se ajusta proporcionalmente ao número de trabalhadores e MCUs por trabalhador, mas nunca excede o valor configurado`maxAutoscalingTaskCount`.

Se você não especificar esse parâmetro, o conector usará o cálculo padrão sem limite: `tasks.max = workerCount × mcuCount × tasksPerMcu` (onde tasksPerMcu é 2). 

## Quando usar o maxAutoscalingTask Count
<a name="msk-connect-max-autoscaling-task-count-when-to-use"></a>

Considere o uso `maxAutoscalingTaskCount` nos seguintes cenários:
+ *Contagem limitada de partições*: quando seus tópicos do Kafka têm um número fixo de partições menor do que a contagem de tarefas calculada automaticamente, definir um limite impede a criação de tarefas ociosas sem trabalho a ser executado.
+ *Otimização de desempenho*: quando você identifica que uma contagem específica de tarefas fornece uma taxa de transferência ideal para sua carga de trabalho, você pode limitar o máximo de tarefas para manter um desempenho consistente.
+ *Gerenciamento de recursos*: quando você deseja controlar o máximo de paralelismo e consumo de recursos do seu conector, independentemente de quantos trabalhadores estejam em execução.

## Exemplo
<a name="msk-connect-max-autoscaling-task-count-example"></a>

Para um conector com a seguinte configuração:

```
minWorkerCount: 1
maxWorkerCount: 4
mcuCount: 8
maxAutoscalingTaskCount: 15
```

Sem isso`maxAutoscalingTaskCount`, quando escalado para 4 trabalhadores, o conector criaria 64 tarefas (4 trabalhadores × 8 MCUs × 2 tarefas por MCU). Com `maxAutoscalingTaskCount` definido como 15, o conector cria somente 15 tarefas, o que pode ser mais apropriado se o tópico do Kafka tiver 15 ou menos partições.

# Configurar o tipo de rede de pilha dupla
<a name="msk-connect-dual-stack"></a>

O Amazon MSK Connect oferece suporte ao tipo de rede de pilha dupla para novos conectores. Com a rede de pilha dupla, seus conectores podem se conectar a destinos por meio de e. IPv4 IPv6 Observe que a IPv6 conectividade só está disponível no modo de pilha dupla (IPv4 \$1 IPv6) - IPv6 somente a rede não é suportada.

Por padrão, os novos conectores usam o tipo IPv4 de rede. Para criar um conector com o tipo de rede de pilha dupla, verifique se você atendeu aos pré-requisitos descritos na seção a seguir. Observe que, depois de criar um conector usando o tipo de rede de pilha dupla, você não pode modificar seu tipo de rede. Para alterar os tipos de rede, você deve excluir e recriar o conector.

O Amazon MSK Connect também oferece suporte à conectividade de endpoints de API de serviço por meio de e. IPv6 IPv4 Para usar a IPv6 conectividade para chamadas de API, você precisa usar os endpoints de pilha dupla. Para obter mais informações sobre os endpoints do serviço MSK Connect, consulte os endpoints e cotas do [Amazon MSK Connect](https://docs.aws.amazon.com/general/latest/gr/msk-connect.html).

## Pré-requisitos para usar o tipo de rede de pilha dupla
<a name="dual-stack-prerequisites"></a>

Antes de configurar o tipo de rede de pilha dupla para seus conectores, certifique-se de que todas as sub-redes fornecidas durante a criação do conector tenham ambos os blocos CIDR atribuídos. IPv6 IPv4 

## Considerações sobre o uso do tipo de rede de pilha dupla
<a name="dual-stack-considerations"></a>
+ IPv6 atualmente, o suporte está disponível somente no modo de pilha dupla (IPv4 \$1 IPv6), não somente como -only IPv6
+ Conectores com pilha dupla habilitada podem se conectar tanto IPv6 aos sistemas de dados MSK IPv4 quanto Sink ou Source
+ O tipo de rede não pode ser modificado após a criação do conector - você deve excluir e recriar o conector para alterar os tipos de rede
+ Todas as sub-redes especificadas durante a criação do conector devem suportar pilha dupla para que a criação do conector seja bem-sucedida com o tipo de rede de pilha dupla
+ Se estiver usando sub-redes de pilha dupla, mas nenhum tipo de rede for especificado, o conector usará como padrão -only para compatibilidade com versões anteriores IPv4
+ Para conectores existentes, você não pode atualizar o tipo de rede - você deve excluir e recriar o conector para alterar os tipos de rede
+ O uso da rede de pilha dupla não incorre em custos adicionais

# Criar um conector do
<a name="mkc-create-connector-intro"></a>

Este procedimento descreve como criar um conector usando o Console de gerenciamento da AWS.

**Criando um conector usando o Console de gerenciamento da AWS**

1. Abra o console do Amazon MSK em [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. No painel esquerdo, em **MSK Connect**, escolha **Conectores**.

1. Escolha **Criar conector**.

1. Você pode escolher entre usar um plug-in personalizado existente para criar o conector ou criar primeiro um novo plug-in personalizado. Para obter informações sobre plug-ins personalizados e como criá-los, consulte [Criar plug-ins personalizados](msk-connect-plugins.md). Neste procedimento, vamos supor que você tenha um plug-in personalizado que deseja usar. Na lista de plug-ins personalizados, encontre o que você deseja usar, marque a caixa à esquerda e escolha **Próximo**.

1. Insira um nome e, se desejar, uma descrição.

1. Escolha o cluster ao qual deseja se conectar.

1. Na seção **Configurações de rede do conector**, escolha uma das seguintes opções para o tipo de rede:
   + **IPv4**(padrão) - Para conectividade IPv4 somente com destinos
   + **Dual-stack** - Para conectividade com destinos em ambos IPv4 e IPv6 (disponível somente se suas sub-redes tiverem blocos IPv6 CIDR associados a IPv4 elas)

1. Especifique a configuração do conector. Os parâmetros de configuração que você precisa especificar dependerão do tipo de conector que você deseja criar. No entanto, alguns parâmetros são comuns a todos os conectores, por exemplo, os parâmetros `connector.class` e `tasks.max`. Veja a seguir um exemplo de configuração para o [Conector de coletor Confluent para Amazon S3](https://www.confluent.io/hub/confluentinc/kafka-connect-s3).

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. Em seguida, configure a capacidade do conector. Você pode escolher entre dois modos de capacidade: provisionado e escalonado automaticamente. Para obter informações sobre essas duas opções, consulte [Saiba mais sobre a capacidade de conectores](msk-connect-capacity.md).

1. (Opcional) Na seção **Contagem máxima de tarefas de escalonamento automático, use o campo Contagem** máxima de tarefas de escalonamento automático para inserir o número máximo de tarefas que você deseja alocar ao conector durante as operações de escalonamento automático. O valor deve ser pelo menos igual à sua contagem máxima de trabalhadores. Se você não especificar um valor, o conector usará o cálculo padrão sem limite. Para obter mais informações, consulte [Entenda a contagem máxima de tarefas de escalonamento automático](msk-connect-max-autoscaling-task-count.md).

1. Escolha a configuração padrão do operador ou uma configuração personalizada do operador. Para obter informações sobre como criar configurações personalizadas de operador, consulte [Saiba mais sobre os operadores do MSK Connect](msk-connect-workers.md).

1. Em seguida, você especifica o perfil de execução do serviço. Essa deve ser uma função do IAM que o MSK Connect possa assumir e que conceda ao conector todas as permissões necessárias para acessar os AWS recursos necessários. Essas permissões dependem da lógica do conector. Para obter informações sobre como criar essa função, consulte [Saiba mais sobre o perfil de execução do serviço](msk-connect-service-execution-role.md).

1. Escolha **Próximo**, revise as informações de segurança e escolha **Próximo** novamente.

1. Especifique as opções de registro em log que deseja e escolha **Próximo**. Para obter informações sobre registro em log, consulte [Registro em log no MSK Connect](msk-connect-logging.md).

1. Na página **Revisar e criar**, revise a configuração do conector e escolha **Criar conector**.

Para usar a API MSK Connect para criar um conector, consulte [CreateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateConnector.html). 

Você pode usar a API `UpdateConnector` para modificar a configuração do conector. Para obter mais informações, consulte [Atualizar um conector](mkc-update-connector.md).

# Atualizar um conector
<a name="mkc-update-connector"></a>

Este procedimento descreve como atualizar a configuração de um conector MSK Connect existente usando o Console de gerenciamento da AWS.

**Atualizando a configuração do conector usando o Console de gerenciamento da AWS**

1. Abra o console do Amazon MSK em [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. No painel esquerdo, em **MSK Connect**, escolha **Conectores**.

1. Selecione um conector existente.

1. Escolha **Editar configuração do conector**.

1. Atualize a configuração do conector. Você não pode substituir o `connector.class` uso UpdateConnector de. Veja a seguir um exemplo de configuração para o Conector de coletor Confluent para Amazon S3. 

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. Selecione **Enviar**.

1. Assim você pode monitorar o estado atual da operação na guia **Operações** do conector. 

Para usar a API MSK Connect para atualizar a configuração de um conector, consulte [UpdateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_UpdateConnector.html).

# Conexão de conectores
<a name="msk-connect-from-connectors"></a>

As práticas recomendadas a seguir podem melhorar o desempenho da sua conectividade com o Amazon MSK Connect.

## Não se sobreponha IPs ao Amazon VPC peering ou ao Transit Gateway
<a name="CIDR-ip-ranges"></a>

Se você estiver usando o Amazon VPC peering ou o Transit Gateway com o Amazon MSK Connect, não configure seu conector para alcançar os recursos de VPC emparelhados dentro dos intervalos CIDR: IPs 
+ “10.99.0.0/16”
+ “192.168.0.0/16”
+ “172.21.0.0/16”

# Criar plug-ins personalizados
<a name="msk-connect-plugins"></a>

Um plug-in é um AWS recurso que contém o código que define a lógica do conector. Você carrega um arquivo JAR (ou um arquivo ZIP contendo um ou mais arquivos JAR) em um bucket do S3 e especifica a localização do bucket ao criar o plug-in. Quando o plug-in é criado, o MSK Connect copia o conteúdo do objeto S3 naquele momento. Ele não mantém um link para o objeto S3, portanto, quaisquer modificações subsequentes no objeto não afetarão o plug-in ou seus conectores. Ao criar um conector, você especifica o plug-in que deseja que o MSK Connect use para ele. A relação dos plug-ins com os conectores é one-to-many: você pode criar um ou mais conectores do mesmo plug-in.

**nota**  
Os plug-ins personalizados não podem ser atualizados no local. Para usar uma nova versão do código do plug-in, exclua todos os conectores que fazem referência ao plug-in, exclua o plug-in e recrie-o.

**Pacote de dependências para plug-ins personalizados**  
Recomendamos que você inclua todos os arquivos e dependências JAR necessários para seu plug-in. Package seu conector como um dos seguintes:  
Um arquivo ZIP que contém todos os arquivos JAR e dependências necessários para o plug-in.
Um único uber JAR que contém todos os arquivos de classe do plug-in e suas dependências.
Não agrupar suas dependências de plug-ins pode afetar a disponibilidade ou a compatibilidade no ambiente de execução e causar erros inesperados.

Para obter informações sobre como desenvolver o código para um conector, consulte o [Guia de desenvolvimento de conectores](https://kafka.apache.org/documentation/#connect_development) na documentação do Apache Kafka.

**Criando um plug-in personalizado usando o Console de gerenciamento da AWS**

1. Abra o console do Amazon MSK em [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. No painel esquerdo, em **MSK Connect** e escolha **Plug-ins personalizados**.

1. Escolha **Criar plug-in personalizado**.

1. Selecione **Navegar no S3**.

1. Na lista de buckets do S3, escolha o bucket que contém o arquivo JAR ou ZIP do plug-in.

1. Na lista de objetos, marque a caixa à esquerda do arquivo JAR ou ZIP do plug-in e clique em **Escolher**.

1. Escolha **Criar plug-in personalizado**.

Para usar a API MSK Connect para criar um plug-in personalizado, consulte [CreateCustomPlugin](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateCustomPlugin.html).

# Saiba mais sobre os operadores do MSK Connect
<a name="msk-connect-workers"></a>

Um operador é um processo de máquina virtual Java (JVM) que executa a lógica do conector. Cada operador cria um conjunto de tarefas que são executadas em threads paralelos e fazem o trabalho de copiar os dados. As tarefas não armazenam o estado e, portanto, podem ser iniciadas, interrompidas ou reiniciadas a qualquer momento para fornecer um pipeline de dados resiliente e escalável. Alterações no número de operadores, seja devido a um evento de escalonamento ou devido a falhas inesperadas, são detectadas automaticamente pelos demais operadores. Eles se organizam para reequilibrar as tarefas no conjunto de operadores restantes. Os operadores do Connect usam os grupos de consumidores do Apache Kafka para coordenar e reequilibrar.

Se os requisitos de capacidade do seu conector forem variáveis ou difíceis de estimar, você pode deixar o MSK Connect escalar o número de operadores conforme necessário entre um limite inferior e um limite superior que você determina. Como alternativa, você pode especificar o número exato de operadores que deseja executar em sua lógica de conector. Para obter mais informações, consulte [Saiba mais sobre a capacidade de conectores](msk-connect-capacity.md).

**Os operadores do MSK Connect consomem endereços IP**  
Os operadores do MSK Connect consomem endereços IP nas sub-redes fornecidas pelo cliente. Cada operador usa um endereço IP de uma das sub-redes fornecidas pelo cliente. Você deve garantir que tenha endereços IP disponíveis suficientes nas sub-redes fornecidas a uma CreateConnector solicitação para considerar a capacidade especificada, especialmente ao escalar automaticamente conectores em que o número de trabalhadores pode flutuar.

## Configuração padrão de operador
<a name="msk-connect-default-worker-config"></a>

O MSK Connect fornece a seguinte configuração padrão de operador:

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

# Propriedades de configuração de operador compatíveis
<a name="msk-connect-supported-worker-config-properties"></a>

O MSK Connect fornece uma configuração padrão de operador. Também há a opção de criar uma configuração personalizada de operador para usar com os conectores. A lista a seguir inclui informações sobre as propriedades de configuração do operador compatíveis ou não com o Amazon MSK Connect.
+ As propriedades `key.converter` e `value.converter` são obrigatórias.
+ O MSK Connect é compatível com as seguintes propriedades de configuração de `producer.`.

  ```
  producer.acks
  producer.batch.size
  producer.buffer.memory
  producer.compression.type
  producer.enable.idempotence
  producer.key.serializer
  producer.linger.ms
  producer.max.request.size
  producer.metadata.max.age.ms
  producer.metadata.max.idle.ms
  producer.partitioner.class
  producer.reconnect.backoff.max.ms
  producer.reconnect.backoff.ms
  producer.request.timeout.ms
  producer.retry.backoff.ms
  producer.value.serializer
  ```
+ O MSK Connect é compatível com as seguintes propriedades de configuração de `consumer.`.

  ```
  consumer.allow.auto.create.topics
  consumer.auto.offset.reset
  consumer.check.crcs
  consumer.fetch.max.bytes
  consumer.fetch.max.wait.ms
  consumer.fetch.min.bytes
  consumer.heartbeat.interval.ms
  consumer.key.deserializer
  consumer.max.partition.fetch.bytes
  consumer.max.poll.interval.ms
  consumer.max.poll.records
  consumer.metadata.max.age.ms
  consumer.partition.assignment.strategy
  consumer.reconnect.backoff.max.ms
  consumer.reconnect.backoff.ms
  consumer.request.timeout.ms
  consumer.retry.backoff.ms
  consumer.session.timeout.ms
  consumer.value.deserializer
  ```
+ Todas as outras propriedades de configuração que não comecem com os prefixos `producer.` ou `consumer.` são compatíveis, *exceto* as propriedades a seguir. 

  ```
  access.control.
  admin.
  admin.listeners.https.
  client.
  connect.
  inter.worker.
  internal.
  listeners.https.
  metrics.
  metrics.context.
  rest.
  sasl.
  security.
  socket.
  ssl.
  topic.tracking.
  worker.
  bootstrap.servers
  config.storage.topic
  connections.max.idle.ms
  connector.client.config.override.policy
  group.id
  listeners
  metric.reporters
  plugin.path
  receive.buffer.bytes
  response.http.headers.config
  scheduled.rebalance.max.delay.ms
  send.buffer.bytes
  status.storage.topic
  ```

Para obter mais informações sobre as propriedades de configuração do operador e o que elas representam, consulte [Configurações do Kafka para o Connect](https://kafka.apache.org/documentation/#connectconfigs) na documentação do Apache Kafka.

# Criar uma configuração personalizada de operador
<a name="msk-connect-create-custom-worker-config"></a>

Este procedimento descreve como criar uma configuração personalizada de operador usando o Console de gerenciamento da AWS.

**Criando uma configuração de trabalhador personalizada usando o Console de gerenciamento da AWS**

1. Abra o console do Amazon MSK em [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. No painel esquerdo, em **MSK Connect**, escolha **Configurações do operador**.

1. Escolha **Criar configuração de operador**.

1. Insira um nome e uma descrição opcional e, em seguida, adicione as propriedades e os valores para os quais você deseja defini-los.

1. Escolha **Criar configuração de operador**.

Para usar a API MSK Connect para criar uma configuração de trabalho, consulte [CreateWorkerConfiguration](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateWorkerConfiguration.html).

# Gerenciar deslocamentos do conector de origem usando `offset.storage.topic`
<a name="msk-connect-manage-connector-offsets"></a>

Esta seção fornece informações para ajudar você a gerenciar os deslocamentos do conector de origem usando o *tópico de deslocamento de armazenamento*. O tópico de deslocamento de armazenamento é um tópico interno que o Kafka Connect usa para armazenar deslocamentos de configuração de conectores e tarefas.

## Considerações
<a name="msk-connect-manage-connector-offsets-considerations"></a>

Considere o seguinte ao gerenciar os deslocamentos do conector de origem.
+ Para especificar um tópico de deslocamento de armazenamento, forneça o nome do tópico do Kafka no qual os deslocamentos do conector são armazenados como o valor `offset.storage.topic` em sua configuração de operador.
+ Tenha cuidado ao fazer alterações na configuração de um conector. A alteração dos valores da configuração pode resultar em um comportamento não intencional do conector se um conector de origem usar valores da configuração para os principais registros de deslocamento. Recomendamos que você consulte a documentação do seu plug-in para obter orientação.
+ **Personalize o número padrão de partições**: além de personalizar a configuração do operador adicionando `offset.storage.topic`, você pode personalizar o número de partições para os tópicos de deslocamento e armazenamento de status. As partições padrão para tópicos internos são as seguintes.
  + `config.storage.topic`: 1, não configurável, deve ser tópico de partição única
  + `offset.storage.topic`: 25, configurável fornecendo `offset.storage.partitions`
  + `status.storage.topic`: 5, configurável fornecendo `status.storage.partitions`
+ **Exclusão manual de tópicos**: o Amazon MSK Connect cria novos tópicos internos do Kafka Connect (o nome do tópico começa com `__amazon_msk_connect`) em cada implantação de conectores. Tópicos antigos anexados a conectores excluídos não são removidos automaticamente porque tópicos internos, como `offset.storage.topic`, podem ser reutilizados entre conectores. No entanto, você pode excluir manualmente tópicos internos não utilizados criados pelo MSK Connect. Os tópicos internos são nomeados segundo o formato `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id`.

  É possível usar a expressão regular `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` para excluir os tópicos internos. Você não deve excluir um tópico interno que esteja sendo usado atualmente por um conector em execução.
+ **Usar o mesmo nome para os tópicos internos criados pelo MSK Connect**: se quiser reutilizar o tópico de deslocamento de armazenamento para consumir deslocamentos de um conector criado anteriormente, você deverá dar ao novo conector o mesmo nome do conector antigo. A propriedade `offset.storage.topic` pode ser definida usando a configuração do operador para atribuir o mesmo nome ao `offset.storage.topic` e reutilizada entre conectores diferentes. Essa configuração é descrita em [Gerenciamento de deslocamentos de conectores](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config). O MSK Connect não permite que conectores diferentes compartilhem `config.storage.topic` e `status.storage.topic`. Esses tópicos são criados sempre que você cria um novo conector no MSKC. Eles são nomeados automaticamente de acordo com o formato `__amazon_msk_connect_<status|configs>_connector_name_connector_id` e, portanto, são diferentes nos diferentes conectores que você cria.

# Usar o tópico padrão de deslocamento de armazenamento
<a name="msk-connect-default-offset-storage-topic"></a>

Por padrão, o Amazon MSK Connect gera um novo tópico de deslocamento de armazenamento em seu cluster do Kafka para cada conector que você cria. O MSK estrutura o nome do tópico padrão usando partes do ARN do conector. Por exemplo, .`__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2` 

# Usar um tópico personalizado de deslocamento de armazenamento
<a name="msk-connect-set-offset-storage-topic"></a>

Para fornecer continuidade de deslocamento entre conectores de origem, você pode usar um tópico de deslocamento de armazenamento de sua escolha em vez do tópico padrão. Especificar um tópico de deslocamento de armazenamento ajuda você a realizar tarefas como criar um conector de origem que retoma a leitura desde o último deslocamento de um conector anterior.

Para especificar um tópico de deslocamento de armazenamento, você fornece um valor para a propriedade `offset.storage.topic` em sua configuração de operador antes de criar um conector. Se quiser reutilizar o tópico de deslocamento de armazenamento para consumir deslocamentos de um conector criado anteriormente, você deverá dar ao novo conector o mesmo nome do conector antigo. Se você criar um tópico personalizado de deslocamento de armazenamento, deverá definir [https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy](https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy) como `compact` na configuração do tópico.

**nota**  
Se você especificar um tópico de deslocamento de armazenamento ao criar um conector de *coletor*, o MSK Connect criará o tópico se ele ainda não existir. No entanto, o tópico não será usado para armazenar deslocamentos de conectores.   
Em vez disso, os deslocamentos do conector do coletor serão gerenciados usando o protocolo de grupo de consumidores Kafka. Cada conector de coletor cria um grupo chamado `connect-{CONNECTOR_NAME}`. Enquanto o grupo de consumidores existir, todos os conectores de coletor sucessivos que você criar com o mesmo valor `CONNECTOR_NAME` continuarão a partir do último deslocamento confirmado.

**Importante**  
Se você quiser atualizar uma configuração de conector existente enquanto mantém a continuidade do offset, use a UpdateConnector API. Para obter mais informações, consulte [Atualizar um conector](mkc-update-connector.md).

**Example : especificação de um tópico de armazenamento offset ao recriar um conector de origem**  
Se precisar excluir e recriar um conector mantendo a continuidade do offset, você pode especificar um tópico de armazenamento offset em sua configuração de trabalho. Por exemplo, suponha que você tenha um conector de captura de dados de alteração (CDC) e queira recriá-lo sem perder seu lugar no fluxo do CDC. As etapas a seguir demonstram como concluir essa tarefa.  

1. Em sua máquina cliente, execute o comando a seguir para encontrar o nome do tópico de deslocamento de armazenamento do seu conector. Substitua `<bootstrapBrokerString>` pela string do agente de bootstrap do seu cluster. Para obter instruções sobre como obter sua string de agente de bootstrap, consulte [Obter os agentes de bootstrap para um cluster do Amazon MSK](msk-get-bootstrap-brokers.md).

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
   ```

   A saída a seguir mostra uma lista de todos os tópicos do cluster, incluindo qualquer tópico de conector interno padrão. Neste exemplo, o conector CDC existente usa o [tópico padrão de deslocamento de armazenamento](msk-connect-default-offset-storage-topic.md) criado pelo MSK Connect. É por isso que o tópico de deslocamento de armazenamento é chamado de `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`.

   ```
   __consumer_offsets
   __amazon_msk_canary
   __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   my-msk-topic-1
   my-msk-topic-2
   ```

1. Abra o console do Amazon MSK em [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk).

1. Escolha seu conector na lista **Conectores**. Copie e salve o conteúdo do campo **Configuração do conector** para que você possa modificá-lo e usá-lo na criação do novo conector.

1. Selecione **Excluir** para excluir o conector. Em seguida, insira o nome do conector no campo de entrada de texto para confirmar a exclusão.

1. Crie uma configuração personalizada de operador com valores adequados ao seu cenário. Para instruções, consulte [Criar uma configuração personalizada de operador](msk-connect-create-custom-worker-config.md).

   Em sua configuração de operador, você deve especificar o nome do tópico de deslocamento de armazenamento que você recuperou anteriormente como o valor de `offset.storage.topic`, assim como na configuração a seguir. 

   ```
   config.providers.secretManager.param.aws.region=eu-west-3
   key.converter=<org.apache.kafka.connect.storage.StringConverter>
   value.converter=<org.apache.kafka.connect.storage.StringConverter>
   config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
   config.providers=secretManager
   offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   ```

1. 
**Importante**  
Você deve dar ao seu novo conector o mesmo nome do conector antigo.

   Crie um novo conector usando a configuração de operador que você definiu na etapa anterior. Para instruções, consulte [Criar um conector do](mkc-create-connector-intro.md).

# Tutorial: Externalizar informações confidenciais usando provedores de configuração
<a name="msk-connect-config-provider"></a>

Este exemplo mostra como externalizar informações confidenciais para o Amazon MSK Connect usando um provedor de configuração de código aberto. Um provedor de configuração permite que você especifique variáveis, em vez de texto simples, em uma configuração de conector ou de operador, e os operadores em execução em seu conector resolvem essas variáveis em runtime. Isso evita que credenciais e outros segredos sejam armazenados em texto simples. O provedor de configuração no exemplo suporta a recuperação de parâmetros de configuração do AWS Secrets Manager, Amazon S3 e Systems Manager (SSM). Na [Etapa 2](#msk-connect-config-providers), você verá como configurar o armazenamento e a recuperação de informações confidenciais para o serviço que deseja configurar.

## Considerações
<a name="msk-connect-config-providers-considerations"></a>

Considere o seguinte ao usar o provedor de configuração do MSK com o Amazon MSK Connect:
+ Atribua as permissões adequadas ao usar os provedores de configuração para o perfil de execução de serviços do IAM.
+ Defina os provedores de configuração nas configurações de trabalho e sua implementação na configuração do conector.
+ Valores confidenciais de configuração podem aparecer nos registros do conector se um plug-in não definir esses valores como secretos. O Kafka Connect trata valores de configuração indefinidos da mesma forma que qualquer outro valor de texto simples. Para saber mais, consulte [Como evitar que segredos apareçam nos logs do conector](msk-connect-logging.md#msk-connect-logging-secrets).
+ Por padrão, o MSK Connect reinicia frequentemente um conector quando o conector usa um provedor de configuração. Para desativar esse comportamento de reinicialização, você pode definir o valor `config.action.reload` como `none` na configuração do conector.

## Criar um plug-in personalizado e fazer o upload para o S3
<a name="msk-connect-config-providers-create-custom-plugin"></a>

 Para criar um plug-in personalizado, crie um arquivo zip que contenha o conector e o msk-config-provider executando os seguintes comandos em sua máquina local.

**Para criar um plug-in personalizado usando uma janela de terminal e o Debezium como conector**

Use a AWS CLI para executar comandos como superusuário com credenciais que permitem acessar seu bucket do S3. AWS *Para obter informações sobre como instalar e configurar a AWS CLI, consulte [Introdução à AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html) no Guia do usuário.AWS Command Line Interface * *Para obter informações sobre o uso da AWS CLI com o Amazon S3, consulte Usando o [Amazon S3 com a AWS CLI no Guia do usuário](https://docs.aws.amazon.com/cli/latest/userguide/cli-services-s3.html).AWS Command Line Interface *

1. Em uma janela de terminal, crie uma pasta nomeada `custom-plugin` no seu espaço de trabalho usando o comando a seguir.

   ```
   mkdir custom-plugin && cd custom-plugin
   ```

1. Baixe a versão estável mais recente do **plug-in MySQL Connector** no site do [Debezium](https://debezium.io/releases/) usando o comando a seguir.

   ```
   wget https://repo1.maven.org/maven2/io/debezium/debezium-connectormysql/
   2.2.0.Final/debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

   Extraia o arquivo gzip baixado na pasta `custom-plugin` usando o comando a seguir.

   ```
   tar xzf debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

1. Baixe o [arquivo zip do provedor de configuração do MSK](https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip) usando o comando a seguir.

   ```
   wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip
   ```

   Extraia o arquivo zip baixado na `custom-plugin` pasta usando o comando a seguir.

   ```
   unzip msk-config-providers-0.4.0-with-dependencies.zip
   ```

1. Compacte o conteúdo do provedor de configuração do MSK da etapa acima e do conector personalizado em um só arquivo chamado `custom-plugin.zip`.

   ```
   zip -r ../custom-plugin.zip * 
   ```

1. Faça upload do arquivo para o S3 para referência posterior.

   ```
   aws s3 cp ../custom-plugin.zip s3:<S3_URI_BUCKET_LOCATION>
   ```

1. No console do Amazon MSK, na seção **MSK Connect**, escolha **Custom Plugin**, depois escolha **Create custom plugin** e navegue pelo bucket **s3: < *S3\$1URI\$1BUCKET\$1LOCATION* > S3** para selecionar o arquivo ZIP do plug-in personalizado que você acabou de enviar.  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/pt_br/msk/latest/developerguide/images/s3-object-browser.png)

1. Insira **debezium-custom-plugin** para o nome do plug-in. Opcionalmente, insira uma descrição e escolha **Criar um plug-in personalizado**.  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/pt_br/msk/latest/developerguide/images/create-custom-plugin.png)

## Configurar parâmetros e permissões para diferentes provedores
<a name="msk-connect-config-providers"></a>

Você pode configurar valores de parâmetros nestes três serviços:
+ Secrets Manager 
+ Systems Manager Parameter Store
+ S3: Simple Storage Service

Escolha uma das guias abaixo para obter instruções sobre como configurar parâmetros e permissões relevantes para esse serviço.

------
#### [ Configure in Secrets Manager ]

**Para configurar valores de parâmetros no Secrets Manager**

1. Abra o [console do Secrets Manager](https://console.aws.amazon.com/secretsmanager/).

1. Crie um novo segredo para armazenar suas credenciais ou segredos. Para obter instruções, consulte [Criar um AWS Secrets Manager segredo](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html) no *Guia AWS Secrets Manager do usuário*.

1. Copie o ARN do seu segredo.

1. Adicione as permissões do Secrets Manager do exemplo de política a seguir ao seu [perfil de execução de serviço](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). Substitua o ARN do exemplo, `arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234`, pelo ARN do seu segredo.

1. Adicione a configuração do operador e as instruções do conector.  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Effect": "Allow",
                   "Action": [
                       "secretsmanager:GetResourcePolicy",
                       "secretsmanager:GetSecretValue",
                       "secretsmanager:DescribeSecret",
                       "secretsmanager:ListSecretVersionIds"
                   ],
                   "Resource": [
                   "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
                   ]
               }
           ]
       }
   ```

1. Para usar o provedor de configuração do Secrets Manager, copie as seguintes linhas de código na caixa de texto de configuração do operador na Etapa 3:

   ```
   # define name of config provider:
   
   config.providers = secretsmanager
   
   # provide implementation classes for secrets manager:
   
   config.providers.secretsmanager.class = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.secretsmanager.param.region = us-east-1
   ```

1. Para o provedor de configuração do Secrets Manager, copie as seguintes linhas de código na configuração do conector na Etapa 4.

   ```
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   ```

Você também pode usar a etapa acima com mais provedores de configuração.

------
#### [ Configure in Systems Manager Parameter Store ]

**Para configurar valores de parâmetros no Systems Manager Parameter Store**

1. Abra o [console do Systems Manager](https://console.aws.amazon.com/systems-manager/).

1. No painel de navegação, selecione **Repositório de parâmetros**.

1. Crie um novo parâmetro para armazenar no Systems Manager. Para obter instruções, consulte [Criar um parâmetro do Systems Manager (console)](https://docs.aws.amazon.com/systems-manager/latest/userguide/parameter-create-console.html) no Guia AWS Systems Manager do usuário.

1. Copie o ARN do seu parâmetro.

1. Adicione as permissões do Systems Manager do exemplo de política a seguir ao seu [perfil de execução de serviço](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). *<arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName>*Substitua pelo ARN do seu parâmetro.  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": [
                       "ssm:GetParameterHistory",
                       "ssm:GetParametersByPath",
                       "ssm:GetParameters",
                       "ssm:GetParameter"
                   ],
                   "Resource": "arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName"
               }
           ]
       }
   ```

1. Para usar o provedor de configuração do Parameter Store, copie as seguintes linhas de código na caixa de texto de configuração do operador na Etapa 3:

   ```
   # define name of config provider:
   
   config.providers = ssm
   
   # provide implementation classes for parameter store:
   
   config.providers.ssm.class = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.ssm.param.region = us-east-1
   ```

1. Para o provedor de configuração do Parameter Store, copie as seguintes linhas de código na configuração do conector na Etapa 5.

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   ```

   Você também pode agrupar as duas etapas acima com mais provedores de configuração.

------
#### [ Configure in Amazon S3 ]

**Para configurar objects/files no Amazon S3**

1. Abra o [console Amazon S3](https://console.aws.amazon.com/s3/).

1. Carregue um objeto para um bucket no S3. Para obter instruções, consulte [Carregar objetos](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html).

1. Copie o ARN do seu objeto.

1. Adicione as permissões de leitura de objeto do Amazon S3 do exemplo de política a seguir ao seu [perfil de execução de serviço](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). Substitua o ARN do exemplo, `arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>`, pelo ARN do seu objeto.  
****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": "s3:GetObject",
                   "Resource": "arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>"
               }
           ]
       }
   ```

1. Para usar o provedor de configuração do Amazon S3, copie as seguintes linhas de código na caixa de texto de configuração do operador na Etapa 3:

   ```
   # define name of config provider:
   
   config.providers = s3import
   # provide implementation classes for S3:
   
   config.providers.s3import.class = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   ```

1. Para o provedor de configuração do Amazon S3, copie as seguintes linhas de código na configuração do conector na Etapa 4.

   ```
   #Example implementation for S3 object
   
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

   Você também pode agrupar as duas etapas acima com mais provedores de configuração.

------

## Criar uma configuração personalizada de operador com informações sobre seu provedor de configuração
<a name="msk-connect-config-providers-create-custom-config"></a>

1. Selecione as **Configurações do operador** na seção **Amazon MSK Connect**.

1. Selecione **Criar configuração de operador**.

1. Digite `SourceDebeziumCustomConfig` na caixa de texto Nome da configuração do operador. A descrição é opcional.

1. Copie o código de configuração relevante com base nos provedores desejados e cole-o na caixa de texto de **Configuração do operador**.

1. Este é um exemplo da configuração de operador para todos os três provedores:

   ```
   key.converter=org.apache.kafka.connect.storage.StringConverter
   key.converter.schemas.enable=false
   value.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter.schemas.enable=false
   offset.storage.topic=offsets_my_debezium_source_connector
   
   # define names of config providers:
   
   config.providers=secretsmanager,ssm,s3import
   
   # provide implementation classes for each provider:
   
   config.providers.secretsmanager.class    = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   config.providers.ssm.class               = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   config.providers.s3import.class          = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   
   config.providers.secretsmanager.param.region = us-east-1
   config.providers.ssm.param.region = us-east-1
   ```

1. Clique em Criar configuração de operador.

## Criar o conector
<a name="msk-connect-config-providers-create-connector"></a>

1. Crie um novo conector usando as instruções em [Criar um novo conector](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html).

1. Escolha o arquivo `custom-plugin.zip` que você enviou para o bucket do S3 em [Criar um plug-in personalizado e fazer o upload para o S3](#msk-connect-config-providers-create-custom-plugin) como origem do plug-in personalizado.

1. Copie o código de configuração relevante com base nos provedores desejados e cole-o no campo Configuração do conector.

1. Este é um exemplo da configuração do conector para todos os três provedores:

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   
   #Example implementation for Amazon S3 file/object
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

1. Selecione **Usar uma configuração personalizada** e escolha **SourceDebeziumCustomConfig**no menu suspenso **Configuração do trabalhador**.

1. Siga as etapas restantes das instruções em [Criar conector](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html).

# Perfis e políticas do IAM para o MSK Connect
<a name="msk-connect-iam"></a>

Esta seção ajuda você a configurar as políticas e funções apropriadas do IAM para implantar e gerenciar com segurança o Amazon MSK Connect em AWS seu ambiente. As seções a seguir explicam o perfil de execução do serviço que deve ser usado com o MSK Connect, incluindo a política de confiança necessária e as permissões adicionais necessárias ao se conectar a um cluster do MSK autenticado pelo IAM. A página também fornece exemplos de políticas abrangentes do IAM para conceder acesso total à funcionalidade do MSK Connect, bem como detalhes sobre as políticas AWS gerenciadas disponíveis para o serviço. 

**Topics**
+ [Saiba mais sobre o perfil de execução do serviço](msk-connect-service-execution-role.md)
+ [Exemplo de política do IAM para o MSK Connect](mkc-iam-policy-examples.md)
+ [Prevenção do problema confused deputy entre serviços](cross-service-confused-deputy-prevention.md)
+ [AWS políticas gerenciadas para o MSK Connect](mkc-security-iam-awsmanpol.md)
+ [Usar perfis vinculados ao serviço para o MSK Connect](mkc-using-service-linked-roles.md)

# Saiba mais sobre o perfil de execução do serviço
<a name="msk-connect-service-execution-role"></a>

**nota**  
O Amazon MSK Connect não é compatível com o uso do [perfil vinculado a serviço](mkc-using-service-linked-roles.md) como o perfil de execução do serviço. É necessário criar um perfil de execução do serviço distinto. Para obter instruções sobre como criar uma função personalizada do IAM, consulte Como [criar uma função para delegar permissões a um AWS serviço](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html) no *Guia do usuário do IAM*.

Ao criar um conector com o MSK Connect, você precisa especificar uma função AWS Identity and Access Management (IAM) para usar com ele. Seu perfil de execução do serviço deve ter a seguinte política de confiança para que o MSK Connect possa assumi-lo. Para obter informações sobre as chaves de contexto de condição, consulte [Prevenção do problema confused deputy entre serviços](cross-service-confused-deputy-prevention.md).

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/myConnector/abc12345-abcd-4444-a8b9-123456f513ed-2"
        }
      }
    }   
  ]
}
```

------

Se o cluster Amazon MSK que você deseja usar com seu conector for um cluster que usa autenticação do IAM, será necessário adicionar a seguinte política de permissões ao perfil de execução do serviço do conector. Para obter informações sobre como encontrar o UUID do seu cluster e como criar um tópico ARNs, consulte. [Recursos da política de autorização](kafka-actions.md#msk-iam-resources)

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:000000000001:cluster/testClusterName/300d0000-0000-0005-000f-00000000000b-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300a0000-0000-0003-000a-00000000000b-6/__amazon_msk_connect_read"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:WriteData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_write"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_*",
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/connect-*"
            ]
        }
    ]
}
```

------

Dependendo do tipo de conector, talvez você também precise anexar à função de execução do serviço uma política de permissões que permita que ela acesse AWS recursos. Por exemplo, se seu conector precisar enviar dados para um bucket do S3, o perfil de execução do serviço deverá ter uma política de permissões que conceda permissão para gravar nesse bucket. Para fins de teste, você pode usar uma das políticas predefinidas do IAM que dão acesso total, como `arn:aws:iam::aws:policy/AmazonS3FullAccess`. No entanto, por motivos de segurança, recomendamos que você use a política mais restritiva que permita que seu conector leia da AWS fonte ou grave no AWS coletor.

# Exemplo de política do IAM para o MSK Connect
<a name="mkc-iam-policy-examples"></a>

Para fornecer acesso total a todas as funcionalidades do MSK Connect a um usuário não administrador, anexe uma política como a seguinte ao perfil do IAM do usuário.

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "MSKConnectFullAccess",
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:CreateConnector",
        "kafkaconnect:DeleteConnector",
        "kafkaconnect:DescribeConnector",
        "kafkaconnect:ListConnectors",
        "kafkaconnect:UpdateConnector",
        "kafkaconnect:CreateCustomPlugin",
        "kafkaconnect:DeleteCustomPlugin",
        "kafkaconnect:DescribeCustomPlugin",
        "kafkaconnect:ListCustomPlugins",
        "kafkaconnect:CreateWorkerConfiguration",
        "kafkaconnect:DeleteWorkerConfiguration",
        "kafkaconnect:DescribeWorkerConfiguration",
        "kafkaconnect:ListWorkerConfigurations"
      ],
      "Resource": "*"
    },
    {
      "Sid": "IAMPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/MSKConnectServiceRole",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EC2NetworkAccess",
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeVpcs",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    },
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/"
    },
    {
      "Sid": "MSKLogGroupAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:123456789012:log-group:/aws/msk-connect/*"
      ]
    },
    {
      "Sid": "S3PluginAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins",
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins/*"
      ]
    }
  ]
}
```

------

# Prevenção do problema confused deputy entre serviços
<a name="cross-service-confused-deputy-prevention"></a>

O problema "confused deputy" é um problema de segurança em que uma entidade que não tem permissão para executar uma ação pode coagir uma entidade mais privilegiada a executar a ação. Em AWS, a falsificação de identidade entre serviços pode resultar em um problema confuso de delegado. A personificação entre serviços pode ocorrer quando um serviço (o *serviço de chamada*) chama outro serviço (o *serviço chamado*). O serviço de chamada pode ser manipulado de modo a usar suas permissões para atuar nos recursos de outro cliente de uma forma na qual ele não deveria ter permissão para acessar. Para evitar isso, a AWS fornece ferramentas que ajudam você a proteger seus dados para todos os serviços com entidades principais de serviço que receberam acesso aos recursos em sua conta. 

Recomendamos usar as chaves de contexto de condição global [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn) e [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount) em políticas de recursos para limitar as permissões que o MSK Connect concede a outro serviço para o recurso. Se o valor `aws:SourceArn` não contiver o ID da conta (p. ex., um ARN de um bucket do Amazon S3 não contiver o ID da conta), você deverá usar ambas as chaves de contexto de condição global para limitar as permissões. Se você utilizar ambas as chaves de contexto de condição global e o valor de `aws:SourceArn` contiver o ID da conta, o valor de `aws:SourceAccount` e a conta no valor de `aws:SourceArn` deverão utilizar o mesmo ID de conta quando utilizados na mesma declaração da política. Use `aws:SourceArn` se quiser apenas um recurso associado a acessibilidade de serviço. Use `aws:SourceAccount` se quiser permitir que qualquer recurso nessa conta seja associado ao uso entre serviços.

No caso do MSK Connect, o valor de `aws:SourceArn` deve ser um conector do MSK.

A maneira mais eficaz de se proteger do problema ‘confused deputy’ é usar a chave de contexto de condição global `aws:SourceArn` com o ARN completo do recurso. Se você não souber o ARN completo do recurso ou se estiver especificando vários recursos, use a chave de condição de contexto global `aws:SourceArn` com curingas (`*`) para as partes desconhecidas do ARN. Por exemplo, *arn:aws:kafkaconnect:us-east-1:123456789012:connector/\$1* representa todos os conectores que pertencem à conta com ID 123456789012 na região Leste dos EUA (Norte da Virgínia).

O exemplo a seguir mostra como é possível usar as chaves de contexto de condição globais `aws:SourceArn` e `aws:SourceAccount` no MSK Connect para evitar o problema “confused deputy”. Substitua *123456789012* e arn:aws:kafkaconnect: ::connector//pelas informações suas e do conector*us-east-1*. *123456789012* *my-S3-Sink-Connector* *abcd1234-5678-90ab-cdef-1234567890ab* Conta da AWS 

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": " kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
        "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-S3-Sink-Connector/abcd1234-5678-90ab-cdef-1234567890ab"
        }
      }
    }   
  ]
}
```

------

# AWS políticas gerenciadas para o MSK Connect
<a name="mkc-security-iam-awsmanpol"></a>

Uma política AWS gerenciada é uma política autônoma criada e administrada por AWS. AWS as políticas gerenciadas são projetadas para fornecer permissões para muitos casos de uso comuns, para que você possa começar a atribuir permissões a usuários, grupos e funções.

Lembre-se de que as políticas AWS gerenciadas podem não conceder permissões de privilégio mínimo para seus casos de uso específicos porque elas estão disponíveis para uso de todos os AWS clientes. Recomendamos que você reduza ainda mais as permissões definindo as [ políticas gerenciadas pelo cliente](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#customer-managed-policies) que são específicas para seus casos de uso.

Você não pode alterar as permissões definidas nas políticas AWS gerenciadas. Se AWS atualizar as permissões definidas em uma política AWS gerenciada, a atualização afetará todas as identidades principais (usuários, grupos e funções) às quais a política está anexada. AWS é mais provável que atualize uma política AWS gerenciada quando uma nova AWS service (Serviço da AWS) é lançada ou novas operações de API são disponibilizadas para serviços existentes.

Para saber mais, consulte [AWS Políticas gerenciadas pela ](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies) no *Guia do usuário do IAM*.

## AWS política gerenciada: Amazon MSKConnect ReadOnlyAccess
<a name="security-iam-awsmanpol-AmazonMSKConnectReadOnlyAccess"></a>

Essa política concede ao usuário as permissões necessárias para listar e descrever os recursos do MSK Connect.

É possível anexar a política `AmazonMSKConnectReadOnlyAccess` às suas identidades do IAM.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:ListConnectors",
                "kafkaconnect:ListCustomPlugins",
                "kafkaconnect:ListWorkerConfigurations"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeConnector"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:connector/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeCustomPlugin"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:custom-plugin/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeWorkerConfiguration"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:worker-configuration/*"
            ]
        }
    ]
}
```

------

## AWS política gerenciada: KafkaConnectServiceRolePolicy
<a name="security-iam-awsmanpol-KafkaConnectServiceRolePolicy"></a>

Essa política concede ao serviço MSK Connect as permissões necessárias para criar e gerenciar interfaces de rede que tenham a tag `AmazonMSKConnectManaged:true`. Essas interfaces de rede permitem que a rede do MSK Connect acesse os recursos em sua Amazon VPC, como um cluster do Apache Kafka ou uma origem ou um coletor.

Você não pode se vincular KafkaConnectServiceRolePolicy às suas entidades do IAM. Essa política é anexada a um perfil vinculado a serviço que permite que o MSK Connect realize ações em seu nome.

------
#### [ JSON ]

****  

```
{
	"Version":"2012-10-17",		 	 	 
	"Statement": [
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"aws:RequestTag/AmazonMSKConnectManaged": "true"
				},
				"ForAllValues:StringEquals": {
					"aws:TagKeys": "AmazonMSKConnectManaged"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": [
				"arn:aws:ec2:*:*:subnet/*",
				"arn:aws:ec2:*:*:security-group/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateTags"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:CreateAction": "CreateNetworkInterface"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeNetworkInterfaces",
				"ec2:CreateNetworkInterfacePermission",
				"ec2:AttachNetworkInterface",
				"ec2:DetachNetworkInterface",
				"ec2:DeleteNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		}
	]
}
```

------

## Atualizações do MSK Connect para políticas AWS gerenciadas
<a name="security-iam-awsmanpol-updates"></a>

Veja detalhes sobre as atualizações das políticas AWS gerenciadas do MSK Connect desde que esse serviço começou a rastrear essas alterações.


| Alteração | Descrição | Data | 
| --- | --- | --- | 
|  Atualização da política somente leitura do MSK Connect  |  O MSK Connect atualizou a MSKConnect ReadOnlyAccess política da Amazon para remover as restrições nas operações de listagem.  | 13 de outubro de 2021 | 
|  O MSK Connect começou a monitorar alterações  |  O MSK Connect começou a monitorar as mudanças em suas políticas AWS gerenciadas.  | 14 de setembro de 2021 | 

# Usar perfis vinculados ao serviço para o MSK Connect
<a name="mkc-using-service-linked-roles"></a>

O Amazon MSK Connect usa funções AWS Identity and Access Management [vinculadas a serviços](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role) (IAM). Um perfil vinculado a serviço é um tipo especial de perfil do IAM vinculado diretamente ao MSK Connect. As funções vinculadas ao serviço são predefinidas pelo MSK Connect e incluem todas as permissões que o serviço exige para chamar outros AWS serviços em seu nome. 

Um perfil vinculado a serviço facilita a configuração do MSK Connect porque você não precisa adicionar as permissões necessárias manualmente. O MSK Connect define as permissões dos perfis vinculados ao serviço e, exceto se definido de outra forma, somente o MSK Connect pode assumir seus perfis. As permissões definidas incluem a política de confiança e a política de permissões, que não pode ser anexada a nenhuma outra entidade do IAM.

Para obter informações sobre outros serviços compatíveis com funções vinculadas a serviços, consulte [Produtos da AWS compatíveis com o IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html) e procure por serviços que apresentam **Sim** na coluna **Funções vinculadas ao serviço**. Escolha um **Sim** com um link para visualizar a documentação do perfil vinculado para esse serviço.

## Permissões de perfil vinculado a serviço para o MSK Connect
<a name="slr-permissions"></a>

O MSK Connect usa a função vinculada ao serviço chamada — **AWSServiceRoleForKafkaConnect**Permite que o Amazon MSK Connect acesse os recursos da Amazon em seu nome.

A função AWSService RoleForKafkaConnect vinculada ao serviço confia no `kafkaconnect.amazonaws.com` serviço para assumir a função.

Para obter mais informações sobre a política de permissões usada pelo perfil, consulte [AWS política gerenciada: KafkaConnectServiceRolePolicy](mkc-security-iam-awsmanpol.md#security-iam-awsmanpol-KafkaConnectServiceRolePolicy).

Você deve configurar permissões para que uma entidade do IAM (por exemplo, um usuário, grupo ou função) crie, edite ou exclua um perfil vinculado a serviço. Para saber mais, consulte [Permissões de Função Vinculadas ao Serviço](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#service-linked-role-permissions) no *Guia do Usuário do IAM*.

## Criação de um perfil vinculado a serviço para o MSK Connect
<a name="create-slr"></a>

Não é necessário criar manualmente um perfil vinculado ao serviço. Quando você cria um conector na Console de gerenciamento da AWS, na ou na AWS API AWS CLI, o MSK Connect cria a função vinculada ao serviço para você. 

Se excluir esse perfil vinculado ao serviço e precisar criá-lo novamente, será possível usar esse mesmo processo para recriar o perfil em sua conta. Quando você cria um conector, o MSK Connect cria um perfil vinculado a serviço para você novamente. 

## Edição de um perfil vinculado a serviço para o MSK Connect
<a name="edit-slr"></a>

O MSK Connect não permite que você edite a função vinculada ao AWSService RoleForKafkaConnect serviço. Depois que você criar um perfil vinculado ao serviço, não poderá alterar o nome do perfil, pois várias entidades podem fazer referência ao perfil. No entanto, você poderá editar a descrição do perfil usando o IAM. Para saber mais, consulte [Editar uma função vinculada a serviço](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#edit-service-linked-role) no *Guia do usuário do IAM*.

## Exclusão de um perfil vinculado a serviço para o MSK Connect
<a name="delete-slr"></a>

Você pode usar o console do IAM AWS CLI ou a AWS API para excluir manualmente a função vinculada ao serviço. Para isso, primeiro é necessário excluir manualmente todos os conectores do MSK Connect e excluir o perfil manualmente. Para saber mais, consulte [Excluir um perfil vinculado ao serviço](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#delete-service-linked-role) no *Guia do usuário do IAM*.

## Regiões compatíveis com perfis vinculados a serviço do MSK Connect
<a name="slr-regions"></a>

O MSK Connect é compatível com perfis vinculados a serviço em todas as regiões nas quais o serviço esteja disponível. Para obter mais informações, consulte [Regiões e endpoints da AWS](https://docs.aws.amazon.com/general/latest/gr/rande.html).

# Habilitar o acesso à internet para o Amazon MSK Connect
<a name="msk-connect-internet-access"></a>

Se o seu conector para o Amazon MSK Connect precisar de acesso à Internet, recomendamos que você use as seguintes configurações Amazon Virtual Private Cloud (VPC) para habilitar esse acesso.
+ Configure seu conector com sub-redes privadas.
+ Crie um [gateway NAT](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) público ou uma [instância NAT](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_NAT_Instance.html) pública para sua VPC em uma sub-rede pública. *Para obter mais informações, consulte a página [Conectar sub-redes à Internet ou a outros dispositivos VPCs usando NAT no Guia](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) do *Amazon Virtual Private Cloud*usuário.* 
+ Permita o tráfego de saída de suas sub-redes privadas para seu gateway ou instância NAT.

# Configurar um gateway NAT para o Amazon MSK Connect
<a name="msk-connect-internet-access-private-subnets-example"></a>

As etapas a seguir mostram como configurar um gateway NAT para permitir o acesso à Internet para um conector. Você deve concluir estas etapas antes de criar um conector em uma sub-rede privada.

## Concluir pré-requisitos para configurar um gateway NAT
<a name="msk-connect-internet-access-private-subnets-prereq"></a>

Certifique-se de ter os seguintes itens.
+ O ID do Amazon Virtual Private Cloud (VPC) associado ao seu cluster. Por exemplo, *vpc-123456ab*.
+ A IDs das sub-redes privadas em sua VPC. Por exemplo, *subnet-a1b2c3de*, *subnet-f4g5h6ij* etc. Você deve configurar seu conector com sub-redes privadas.

## Habilitar o acesso à internet para o conector
<a name="msk-connect-internet-access-private-subnets-steps"></a>

**Para habilitar o acesso à Internet para seu conector**

1. Abra o Amazon Virtual Private Cloud console em [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Crie uma sub-rede pública para seu gateway NAT com um nome descritivo e anote o ID da sub-rede. Para obter instruções detalhadas, consulte [Criar uma sub-rede na VPC](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet).

1. Crie um gateway da Internet para que a VPC possa se comunicar com a Internet e anote o ID do gateway. Anexe o gateway da internet à sua VPC. Para obter mais instruções, consulte [Criar e anexar um gateway da Internet à VPC](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway).

1. Provisione um gateway NAT público para que os hosts em suas sub-redes privadas possam acessar sua sub-rede pública. Ao criar o gateway NAT, selecione a sub-rede pública que você criou anteriormente. Para obter instruções, consulte [Create a NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating) (Criar um gateway NAT)

1. Configure suas tabelas de rotas. Para concluir essa configuração, você deve ter duas tabelas de rotas no total. Você já deve ter uma tabela de rotas principal criada automaticamente junto com sua VPC. Nesta etapa, você cria uma tabela de rotas adicional para sua sub-rede pública.

   1. Use as configurações a seguir para modificar a tabela de rotas principal da sua VPC para que suas sub-redes privadas roteiem o tráfego para seu gateway NAT. Para obter instruções, consulte [Trabalhar com tabelas de rotas](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) no *Guia do usuário* do *Amazon Virtual Private Cloud*.  
**Tabela de rotas MSKC privado**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

   1. Siga as instruções em [Criar uma tabela de rotas personalizada](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing) para criar uma tabela de rotas para sua sub-rede pública. Ao criar a tabela, insira um nome descritivo no campo **Tag de nome** para ajudar você a identificar a qual sub-rede a tabela está associada. Por exemplo, **MSKC público**.

   1. Configure sua tabela de rotas **MSKC público** usando as configurações a seguir.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

# Saiba mais sobre nomes de host DNS privados
<a name="msk-connect-dns"></a>

Com o suporte a nomes de host DNS privados no MSK Connect, você pode configurar conectores para consultar nomes de domínio públicos ou privados. O suporte dependerá dos servidores DNS especificados no *Conjunto de opções de DHCP* da VPC.

Um conjunto de opções de DHCP é um grupo de configurações de rede que instâncias do EC2 usam em uma VPC para comunicação pela rede da VPC. Cada VPC tem um conjunto padrão de opções de DHCP, mas você pode criar um conjunto personalizado de opções de DHCP se quiser que as instâncias em sua VPC usem um servidor de DNS diferente para a resolução de nomes de domínio em vez do servidor DNS fornecido pela Amazon. Consulte [Conjuntos de opções de DHCP na Amazon VPC](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html).

Antes de a resolução capability/feature de DNS privado ser incluída no MSK Connect, os conectores usavam o serviço de resolvedores de DNS VPC para consultas de DNS de um conector de cliente. Os conectores não usavam os servidores DNS definidos nos conjuntos de opções de DHCP da VPC do cliente para a resolução de DNS.

Os conectores só podiam consultar nomes de host nas configurações de conectores do cliente ou em plug-ins que fossem resolvíveis publicamente. Eles não conseguiam resolver nomes de host privados definidos em uma zona hospedada de maneira privada nem usar servidores DNS em outra rede de clientes.

Sem o DNS privado, os clientes que optaram por tornar seus bancos de dados, data warehouses e sistemas como o Secrets Manager em sua própria VPC inacessíveis à Internet não poderiam trabalhar com conectores do MSK. Geralmente os clientes usam nomes de host DNS privados para atender à postura de segurança corporativa.

# Configurar um conjunto de opções de DHCP da VPC para o conector
<a name="msk-connect-dns-config-dhcp"></a>

Os conectores usam automaticamente os servidores DNS definidos em seu conjunto de opções de DHCP da VPC quando o conector é criado. Antes de criar um conector, certifique-se de configurar o conjunto de opções de DHCP da VPC para os requisitos de resolução de nome de host DNS do seu conector.

Os conectores criados antes da disponibilização do recurso de nome de host DNS privado no MSK Connect continuam usando a configuração de resolução de DNS anterior sem necessidade de modificação.

Se você precisar apenas de uma resolução de nome de host DNS que possa ser resolvida publicamente em seu conector, para facilitar a configuração, recomendamos usar a VPC padrão da sua conta ao criar o conector. Consulte o [Servidor DNS da Amazon](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#AmazonDNS) no *Guia do usuário da Amazon VPC* para obter mais informações sobre o servidor DNS fornecido pela Amazon ou sobre o Amazon Route 53 Resolver.

Se você precisar resolver nomes de host DNS privados, certifique-se de que a VPC transmitida durante a criação do conector tenha suas opções de DHCP configuradas corretamente. Para obter mais informações, consulte [Trabalhar com conjuntos de opções de DHCP](https://docs.aws.amazon.com/vpc/latest/userguide/DHCPOptionSet.html) no *Guia do usuário da Amazon VPC*.

Ao configurar um conjunto de opções de DHCP para resolução de nome de host DNS privado, certifique-se de que o conector possa acessar os servidores DNS personalizados que você configurar no conjunto de opções de DHCP. Caso contrário, a criação do conector falhará.

Após personalizar o conjunto de opções de DHCP da VPC, os conectores criados posteriormente nessa VPC usarão os servidores DNS que você especificou no conjunto de opções. Se você alterar o conjunto de opções após criar um conector, o conector adotará as configurações do novo conjunto de opções em alguns minutos.

# Configurar atributos de DNS para a VPC
<a name="msk-connect-dns-attributes"></a>

Certifique-se de ter os atributos de DNS da VPC configurados corretamente conforme descrito em [Atributos de DNS em sua VPC](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-support) e [Nomes de host DNS](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-hostnames) no *Guia do usuário da Amazon VPC*.

Consulte [Resolvendo consultas de DNS entre VPCs e sua rede](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver.html) no *Guia do desenvolvedor do Amazon Route 53* para obter informações sobre o uso de endpoints de resolução de entrada e saída para conectar outras redes à sua VPC e trabalhar com seu conector.

# Resolver falhas na criação do conector
<a name="msk-connect-dns-failure-handling"></a>

Esta seção descreve possíveis falhas na criação de conectores associadas à resolução de DNS e ações sugeridas para resolver os problemas.


| Falha | Ação sugerida | 
| --- | --- | 
|  A criação do conector falhará se uma consulta de resolução de DNS falhar ou se os servidores DNS estiverem inacessíveis pelo conector.  |  Você pode ver falhas na criação de conectores devido a consultas malsucedidas de resolução de DNS em seus CloudWatch registros, se tiver configurado esses registros para seu conector. Verifique as configurações do servidor DNS e garanta a conectividade de rede com os servidores DNS pelo conector.  | 
|  Se você alterar a configuração dos servidores DNS no conjunto de opções de DHCP da VPC enquanto um conector estiver em execução, as consultas de resolução de DNS do conector poderão falhar. Se a resolução de DNS falhar, algumas das tarefas do conector podem entrar em um estado de falha.  |  Você pode ver falhas na criação de conectores devido a consultas malsucedidas de resolução de DNS em seus CloudWatch registros, se tiver configurado esses registros para seu conector. As tarefas com falha deverão reiniciar automaticamente para que o conector volte a funcionar. Se isso não acontecer, você pode entrar em contato com o suporte para reiniciar as tarefas que falharam no conector ou recriar o conector.  | 

# Segurança no MSK Connect
<a name="msk-connect-security"></a>

Você pode usar uma interface VPC Endpoint, alimentada por AWS PrivateLink, para evitar que o tráfego entre seu Amazon VPC e o Amazon MSK-Connect compatível saia da rede Amazon. APIs Os endpoints VPC de interface não exigem um gateway de internet, dispositivo NAT, conexão VPN ou conexão. AWS Direct Connect Para obter mais informações, consulte [Use o Amazon MSK APIs com endpoints de interface VPC](privatelink-vpc-endpoints.md).

# Registro em log no MSK Connect
<a name="msk-connect-logging"></a>

O MSK Connect pode gravar eventos de log que você pode usar para depurar seu conector. Ao criar um conector, você pode especificar zero ou mais dos seguintes destinos de log:
+ Amazon CloudWatch Logs: você especifica o grupo de logs para o qual deseja que o MSK Connect envie os eventos de log do seu conector. Para obter informações sobre como criar um grupo de registros, consulte [Criar um grupo de registros](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html#Create-Log-Group) no *Guia do usuário de CloudWatch registros*.
+ Amazon S3: você especifica o bucket do S3 para o qual deseja que o MSK Connect envie os eventos de log do seu conector. Para obter mais informações sobre como criar um bucket do S3, consulte [Criar um bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html), no *Guia do usuário do Amazon S3*.
+ Amazon Data Firehose: você especifica o stream de entrega para o qual deseja que o MSK Connect envie os eventos de log do conector. Para obter informações sobre como criar um stream de entrega, consulte [Creating an Amazon Data Firehose delivery stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) no *Guia do usuário do Firehose*.

Para saber mais sobre como configurar o registro em log, consulte [Habilitar o registro em log de determinados serviços da AWS](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-and-resource-policy.html) no *Guia do usuário do Amazon CloudWatch Logs *.

O MSK Connect emite os seguintes tipos de eventos de log:


****  

| Nível | Description | 
| --- | --- | 
| INFO | Eventos de runtime de interesse na inicialização e no desligamento. | 
| WARN | Situações de runtime que não são erros, mas são indesejáveis ou inesperadas. | 
| FATAL | Erros graves que causam encerramento prematuro. | 
| ERROR | Condições inesperadas e erros de runtime que não são fatais. | 

Veja a seguir um exemplo de um evento de registro enviado para o CloudWatch Logs:

```
[Worker-0bb8afa0b01391c41] [2021-09-06 16:02:54,151] WARN [Producer clientId=producer-1] Connection to node 1 (b-1.my-test-cluster.twwhtj.c2.kafka.us-east-1.amazonaws.com/INTERNAL_IP) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:782)
```

## Como evitar que segredos apareçam nos logs do conector
<a name="msk-connect-logging-secrets"></a>

**nota**  
Valores confidenciais de configuração podem aparecer nos registros do conector se um plug-in não definir esses valores como secretos. O Kafka Connect trata valores de configuração indefinidos da mesma forma que qualquer outro valor de texto simples.

Se seu plug-in definir uma propriedade como secreta, o Kafka Connect editará o valor da propriedade nos registros do conector. Por exemplo, os registros de conectores a seguir demonstram que o valor será substituído por `[hidden]` se um plug-in definir `aws.secret.key` como um tipo `PASSWORD`.

```
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] [2022-01-11 15:18:55,150] INFO SecretsManagerConfigProviderConfig values:
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.access.key = my_access_key
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.region = us-east-1
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.secret.key = [hidden]
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.prefix =
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.ttl.ms = 300000
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] (com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProviderConfig:361)
```

Para evitar que segredos apareçam nos arquivos de log do conector, um desenvolvedor de plug-ins deve usar a constante de enumeração [https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD](https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD) do Kafka Connect para definir propriedades confidenciais. Quando uma propriedade for do tipo `ConfigDef.Type.PASSWORD`, o Kafka Connect excluirá seu valor dos registros do conector, mesmo que o valor seja enviado como texto simples. 

# Monitoramento do Amazon MSK Connect
<a name="mkc-monitoring-overview"></a>

O monitoramento é uma parte importante da manutenção da confiabilidade, disponibilidade e desempenho do MSK Connect e de suas outras AWS soluções. A Amazon CloudWatch monitora seus AWS recursos e os aplicativos nos quais você executa AWS em tempo real. Você pode coletar e rastrear métricas, criar painéis personalizados e definir alarmes que o notificam ou que realizam ações quando uma métrica especificada atinge um limite definido. Por exemplo, você pode CloudWatch monitorar o uso da CPU ou outras métricas do seu conector, para que você possa aumentar sua capacidade, se necessário. Para obter mais informações, consulte o [Guia CloudWatch do usuário da Amazon](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).

Você pode usar as seguintes operações de API:
+ `DescribeConnectorOperation`: monitora o status das operações de atualização do conector.
+ `ListConnectorOperations`: acompanha as atualizações anteriores executadas no seu conector.

A tabela a seguir mostra as métricas que o MSK Connect envia CloudWatch sob a `ConnectorName` dimensão. O MSK Connect fornece essas métricas por padrão e sem custo adicional. CloudWatch mantém essas métricas por 15 meses, para que você possa acessar informações históricas e ter uma perspectiva melhor sobre o desempenho de seus conectores. Você também pode definir alarmes que observam determinados limites e enviam notificações ou realizam ações quando esses limites são atingidos. Para obter mais informações, consulte o [Guia CloudWatch do usuário da Amazon](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).


| Nome da métrica | Description | 
| --- | --- | 
| CpuUtilization | O percentual de consumo de CPU por sistema e usuário. | 
| ErroredTaskCount | O número de tarefas que apresentaram erro. | 
| MemoryUtilization | O percentual da memória total em uma instância de agente, não apenas a memória de pilha da máquina virtual Java (JVM) atualmente em uso. Normalmente, a JVM não libera memória de volta para o sistema operacional. Portanto, o tamanho da pilha da JVM (MemoryUtilization) geralmente começa com um tamanho mínimo de pilha que aumenta incrementalmente até um máximo estável de cerca de 80-90%. O uso da pilha da JVM pode aumentar ou diminuir conforme o uso efetivo da memória do conector muda. | 
| RebalanceCompletedTotal | O número total de rebalanceamentos concluídos por esse conector. | 
| RebalanceTimeAvg | O tempo médio em milissegundos gasto pelo conector no rebalanceamento.  | 
| RebalanceTimeMax | O tempo máximo em milissegundos gasto pelo conector no rebalanceamento. | 
| RebalanceTimeSinceLast |  O tempo em milissegundos desde que esse conector concluiu o rebalanceamento mais recente.  | 
| RunningTaskCount | O número de tarefas em execução no conector. | 
| SinkConsumerByteRate | O número médio de bytes consumidos por segundo pelo consumidor de coletor da estrutura do Kafka Connect antes que qualquer transformação seja aplicada aos dados. | 
| SinkRecordReadRate | O número médio de registros lidos por segundo do cluster do Apache Kafka ou do Amazon MSK. | 
| SinkRecordSendRate | O número médio de registros que são gerados pelas transformações e enviados ao destino por segundo. Esse número não inclui registros filtrados. | 
| SourceRecordPollRate | O número médio de registros produzidos ou pesquisados por segundo. | 
| SourceProducerByteRate | O número médio de bytes produzidos por segundo pelo produtor de código-fonte da estrutura do Kafka Connect após a aplicação de transformação aos dados. | 
| SourceRecordWriteRate | O número médio de registros gerados pelas transformações e gravados no cluster do Apache Kafka ou do Amazon MSK por segundo. | 
| TaskStartupAttemptsTotal | O número total de inicializações de tarefas que o conector tentou realizar. Você pode usar essa métrica para identificar anomalias nas tentativas de inicialização de tarefas. | 
| TaskStartupSuccessPercentage | O percentual médio de tarefas bem-sucedidas iniciadas para o conector. Você pode usar essa métrica para identificar anomalias nas tentativas de inicialização de tarefas. | 
| WorkerCount | O número de operadores em execução no conector. | 
| BytesInPerSec | Bytes de metadados transferidos para a estrutura do Kafka Connect para comunicação entre operadores. | 
| BytesOutPerSec | Bytes de metadados transferidos a partir da estrutura do Kafka Connect para comunicação entre operadores. | 

# Exemplos para configurar os recursos do Amazon MSK Connect
<a name="msk-connect-examples"></a>

Esta seção inclui exemplos para ajudar você a configurar os recursos do Amazon MSK Connect, como conectores e provedores de configuração terceirizados comuns.

**Topics**
+ [Configurar o conector de coletor do Amazon S3](mkc-S3sink-connector-example.md)
+ [Configure o conector de coletor EventBridge Kafka para o MSK Connect](mkc-eventbridge-kafka-connector.md)
+ [Usar o conector de origem Debezium com provedor de configuração](mkc-debeziumsource-connector-example.md)

# Configurar o conector de coletor do Amazon S3
<a name="mkc-S3sink-connector-example"></a>

Este exemplo mostra como usar o conector coletor Confluent [Amazon S3 e como criar um conector coletor](https://www.confluent.io/hub/confluentinc/kafka-connect-s3) Amazon S3 AWS CLI no MSK Connect.

1. Copie e cole o JSON a seguir em um novo arquivo. Substitua as sequências de caracteres de espaço reservado por valores que correspondam à string de conexão dos servidores bootstrap do seu cluster Amazon MSK e à sub-rede e ao grupo de segurança do cluster. IDs Para obter mais informações sobre como configurar um perfil de execução de serviços, consulte [Perfis e políticas do IAM para o MSK Connect](msk-connect-iam.md).

   ```
   {
       "connectorConfiguration": {
           "connector.class": "io.confluent.connect.s3.S3SinkConnector",
           "s3.region": "us-east-1",
           "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
           "flush.size": "1",
           "schema.compatibility": "NONE",
           "topics": "my-test-topic",
           "tasks.max": "2",
           "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
           "storage.class": "io.confluent.connect.s3.storage.S3Storage",
           "s3.bucket.name": "amzn-s3-demo-bucket"
       },
       "connectorName": "example-S3-sink-connector",
       "kafkaCluster": {
           "apacheKafkaCluster": {
               "bootstrapServers": "<cluster-bootstrap-servers-string>",
               "vpc": {
                   "subnets": [
                       "<cluster-subnet-1>",
                       "<cluster-subnet-2>",
                       "<cluster-subnet-3>"
                   ],
                   "securityGroups": ["<cluster-security-group-id>"]
               }
           }
       },
       "capacity": {
           "provisionedCapacity": {
               "mcuCount": 2,
               "workerCount": 4
           }
       },
       "kafkaConnectVersion": "2.7.1",
       "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>",
       "plugins": [
           {
               "customPlugin": {
                   "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>",
                   "revision": 1
               }
           }
       ],
       "kafkaClusterEncryptionInTransit": {"encryptionType": "PLAINTEXT"},
       "kafkaClusterClientAuthentication": {"authenticationType": "NONE"}
   }
   ```

1. Execute o AWS CLI comando a seguir na pasta em que você salvou o arquivo JSON na etapa anterior.

   ```
   aws kafkaconnect create-connector --cli-input-json file://connector-info.json
   ```

   Veja a seguir um exemplo da saída que você vai obter ao executar o comando com êxito.

   ```
   {
       "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-S3-sink-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
       "ConnectorState": "CREATING", 
       "ConnectorName": "example-S3-sink-connector"
   }
   ```

# Configure o conector de coletor EventBridge Kafka para o MSK Connect
<a name="mkc-eventbridge-kafka-connector"></a>

Este tópico mostra como configurar o conector do coletor [EventBridge Kafka para o MSK Connect](https://github.com/awslabs/eventbridge-kafka-connector). Esse conector permite que você envie eventos do seu cluster MSK para [barramentos de EventBridge eventos](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html). Este tópico descreve o processo para criar os recursos necessários e configurar o conector para permitir um fluxo de dados contínuo entre Kafka e. EventBridge 

**Topics**
+ [Pré-requisitos](#mkc-eb-kafka-prerequisites)
+ [Configurar os recursos necessários para o MSK Connect](#mkc-eb-kafka-set-up-resources)
+ [Criar o conector](#mkc-eb-kafka-create-connector)
+ [Enviar mensagens para o Kafka](#mkc-eb-kafka-send-json-encoded-messages)

## Pré-requisitos
<a name="mkc-eb-kafka-prerequisites"></a>

Antes de implantar o conector, verifique se você tem os seguintes recursos:
+ **Cluster Amazon MSK**: um cluster MSK ativo para produzir e consumir mensagens do Kafka.
+ **Ônibus de EventBridge eventos da Amazon**: um ônibus de EventBridge eventos para receber eventos dos tópicos de Kafka.
+ **Funções do IAM**: crie funções do IAM com as permissões necessárias para o MSK Connect e o EventBridge conector.
+ [Acesso à Internet pública a](msk-connect-internet-access.md) partir do MSK Connect ou de um endpoint EventBridge de [interface VPC criado](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html) na VPC e na sub-rede do seu cluster MSK. Isso ajuda a evitar a passagem pela Internet pública e evita a necessidade de gateways NAT.
+ Uma [máquina cliente](create-serverless-cluster-client.md), como uma instância do Amazon EC2 ou [AWS CloudShell](https://aws.amazon.com/cloudshell/), para criar tópicos e enviar logs para o Kafka.

## Configurar os recursos necessários para o MSK Connect
<a name="mkc-eb-kafka-set-up-resources"></a>

Crie um perfil do IAM para o conector e, em seguida, crie o conector. Você também cria uma EventBridge regra para filtrar os eventos do Kafka enviados para o EventBridge ônibus de eventos.

**Topics**
+ [Perfil do IAM para o conector](#mkc-eb-kafka-iam-role-connector)
+ [Uma EventBridge regra para eventos recebidos](#mkc-eb-kafka-create-rule)

### Perfil do IAM para o conector
<a name="mkc-eb-kafka-iam-role-connector"></a>

A função do IAM que você associa ao conector deve ter a [PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html)permissão para permitir o envio de eventos para EventBridge. O exemplo de política do IAM a seguir concede a você a permissão para enviar eventos para um barramento de eventos chamado `example-event-bus`. Certifique-se de substituir o ARN do recurso no exemplo a seguir pelo ARN do seu barramento de eventos.

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

Você também deve garantir que o perfil do IAM para o conector contenha a política de confiança a seguir.

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### Uma EventBridge regra para eventos recebidos
<a name="mkc-eb-kafka-create-rule"></a>

Você cria [regras](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html) que combinam eventos recebidos com critérios de dados de eventos, conhecidos como [https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html). Com um padrão de evento, você pode definir os critérios para filtrar os eventos recebidos e definir quais eventos devem acionar uma regra específica e, posteriormente, serem roteados para um [destino](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html) designado. O exemplo a seguir de um padrão de evento corresponde aos eventos do Kafka enviados para o EventBridge barramento de eventos.

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

Veja a seguir um exemplo de um evento enviado do Kafka para EventBridge usar o conector do coletor Kafka.

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

No EventBridge console, [crie uma regra](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html) no barramento de eventos usando esse padrão de exemplo e especifique um destino, como um grupo de CloudWatch registros. O EventBridge console configurará automaticamente a política de acesso necessária para o grupo CloudWatch Registros.

## Criar o conector
<a name="mkc-eb-kafka-create-connector"></a>

Na seção a seguir, você cria e implanta o [conector coletor EventBridge Kafka](https://github.com/awslabs/eventbridge-kafka-connector) usando o. Console de gerenciamento da AWS

**Topics**
+ [Etapa 1: Download do coletor](#mkc-eb-kafka-download-connector)
+ [Etapa 2: criar um bucket do Amazon S3](#mkc-eb-kafka-s3-bucket-create)
+ [Etapa 3: criar um plug-in no MSK Connect](#mkc-eb-kafka-create-plugin)
+ [Etapa 4: criar o conector](#mkc-eb-kafka-create-connector)

### Etapa 1: Download do coletor
<a name="mkc-eb-kafka-download-connector"></a>

Baixe o coletor de EventBridge conectores JAR mais recente na [página de GitHub lançamentos](https://github.com/awslabs/eventbridge-kafka-connector/releases) do conector EventBridge Kafka. Por exemplo, para baixar a versão v1.4.1, escolha o link `kafka-eventbridge-sink-with-dependencies.jar` do arquivo JAR para baixar o conector. Em seguida, salve o arquivo no local desejado da sua máquina.

### Etapa 2: criar um bucket do Amazon S3
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. Para armazenar o arquivo JAR no Amazon S3 para uso com o MSK Connect, abra Console de gerenciamento da AWS o e escolha Amazon S3.

1. No console do Amazon S3, escolha **Criar bucket** e insira um nome de bucket exclusivo. Por exemplo, .**amzn-s3-demo-bucket1-eb-connector**

1. Escolha uma região apropriada para o bucket do Amazon S3. Verifique se ele corresponde à região em que seu cluster MSK está implantado.

1. Para as **Configurações do Bucket**, mantenha as seleções padrão ou ajuste conforme necessário.

1. Selecione **Create bucket** (Criar bucket)

1. Faça upload do arquivo JAR no bucket do Amazon S3.

### Etapa 3: criar um plug-in no MSK Connect
<a name="mkc-eb-kafka-create-plugin"></a>

1. Abra o e Console de gerenciamento da AWS, em seguida, navegue até o **MSK Connect**.

1. No painel de navegação à esquerda, escolha **Plug-ins personalizados**.

1. Escolha **Criar plug-in** e insira o **Nome do plug-in**. Por exemplo, .**eventbridge-sink-plugin**

1. Em **Localização do plug-in personalizado**, cole a **URL do objeto S3**.

1. Adicione uma descrição opcional para o plug-in.

1. Escolha **Criar plug-in**.

Depois que o plug-in for criado, você poderá usá-lo para configurar e implantar o conector EventBridge Kafka no MSK Connect.

### Etapa 4: criar o conector
<a name="mkc-eb-kafka-create-connector"></a>

Antes de criar o conector, recomendamos criar o tópico necessário do Kafka para evitar erros no conector. Para criar o tópico, use sua máquina cliente.

1. No painel esquerdo do console do MSK, escolha **Conectores** e **Criar conector**.

1. Na lista de plug-ins, escolha **eventbridge-sink-plugin** e escolha **Próximo**.

1. Insira **EventBridgeSink** como o nome do conector.

1. Na lista de clusters, escolha seu cluster do MSK.

1. <a name="connector-ex"></a>Copie a configuração a seguir e cole no campo **Configuração do conector**.

   Substitua os espaços reservados na configuração a seguir de acordo com a necessidade.
   + Remova `aws.eventbridge.endpoint.uri` se seu cluster do MSK tiver acesso à Internet pública.
   + Se você costuma PrivateLink se conectar com segurança do MSK a EventBridge, substitua a parte DNS depois `https://` pelo nome DNS privado correto do endpoint da interface VPC (opcional) criado anteriormente. EventBridge 
   + Substitua o ARN do barramento de EventBridge eventos na configuração a seguir pelo ARN do seu barramento de eventos.
   + Atualize todos os valores específicos da região.

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   Para obter mais informações sobre a configuração do conector, consulte [eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector).

   Se necessário, altere as configurações dos operadores e o escalonamento automático. Também recomendamos usar a versão mais recente (recomendada) do Apache Kafka Connect disponível no menu suspenso. Em **Permissões de acesso**, use a função criada anteriormente. Também recomendamos ativar o registro em para fins de CloudWatch observabilidade e solução de problemas. Ajuste as outras configurações opcionais, como tags, de acordo com suas necessidades. Em seguida, implante o conector e aguarde até que o status esteja no estado Executando.

## Enviar mensagens para o Kafka
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

Você pode configurar codificações de mensagens, como Apache Avro e JSON, especificando diferentes conversores usando `value.converter` e, opcionalmente, as configurações de `key.converter` disponíveis no Kafka Connect.

O [connector example](#connector-ex) neste tópico está configurado para funcionar com mensagens codificadas em JSON, conforme indicado pelo uso de `org.apache.kafka.connect.json.JsonConverter` em `value converter`. Quando o conector estiver no estado Executando, envie logs para o tópico `msk-eventbridge-tutorial` do Kafka da sua máquina cliente.

# Usar o conector de origem Debezium com provedor de configuração
<a name="mkc-debeziumsource-connector-example"></a>

Este exemplo mostra como usar o plug-in do conector Debezium para MySQL com um banco de dados [Amazon Aurora](https://aws.amazon.com/rds/aurora/) compatível com MySQL como origem. Neste exemplo, também configuramos o [AWS Secrets Manager Config Provider](https://github.com/jcustenborder/kafka-config-provider-aws) de código aberto para externalizar as credenciais do banco de dados no AWS Secrets Manager. Para saber mais sobre os provedores de configuração, consulte [Tutorial: Externalizar informações confidenciais usando provedores de configuração](msk-connect-config-provider.md).

**Importante**  
O plug-in do conector Debezium para MySQL é [compatível com apenas uma tarefa](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-tasks-max) e não funciona com o modo de capacidade de escalabilidade automática para o Amazon MSK Connect. Em vez disso, você deve usar o modo de capacidade provisionada e definir `workerCount` igual a um na configuração do conector. Para saber mais sobre os modos de capacidade do MSK Connect, consulte [Saiba mais sobre a capacidade de conectores](msk-connect-capacity.md).

# Pré-requisitos concluídos para usar o conector de origem Debezium
<a name="mkc-debeziumsource-connector-example-prereqs"></a>

Seu conector deve ser capaz de acessar a Internet para poder interagir com serviços como os AWS Secrets Manager que estão fora do seu Amazon Virtual Private Cloud. As etapas desta seção ajudam você a concluir as tarefas a seguir para habilitar o acesso à Internet.
+ Configure uma sub-rede pública que hospede um gateway NAT e roteie o tráfego para um gateway da Internet em sua VPC.
+ Crie uma rota padrão que direcione seu tráfego de sub-rede privada para seu gateway NAT.

Para obter mais informações, consulte [Habilitar o acesso à internet para o Amazon MSK Connect](msk-connect-internet-access.md).

**Pré-requisitos**

Antes de habilitar o acesso à Internet, você precisa dos seguintes itens:
+ O ID do Amazon Virtual Private Cloud (VPC) associado ao seu cluster. Por exemplo, *vpc-123456ab*.
+ A IDs das sub-redes privadas em sua VPC. Por exemplo, *subnet-a1b2c3de*, *subnet-f4g5h6ij* etc. Você deve configurar seu conector com sub-redes privadas.

**Para habilitar o acesso à Internet para seu conector**

1. Abra o Amazon Virtual Private Cloud console em [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Crie uma sub-rede pública para seu gateway NAT com um nome descritivo e anote o ID da sub-rede. Para obter instruções detalhadas, consulte [Criar uma sub-rede na VPC](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet).

1. Crie um gateway da Internet para que a VPC possa se comunicar com a Internet e anote o ID do gateway. Anexe o gateway da internet à sua VPC. Para obter mais instruções, consulte [Criar e anexar um gateway da Internet à VPC](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway).

1. Provisione um gateway NAT público para que os hosts em suas sub-redes privadas possam acessar sua sub-rede pública. Ao criar o gateway NAT, selecione a sub-rede pública que você criou anteriormente. Para obter instruções, consulte [Create a NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating) (Criar um gateway NAT)

1. Configure suas tabelas de rotas. Para concluir essa configuração, você deve ter duas tabelas de rotas no total. Você já deve ter uma tabela de rotas principal criada automaticamente junto com sua VPC. Nesta etapa, você cria uma tabela de rotas adicional para sua sub-rede pública.

   1. Use as configurações a seguir para modificar a tabela de rotas principal da sua VPC para que suas sub-redes privadas roteiem o tráfego para seu gateway NAT. Para obter instruções, consulte [Trabalhar com tabelas de rotas](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) no *Guia do usuário* do *Amazon Virtual Private Cloud*.  
**Tabela de rotas MSKC privado**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

   1. Siga as instruções em [Criar uma tabela de rotas personalizada](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing) para criar uma tabela de rotas para sua sub-rede pública. Ao criar a tabela, insira um nome descritivo no campo **Tag de nome** para ajudar você a identificar a qual sub-rede a tabela está associada. Por exemplo, **MSKC público**.

   1. Configure sua tabela de rotas **MSKC público** usando as configurações a seguir.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

Agora que habilitou o acesso à Internet para o Amazon MSK Connect, você está pronto para criar um conector.

# Criar um conector de origem Debezium
<a name="msk-connect-debeziumsource-connector-example-steps"></a>

Este procedimento descreve como criar um conector de origem Debezium.

1. 

**Criar um plug-in personalizado**

   1. Baixe o plug-in do conector MySQL para obter a versão estável mais recente no site do [Debezium](https://debezium.io/releases/). Anote a versão do Debezium que você baixou (versão 2.x ou a antiga série 1.x). Você criará um conector com base na sua versão do Debezium mais adiante neste procedimento.

   1. Baixe e extraia o [AWS Secrets Manager Config Provider](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws).

   1. Coloque os seguintes arquivos no mesmo diretório:
      + A pasta `debezium-connector-mysql`.
      + A pasta `jcusten-border-kafka-config-provider-aws-0.1.1`.

   1. Compacte em um arquivo ZIP o diretório que você criou na etapa anterior e, em seguida, carregue o arquivo ZIP em um bucket do S3. Para obter instruções, consulte [Upload de objetos](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) no *Guia do usuário do Amazon S3*.

   1. Copie e cole o JSON a seguir em um arquivo. Por exemplo, .`debezium-source-custom-plugin.json` *<example-custom-plugin-name>*Substitua pelo nome que você deseja que o plug-in tenha, *<amzn-s3-demo-bucket-arn>* pelo ARN do bucket do Amazon S3 em que você fez o upload do arquivo ZIP `<file-key-of-ZIP-object>` e pela chave de arquivo do objeto ZIP que você carregou no S3.

      ```
      {
          "name": "<example-custom-plugin-name>",
          "contentType": "ZIP",
          "location": {
              "s3Location": {
                  "bucketArn": "<amzn-s3-demo-bucket-arn>",
                  "fileKey": "<file-key-of-ZIP-object>"
              }
          }
      }
      ```

   1. Execute o AWS CLI comando a seguir na pasta em que você salvou o arquivo JSON para criar um plug-in.

      ```
      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>
      ```

      Você deve ver uma saída semelhante ao seguinte exemplo.

      ```
      {
          "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1",
          "CustomPluginState": "CREATING",
          "Name": "example-custom-plugin-name",
          "Revision": 1
      }
      ```

   1. Execute o comando a seguir para verificar o estado do plug-in. O estado do cluster deve mudar de `CREATING` para `ACTIVE`. Substitua o espaço reservado de ARN pelo ARN que você obteve na saída do comando anterior.

      ```
      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
      ```

1. 

**Configure AWS Secrets Manager e crie um segredo para suas credenciais de banco de dados**

   1. Abra o console do Secrets Manager em [https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/).

   1. Crie um novo segredo para armazenar as credenciais de login do banco de dados. Para obter instruções, consulte [Criar um segredo](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_create-basic-secret.html) no Guia do usuário do *AWS Secrets Manager*.

   1. Copie o ARN do seu segredo.

   1. Adicione as permissões do Secrets Manager do exemplo de política a seguir ao seu [Saiba mais sobre o perfil de execução do serviço](msk-connect-service-execution-role.md). *<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>*Substitua pelo ARN do seu segredo.

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "secretsmanager:GetResourcePolicy",
              "secretsmanager:GetSecretValue",
              "secretsmanager:DescribeSecret",
              "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
            "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
            ]
          }
        ]
      }
      ```

------

      Para obter instruções sobre como adicionar permissões do IAM, consulte [Adicionar e remover permissões de identidade do IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) no *Guia do usuário do IAM*.

1. 

**Criar uma configuração personalizada de operador com informações sobre seu provedor de configuração**

   1. Copie as seguintes propriedades de configuração do operador em um arquivo, substituindo as strings de espaço reservado por valores que correspondam ao seu cenário. Para saber mais sobre as propriedades de configuração do AWS Secrets Manager Config Provider, consulte a [SecretsManagerConfigProvider](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-config-provider-aws/configProviders/SecretsManagerConfigProvider.html)documentação do plug-in.

      ```
      key.converter=<org.apache.kafka.connect.storage.StringConverter>
      value.converter=<org.apache.kafka.connect.storage.StringConverter>
      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=<us-east-1>
      ```

   1. Execute o AWS CLI comando a seguir para criar sua configuração de trabalhador personalizada. 

      Substitua os valores a seguir:
      + *<my-worker-config-name>*- um nome descritivo para sua configuração de trabalhador personalizada
      + *<encoded-properties-file-content-string>*- uma versão codificada em base64 das propriedades de texto simples que você copiou na etapa anterior

      ```
      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
      ```

1. 

**Criar um conector**

   1. Copie o seguinte JSON que corresponde à sua versão do Debezium (2.x ou 1.x) e cole-o em um novo arquivo. Substitua as strings `<placeholder>` por valores que correspondam ao seu cenário. Para obter mais informações sobre como configurar um perfil de execução de serviços, consulte [Perfis e políticas do IAM para o MSK Connect](msk-connect-iam.md).

      Para especificar as credenciais do banco de dados, a configuração usa variáveis como `${secretManager:MySecret-1234:dbusername}` em vez de texto simples. Substitua `MySecret-1234` pelo nome do seu segredo e inclua o nome da chave que você deseja recuperar. Você também deve substituir `<arn-of-config-provider-worker-configuration>` pelo ARN da sua configuração personalizada de operador.

------
#### [ Debezium 2.x ]

      Para as versões 2.x do Debezium, copie o seguinte JSON e cole-o em um novo arquivo. Substitua as strings *<placeholder>* por valores que correspondam ao seu cenário.

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"topic.prefix": "<logical-name-of-database-server>",
      		"schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"schema.history.internal.consumer.security.protocol": "SASL_SSL",
      		"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"schema.history.internal.producer.security.protocol": "SASL_SSL",
      		"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------
#### [ Debezium 1.x ]

      Para as versões 1.x do Debezium, copie o seguinte JSON e cole-o em um novo arquivo. Substitua as strings *<placeholder>* por valores que correspondam ao seu cenário.

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.server.name": "<logical-name-of-database-server>",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"database.history.consumer.security.protocol": "SASL_SSL",
      		"database.history.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"database.history.producer.security.protocol": "SASL_SSL",
      		"database.history.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------

   1. Execute o AWS CLI comando a seguir na pasta em que você salvou o arquivo JSON na etapa anterior.

      ```
      aws kafkaconnect create-connector --cli-input-json file://connector-info.json
      ```

      Veja a seguir um exemplo da saída que você vai obter ao executar o comando com êxito.

      ```
      {
          "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
          "ConnectorState": "CREATING", 
          "ConnectorName": "example-Debezium-source-connector"
      }
      ```

# Atualizar a configuração do conector Debezium
<a name="mkc-debeziumsource-connector-update"></a>

Para atualizar a configuração do conector Debezium, siga estas etapas: 

1. Copie e cole o JSON a seguir em um novo arquivo. Substitua as strings `<placeholder>` por valores que correspondam ao seu cenário.

   ```
   {
      "connectorArn": <connector_arn>,
      "connectorConfiguration": <new_configuration_in_json>,
      "currentVersion": <current_version>
   }
   ```

1. Execute o AWS CLI comando a seguir na pasta em que você salvou o arquivo JSON na etapa anterior.

   ```
   aws kafkaconnect update-connector --cli-input-json file://connector-info.json
   ```

   Veja a seguir um exemplo da saída que você vai obter ao executar o comando com êxito.

   ```
   {
       "connectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2",
       "connectorOperationArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector-operation/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2/41b6ad56-3184-479b-850a-a8bedd5a02f3",
       "connectorState": "UPDATING"
   }
   ```

1. Agora você pode executar o comando a seguir para monitorar o estado atual da operação:

   ```
   aws kafkaconnect describe-connector-operation --connector-operation-arn <operation_arn>
   ```

Para ver um exemplo de conector Debezium com etapas detalhadas, consulte [Introdução ao Amazon MSK Connect: transmita dados de e para seus clusters do Apache Kafka usando conectores gerenciados](https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/).

# Migrar para o Amazon MSK Connect
<a name="msk-connect-migrating"></a>

Esta seção descreve como migrar a aplicação de conector Apache Kafka para o Amazon Managed Streaming para Apache Kafka Connect (Amazon MSK Connect). Para saber mais sobre os benefícios de migrar para o Amazon MSK Connect, consulte [Benefícios de usar o Amazon MSK Connect](msk-connect.md#msk-connect-benefits).

Esta seção também descreve os tópicos de gerenciamento de estado usados pelo Kafka Connect e pelo Amazon MSK Connect e aborda os procedimentos para migrar conectores de origem e de coletor.

# Saiba mais sobre os tópicos internos usados pelo Kafka Connect
<a name="msk-connect-kafka-connect-topics"></a>

Uma aplicação Apache Kafka Connect que está sendo executada no modo distribuído armazena seu estado usando tópicos internos no cluster do Kafka e na associação ao grupo. A seguir estão os valores de configuração que correspondem aos tópicos internos usados nas aplicações do Kafka Connect:
+ Tópico de configuração, especificado por meio de `config.storage.topic`

  No tópico de configuração, o Kafka Connect armazena a configuração de todos os conectores e tarefas que foram iniciados pelos usuários. Sempre que os usuários atualizam a configuração de um conector ou quando um conector solicita uma reconfiguração (por exemplo, o conector detecta que pode iniciar mais tarefas), um registro é emitido para esse tópico. Esse tópico tem compactação habilitada, portanto, ele sempre mantém o último estado de cada entidade.
+ Tópico de deslocamentos, especificado por meio de `offset.storage.topic`

  No tópico de deslocamentos, o Kafka Connect armazena os deslocamentos dos conectores de origem. Assim como o tópico de configuração, o tópico de deslocamentos está habilitado para compactação. Esse tópico é usado para gravar as posições de origem somente para conectores de origem que produzem dados para o Kafka de sistemas externos. Os conectores de coletor, que leem dados do Kafka e os enviam para sistemas externos, armazenam os deslocamentos de consumo usando grupos regulares de consumidores do Kafka.
+ Tópico de status, especificado por meio de `status.storage.topic`

  No tópico de status, o Kafka Connect armazena o estado atual dos conectores e das tarefas. Esse tópico é usado como o local central para os dados que são consultados pelos usuários da API REST. Esse tópico permite que os usuários consultem qualquer operador e ainda obtenham o status de todos os plug-ins em execução. Assim como os tópicos de configuração e deslocamentos, o tópico de status também está habilitado para compactação.

Além desses tópicos, o Kafka Connect faz uso extensivo da API de associação a grupos do Kafka. Os grupos são recebem o nome de acordo com o nome do conector. Por exemplo, para um conector chamado file-sink, o grupo é nomeado. connect-file-sink Cada consumidor do grupo fornece registros para uma única tarefa. Esses grupos e seus deslocamentos podem ser recuperados usando ferramentas regulares de grupos de consumidores, como `Kafka-consumer-group.sh`. Para cada conector de coletor, o runtime do Connect executa um grupo regular de consumidores que extrai registros do Kafka.

# Gerenciamento de estados das aplicações do Amazon MSK Connect
<a name="msk-connect-state-management"></a>

Por padrão, o Amazon MSK Connect cria três tópicos separados no cluster do Kafka para cada conector do Amazon MSK para armazenar a configuração, o deslocamento e o status do conector. Os nomes de tópicos padrão são estruturados da seguinte maneira:
+ *connector-name*\$1\$1msk\$1connect\$1configs\$1 \$1 *connector-id*
+ *connector-name*\$1\$1msk\$1connect\$1status\$1 \$1 *connector-id*
+ *connector-name*\$1\$1msk\$1connect\$1offsets\$1 \$1 *connector-id*

**nota**  
Para fornecer continuidade de deslocamento entre conectores de origem, você pode usar um tópico de deslocamento de armazenamento de sua escolha em vez do tópico padrão. Especificar um tópico de deslocamento de armazenamento ajuda você a realizar tarefas como criar um conector de origem que retoma a leitura desde o último deslocamento de um conector anterior. Para especificar um tópico de deslocamento de armazenamento, forneça um valor para a propriedade [https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets) em sua configuração de operador do Amazon MSK Connect antes de criar um conector.

# Migre conectores de origem para o Amazon MSK Connect
<a name="msk-connect-migrate-source-connectors"></a>

Os conectores de origem são aplicações do Apache Kafka Connect que importam registros de sistemas externos para o Kafka. Esta seção descreve o processo de migração de aplicativos de conectores de origem do Apache Kafka Connect que estão sendo executados localmente ou clusters autogerenciados do Kafka Connect que estão sendo executados no Amazon MSK Connect. AWS 

A aplicação do conector de origem do Kafka Connect armazena deslocamentos em um tópico nomeado com o valor definido para a propriedade de configuração `offset.storage.topic`. A seguir estão exemplos de mensagens de deslocamento para um conector JDBC que está executando duas tarefas que importam dados de duas tabelas diferentes denominadas `movies` e `shows`. A linha mais recente importada da tabela de filmes tem um ID primário de `18343`. A linha mais recente importada da tabela de shows tem um ID primário de `732`.

```
["jdbcsource",{"protocol":"1","table":"sample.movies"}] {"incrementing":18343}
["jdbcsource",{"protocol":"1","table":"sample.shows"}] {"incrementing":732}
```

Para migrar conectores de origem para o Amazon MSK Connect, faça o seguinte:

1. Crie um [plug-in personalizado](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html) do Amazon MSK Connect extraindo bibliotecas de conectores do seu cluster do Kafka Connect on-premises ou autogerenciado.

1. Crie [propriedades de operador](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config) do Amazon MSK Connect e defina as propriedades `key.converter`, `value.converter` e `offset.storage.topic` com os mesmos valores estipulados para o conector do Kafka que está sendo executado em seu cluster atual do Kafka Connect.

1. Pause a aplicação do conector no cluster existente fazendo uma solicitação `PUT /connectors/connector-name/pause` no cluster existente do Kafka Connect.

1. Certifique-se de que todas as tarefas da aplicação do conector estejam completamente interrompidas. Você pode interromper as tarefas fazendo uma solicitação `GET /connectors/connector-name/status` no cluster existente do Kafka Connect ou consumindo as mensagens do nome do tópico definido para a propriedade `status.storage.topic`.

1. Obtenha a configuração do conector do cluster existente. Você pode obter a configuração do conector fazendo uma solicitação `GET /connectors/connector-name/config/` no cluster existente ou consumindo as mensagens do nome do tópico definido para a propriedade `config.storage.topic`.

1. Crie um [Amazon MSK Connector](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html) com o mesmo nome de um cluster existente. Crie esse conector usando o plug-in personalizado do conector que você criou na etapa 1, as propriedades do operador que você criou na etapa 2 e a configuração do conector que você extraiu na etapa 5.

1. Quando o status do Amazon MSK Connector estiver `active`, visualize os logs para verificar se o conector começou a importar dados do sistema de origem.

1. Exclua o conector no cluster existente fazendo uma solicitação `DELETE /connectors/connector-name`.

# Migrar conectores de coletor para o Amazon MSK Connect
<a name="msk-connect-migrate-sink-connectors"></a>

Os conectores de coletor são aplicações do Apache Kafka Connect que importam registros de sistemas externos para o Kafka. Esta seção descreve o processo de migração de aplicativos conectores de coletor do Apache Kafka Connect que estão sendo executados localmente ou clusters autogerenciados do Kafka Connect que estão sendo executados no Amazon MSK Connect. AWS 

Os conectores de coletor do Kafka Connect usam a API de associação de grupos do Kafka e armazenam deslocamentos nos mesmos tópicos `__consumer_offset` de uma aplicação de consumo típico. Esse comportamento simplifica a migração do conector de coletor de um cluster autogerenciado para o Amazon MSK Connect.

Para migrar conectores de coletor para o Amazon MSK Connect, faça o seguinte:

1. Crie um [plug-in personalizado](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html) do Amazon MSK Connect extraindo bibliotecas de conectores do seu cluster do Kafka Connect on-premises ou autogerenciado.

1. Crie [propriedades de operador](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config) do Amazon MSK Connect e defina as propriedades `key.converter` e `value.converter` com os mesmos valores definidos para o conector do Kafka que está sendo executado no cluster existente do Kafka Connect.

1. Pause a aplicação do conector no cluster existente fazendo uma solicitação `PUT /connectors/connector-name/pause` no cluster existente do Kafka Connect.

1. Certifique-se de que todas as tarefas da aplicação do conector estejam completamente interrompidas. Você pode interromper as tarefas fazendo uma solicitação `GET /connectors/connector-name/status` no cluster existente do Kafka Connect ou consumindo as mensagens do nome do tópico definido para a propriedade `status.storage.topic`.

1. Obtenha a configuração do conector do cluster existente. Você pode obter a configuração do conector fazendo uma solicitação `GET /connectors/connector-name/config` no cluster existente ou consumindo as mensagens do nome do tópico definido para a propriedade `config.storage.topic`.

1. Crie um [Amazon MSK Connector](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html) com o mesmo nome do cluster existente. Crie esse conector usando o plug-in personalizado do conector que você criou na etapa 1, as propriedades do operador que você criou na etapa 2 e a configuração do conector que você extraiu na etapa 5.

1. Quando o status do Amazon MSK Connector estiver `active`, visualize os logs para verificar se o conector começou a importar dados do sistema de origem.

1. Exclua o conector no cluster existente fazendo uma solicitação `DELETE /connectors/connector-name`.

# Solução de problemas no Amazon MSK Connect
<a name="msk-connect-troubleshooting"></a>

As informações a seguir podem ajudar você a solucionar problemas que você pode vir a enfrentar com MSK Connect. Você também pode publicar seu problema no [AWS re:Post](https://repost.aws/).

**O conector não consegue acessar recursos hospedados na Internet pública**  
Consulte [Como habilitar o acesso à Internet para o Amazon MSK Connect](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access.html).

**O número de tarefas em execução do conector não é igual ao número de tarefas especificado em tasks.max**  
Aqui estão alguns motivos pelos quais um conector pode usar menos tarefas do que o valor especificado na configuração tasks.max:
+ Algumas implementações de conectores limitam o número de tarefas que podem ser usadas. Por exemplo, o conector Debezium para MySQL está limitado ao uso de uma única tarefa.
+ Ao usar o modo de capacidade com escalabilidade automática, o Amazon MSK Connect substitui a propriedade tasks.max de um conector por um valor proporcional ao número de trabalhadores em execução no conector e ao número de por trabalhador. MCUs Se você tiver configurado o `maxAutoscalingTaskCount` parâmetro opcional, o `tasks.max` valor não excederá esse limite. Para obter mais informações, consulte [Compreender a contagem máxima de tarefas de escalonamento automático](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#msk-connect-max-autoscaling-task-count).
+ Para conectores de coletor, o nível de paralelismo (número de tarefas) não pode ser maior que o número de partições de tópicos. Embora você possa definir tasks.max com um valor maior que esse, uma única partição nunca é processada por mais de uma única tarefa por vez.
+ No Kafka Connect 2.7.x, o atribuidor de partição de consumidor padrão é `RangeAssignor`. O comportamento desse atribuidor é fornecer a primeira partição de cada tópico a um único consumidor, a segunda partição de cada tópico a um único consumidor etc. Isso significa que o número máximo de tarefas ativas usadas por um conector de coletor com `RangeAssignor` é igual ao número máximo de partições em qualquer tópico que esteja sendo consumido. Se isso não funcionar para seu caso de uso, você deve [criar uma configuração de agente](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config) na qual a propriedade `consumer.partition.assignment.strategy` seja definida como um atribuidor de partição de consumidor mais adequado. Consulte [Interface do Kafka 2.7 ConsumerPartitionAssignor: *todas as classes de implementação conhecidas*](https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html).