Conector de origem Debezium com provedor de configuração - Amazon Managed Streaming for Apache Kafka

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Conector de origem Debezium com provedor de configuração

Este exemplo mostra como usar o plug-in do conector Debezium para MySQL com um banco de dados Amazon Aurora compatível com MySQL como origem. Neste exemplo, também configuramos o AWS Secrets Manager Config Provider de código aberto para externalizar as credenciais do banco de dados no AWS Secrets Manager. Para saber mais sobre os provedores de configuração, consulte Externalizando informações confidenciais usando provedores de configuração.

Importante

O plug-in do conector Debezium para MySQL é compatível com apenas uma tarefa e não funciona com o modo de capacidade de escalabilidade automática para o Amazon MSK Connect. Em vez disso, você deve usar o modo de capacidade provisionada e definir workerCount igual a um na configuração do conector. Para saber mais sobre os modos de capacidade do MSK Connect, consulte Capacidade do conector.

Antes de começar

Seu conector deve ser capaz de acessar a Internet para poder interagir com serviços como os AWS Secrets Manager que estão fora do seu Amazon Virtual Private Cloud. As etapas desta seção ajudam você a concluir as tarefas a seguir para habilitar o acesso à Internet.

  • Configure uma sub-rede pública que hospede um gateway NAT e roteie o tráfego para um gateway da Internet em sua VPC.

  • Crie uma rota padrão que direcione seu tráfego de sub-rede privada para seu gateway NAT.

Para ter mais informações, consulte Como habilitar o acesso à Internet para o Amazon MSK Connect.

Pré-requisitos

Antes de habilitar o acesso à Internet, você precisa dos seguintes itens:

  • O ID do Amazon Virtual Private Cloud (VPC) associado ao seu cluster. Por exemplo, vpc-123456ab.

  • Os IDs das sub-redes privadas em sua VPC. Por exemplo, subnet-a1b2c3de, subnet-f4g5h6ij etc. Você deve configurar seu conector com sub-redes privadas.

Para habilitar o acesso à Internet para seu conector
  1. Abra o Amazon Virtual Private Cloud console em https://console.aws.amazon.com/vpc/.

  2. Crie uma sub-rede pública para seu gateway NAT com um nome descritivo e anote o ID da sub-rede. Para obter instruções detalhadas, consulte Criar uma sub-rede na VPC.

  3. Crie um gateway da Internet para que a VPC possa se comunicar com a Internet e anote o ID do gateway. Anexe o gateway da internet à sua VPC. Para obter mais instruções, consulte Criar e anexar um gateway da Internet à VPC.

  4. Provisione um gateway NAT público para que os hosts em suas sub-redes privadas possam acessar sua sub-rede pública. Ao criar o gateway NAT, selecione a sub-rede pública que você criou anteriormente. Para obter instruções, consulte Create a NAT gateway (Criar um gateway NAT)

  5. Configure suas tabelas de rotas. Para concluir essa configuração, você deve ter duas tabelas de rotas no total. Você já deve ter uma tabela de rotas principal criada automaticamente junto com sua VPC. Nesta etapa, você cria uma tabela de rotas adicional para sua sub-rede pública.

    1. Use as configurações a seguir para modificar a tabela de rotas principal da sua VPC para que suas sub-redes privadas roteiem o tráfego para seu gateway NAT. Para obter instruções, consulte Trabalhar com tabelas de rotas no Guia do usuário do Amazon Virtual Private Cloud.

      Tabela de rotas MSKC privado
      Propriedade Valor
      Name tag Recomendamos que você atribua uma tag de nome descritivo a essa tabela de rotas para ajudar na identificação dela. Por exemplo, MSKC privado.
      Sub-redes associadas Suas sub-redes privadas
      Uma rota para habilitar o acesso à Internet para o MSK Connect
      • Destino: 0.0.0.0/0

      • Alvo: o ID do seu gateway NAT Por exemplo, nat-12a345bc6789efg1h.

      Uma rota local para o tráfego interno
      • Destino: 10.0.0.0/16 Esse valor pode ser diferente dependendo do bloco CIDR da sua VPC.

      • Alvo: local

    2. Siga as instruções em Criar uma tabela de rotas personalizada para criar uma tabela de rotas para sua sub-rede pública. Ao criar a tabela, insira um nome descritivo no campo Tag de nome para ajudar você a identificar a qual sub-rede a tabela está associada. Por exemplo, MSKC público.

    3. Configure sua tabela de rotas MSKC público usando as configurações a seguir.

      Propriedade Valor
      Name tag MSKC público ou um nome descritivo diferente que você escolher
      Sub-redes associadas Sua sub-rede pública com gateway NAT
      Uma rota para habilitar o acesso à Internet para o MSK Connect
      • Destino: 0.0.0.0/0

      • Alvo: o ID do seu gateway da Internet Por exemplo, igw-1a234bc5.

      Uma rota local para o tráfego interno
      • Destino: 10.0.0.0/16 Esse valor pode ser diferente dependendo do bloco CIDR da sua VPC.

      • Alvo: local

Agora que habilitou o acesso à Internet para o Amazon MSK Connect, você está pronto para criar um conector.

Como criar um conector de origem do Debezium

  1. Criar um plug-in personalizado
    1. Baixe o plug-in do conector MySQL para obter a versão estável mais recente no site do Debezium. Anote a versão do Debezium que você baixou (versão 2.x ou a antiga série 1.x). Você criará um conector com base na sua versão do Debezium mais adiante neste procedimento.

    2. Baixe e extraia o AWS Secrets Manager Config Provider.

    3. Coloque os seguintes arquivos no mesmo diretório:

      • A pasta debezium-connector-mysql.

      • A pasta jcusten-border-kafka-config-provider-aws-0.1.1.

    4. Compacte em um arquivo ZIP o diretório que você criou na etapa anterior e, em seguida, carregue o arquivo ZIP em um bucket do S3. Para obter instruções, consulte Upload de objetos no Guia do usuário do Amazon S3.

    5. Copie e cole o JSON a seguir em um arquivo. Por exemplo, debezium-source-custom-plugin.json. Substitua < example-custom-plugin-name > pelo nome que você deseja que o plug-in tenha, < arn-of-your-s 3-bucket> pelo ARN do bucket do S3 em que você fez o upload do arquivo ZIP e <file-key-of-ZIP-object> pela chave de arquivo do objeto ZIP que você carregou no S3.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Execute o AWS CLI comando a seguir na pasta em que você salvou o arquivo JSON para criar um plug-in.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Você deve ver uma saída semelhante ao seguinte exemplo.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Execute o comando a seguir para verificar o estado do plug-in. O estado do cluster deve mudar de CREATING para ACTIVE. Substitua o espaço reservado de ARN pelo ARN que você obteve na saída do comando anterior.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Configure AWS Secrets Manager e crie um segredo para suas credenciais de banco de dados
    1. Abra o console do Secrets Manager em https://console.aws.amazon.com/secretsmanager/.

    2. Crie um novo segredo para armazenar as credenciais de login do banco de dados. Para obter instruções, consulte Criar um segredo no Guia do usuário do AWS Secrets Manager.

    3. Copie o ARN do seu segredo.

    4. Adicione as permissões do Secrets Manager do exemplo de política a seguir ao seu Perfil de execução do serviço. Substitua <arn:aws:secretsmanager:us-east- 1:123456789000:secret: -1234> pelo ARN do seu segredo. MySecret

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Para obter instruções sobre como adicionar permissões do IAM, consulte Adicionar e remover permissões de identidade do IAM no Guia do usuário do IAM.

  3. Criar uma configuração personalizada de operador com informações sobre seu provedor de configuração
    1. Copie as seguintes propriedades de configuração do operador em um arquivo, substituindo as strings de espaço reservado por valores que correspondam ao seu cenário. Para saber mais sobre as propriedades de configuração do AWS Secrets Manager Config Provider, consulte a SecretsManagerConfigProviderdocumentação do plug-in.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Execute o AWS CLI comando a seguir para criar sua configuração de trabalhador personalizada.

      Substitua os valores a seguir:

      • < my-worker-config-name > - um nome descritivo para sua configuração de trabalhador personalizada

      • < encoded-properties-file-content -string> - uma versão codificada em base64 das propriedades de texto simples que você copiou na etapa anterior

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Criar um conector
    1. Copie o seguinte JSON que corresponde à sua versão do Debezium (2.x ou 1.x) e cole-o em um novo arquivo. Substitua as strings <placeholder> por valores que correspondam ao seu cenário. Para obter mais informações sobre como configurar um perfil de execução de serviços, consulte Perfis e políticas do IAM para o MSK Connect.

      Para especificar as credenciais do banco de dados, a configuração usa variáveis como ${secretManager:MySecret-1234:dbusername} em vez de texto simples. Substitua MySecret-1234 pelo nome do seu segredo e inclua o nome da chave que você deseja recuperar. Você também deve substituir <arn-of-config-provider-worker-configuration> pelo ARN da sua configuração personalizada de operador.

      Debezium 2.x

      Para as versões 2.x do Debezium, copie o seguinte JSON e cole-o em um novo arquivo. Substitua as strings <placeholder> por valores que correspondam ao seu cenário.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Para as versões 1.x do Debezium, copie o seguinte JSON e cole-o em um novo arquivo. Substitua as strings <placeholder> por valores que correspondam ao seu cenário.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Execute o AWS CLI comando a seguir na pasta em que você salvou o arquivo JSON na etapa anterior.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Veja a seguir um exemplo da saída que você vai obter ao executar o comando com êxito.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Para ver um exemplo de conector Debezium com etapas detalhadas, consulte Introdução ao Amazon MSK Connect: transmita dados de e para seus clusters do Apache Kafka usando conectores gerenciados.