Connecteur source Debezium avec fournisseur de configuration - Amazon Managed Streaming for Apache Kafka

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Connecteur source Debezium avec fournisseur de configuration

Cet exemple montre comment utiliser le plugin Debezium My SQL connector avec une base de données Amazon SQL Aurora compatible My comme source. Dans cet exemple, nous avons également configuré le fournisseur de configuration AWS Secrets Manager open source pour externaliser les informations d'identification de la base de données dans AWS Secrets Manager. Pour en savoir plus sur les fournisseurs de configuration, consultez Externalisation d'informations sensibles à l'aide des fournisseurs de configuration.

Important

Le plugin Debezium My SQL connector ne prend en charge qu'une seule tâche et ne fonctionne pas avec le mode de capacité autoscalé pour Amazon Connect. MSK Vous devez plutôt utiliser le mode de capacité provisionné et définir la valeur de workerCount égale à un dans la configuration de votre connecteur. Pour en savoir plus sur les modes de capacité de MSK Connect, consultezCapacité du connecteur.

Avant de commencer

Votre connecteur doit être en mesure d'accéder à Internet afin de pouvoir interagir avec des services tels AWS Secrets Manager que ceux situés en dehors du vôtre Amazon Virtual Private Cloud. Les étapes décrites dans cette section vous aident à effectuer les tâches suivantes pour activer l'accès à Internet.

  • Configurez un sous-réseau public qui héberge une NAT passerelle et achemine le trafic vers une passerelle Internet de votreVPC.

  • Créez un itinéraire par défaut qui dirige le trafic de votre sous-réseau privé vers votre NAT passerelle.

Pour plus d’informations, consultez Activation de l'accès à Internet pour Amazon MSK Connect.

Prérequis

Avant de pouvoir activer l'accès à Internet, vous devez disposer des éléments suivants :

  • L'ID du Amazon Virtual Private Cloud (VPC) associé à votre cluster. Par exemple, vpc-123456ab.

  • Les IDs sous-réseaux privés de votreVPC. Par exemple, subnet-a1b2c3de, subnet-f4g5h6ij, etc. Vous devez configurer votre connecteur avec des sous-réseaux privés.

Pour activer l'accès à Internet pour votre connecteur
  1. Ouvrez la Amazon Virtual Private Cloud console à l'adresse https://console.aws.amazon.com/vpc/.

  2. Créez un sous-réseau public pour votre NAT passerelle avec un nom descriptif et notez l'ID du sous-réseau. Pour obtenir des instructions détaillées, consultez la section Créer un sous-réseau dans votre VPC.

  3. Créez une passerelle Internet afin de VPC pouvoir communiquer avec Internet et notez l'ID de la passerelle. Connectez la passerelle Internet à votreVPC. Pour de plus amples informations, consultez Créer et attacher une passerelle Internet.

  4. Mettez en place une NAT passerelle publique afin que les hôtes de vos sous-réseaux privés puissent accéder à votre sous-réseau public. Lorsque vous créez la NAT passerelle, sélectionnez le sous-réseau public que vous avez créé précédemment. Pour obtenir des instructions, reportez-vous à la section Création d'une NAT passerelle.

  5. Configurez vos tables de routage. Vous devez disposer de deux tables de routage au total pour terminer cette configuration. Vous devriez déjà avoir une table de routage principale créée automatiquement en même temps que votreVPC. Au cours de cette étape, vous créez une table de routage supplémentaire pour votre sous-réseau public.

    1. Utilisez les paramètres suivants pour modifier votre table VPC de routage principale afin que vos sous-réseaux privés acheminent le trafic vers votre NAT passerelle. Pour obtenir des informations, consultez la section Utiliser des tables de routage dans le Guide de l'utilisateur Amazon Virtual Private Cloud.

      Table de MSKC routage privée
      Propriété Valeur
      Identification de nom Nous vous recommandons de donner à cette table de routage un nom descriptif pour vous aider à l'identifier. Par exemple, Privé MSKC.
      Sous-réseaux associés Vos sous-réseaux privés
      Un itinéraire pour permettre l'accès à Internet pour MSK Connect
      • Destination : 0.0.0.0/0

      • Cible : votre identifiant de NAT passerelle. Par exemple, nat-12a345bc6789efg1h.

      Une route pour l'ensemble du trafic local
      • Destination : 10.0.0.0/16. Cette valeur peut varier en fonction VPC de votre CIDR bloc.

      • Cible : locale

    2. Suivez les instructions de la section Créer une table de routage personnalisée pour créer une table de routage pour votre sous-réseau public. Lorsque vous créez la table, entrez un nom descriptif dans le champ Identification de nom pour vous aider à identifier le sous-réseau auquel la table est associée. Par exemple, Public MSKC.

    3. Configurez votre table de MSKC routage publique à l'aide des paramètres suivants.

      Propriété Valeur
      Identification de nom Nom public MSKC ou descriptif différent que vous choisissez
      Sous-réseaux associés Votre sous-réseau public avec passerelle NAT
      Un itinéraire pour permettre l'accès à Internet pour MSK Connect
      • Destination : 0.0.0.0/0

      • Cible : votre identifiant de passerelle Internet. Par exemple, igw-1a234bc5.

      Une route pour l'ensemble du trafic local
      • Destination : 10.0.0.0/16. Cette valeur peut varier en fonction VPC de votre CIDR bloc.

      • Cible : locale

Maintenant que vous avez activé l'accès à Internet pour Amazon MSK Connect, vous êtes prêt à créer un connecteur.

Création d'un connecteur source Debezium

  1. Créez un plugin personnalisé
    1. Téléchargez le plugin My SQL connector pour la dernière version stable sur le site de Debezium. Notez la version de Debezium que vous téléchargez (version 2.x ou ancienne série 1.x). Vous allez créer ultérieurement un connecteur basé sur votre version de Debezium.

    2. Téléchargez et extrayez le fournisseur de configuration AWS Secrets Manager.

    3. Placez les archives suivantes dans le même répertoire :

      • Le dossier debezium-connector-mysql

      • Le dossier jcusten-border-kafka-config-provider-aws-0.1.1

    4. Compressez le répertoire que vous avez créé à l'étape précédente dans un ZIP fichier, puis téléchargez le ZIP fichier dans un compartiment S3. Pour obtenir des informations, consultez Chargement d'objets dans le Guide de l'utilisateur Amazon S3.

    5. Copiez ce qui suit JSON et collez-le dans un fichier. Par exemple, debezium-source-custom-plugin.json. Remplacez <example-custom-plugin-name> avec le nom que vous souhaitez donner au plugin, <arn-of-your-s3-bucket> avec le ARN compartiment S3 dans lequel vous avez chargé le ZIP fichier et <file-key-of-ZIP-object> avec la clé de fichier de l'ZIPobjet que vous avez chargé sur S3.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Exécutez la AWS CLI commande suivante depuis le dossier dans lequel vous avez enregistré le JSON fichier pour créer un plugin.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Vous devez voir un résultat similaire à ce qui suit.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Exécutez la commande suivante pour vérifier le statut du plugin. Le statut doit passer de CREATING à ACTIVE. Remplacez l'ARNespace réservé par celui ARN que vous avez obtenu dans le résultat de la commande précédente.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Configurer AWS Secrets Manager et créer un secret pour les informations d'identification de votre base de données
    1. Ouvrez la console Secrets Manager à l'adresse https://console.aws.amazon.com/secretsmanager/.

    2. Créez un nouveau secret pour stocker les informations d'identification de connexion à votre base de données. Pour obtenir des informations, consultez la section Créer un secret dans le Guide de l'utilisateur AWS Secrets Manager.

    3. Copiez votre secretARN.

    4. Ajoutez les autorisations Secrets Manager de l'exemple de politique suivant à votre Rôles d'exécution du service. Remplacez <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> avec la ARN révélation de ton secret.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Pour savoir comment ajouter des IAM autorisations, consultez la section Ajouter et supprimer des autorisations IAM d'identité dans le Guide de IAM l'utilisateur.

  3. Créez une configuration de worker personnalisée avec des informations sur votre fournisseur de configuration
    1. Copiez les propriétés de configuration du worker suivantes dans un fichier, en remplaçant les chaînes d'espace réservé par des valeurs correspondant à votre scénario. Pour en savoir plus sur les propriétés de configuration du fournisseur de configuration AWS Secrets Manager, consultez SecretsManagerConfigProviderla documentation du plugin.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Exécutez la AWS CLI commande suivante pour créer votre configuration de travail personnalisée.

      Remplacez les valeurs suivantes :

      • <my-worker-config-name> - un nom descriptif pour votre configuration de travail personnalisée

      • <encoded-properties-file-content-string> - une version codée en base64 des propriétés en texte brut que vous avez copiées à l'étape précédente

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Créez un connecteur
    1. Copiez JSON ce qui suit correspond à votre version de Debezium (2.x ou 1.x) et collez-le dans un nouveau fichier. Remplacez les chaînes <placeholder> par des valeurs correspondant à votre scénario. Pour plus d'informations sur la configuration d'un rôle d'exécution de service, consultez Rôles et politiques IAM pour MSK Connect.

      Notez que la configuration utilise des variables comme ${secretManager:MySecret-1234:dbusername} plutôt que du texte brut pour spécifier les informations d'identification de la base de données. Remplacez MySecret-1234 par le nom de votre secret, puis indiquez le nom de la clé que vous souhaitez récupérer. Vous devez également remplacer <arn-of-config-provider-worker-configuration> par la configuration ARN de votre programme de travail personnalisé.

      Debezium 2.x

      Pour les versions 2.x de Debezium, copiez ce qui suit JSON et collez-le dans un nouveau fichier. Remplacez le <placeholder> des chaînes dont les valeurs correspondent à votre scénario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Pour les versions 1.x de Debezium, copiez ce qui suit JSON et collez-le dans un nouveau fichier. Remplacez le <placeholder> des chaînes dont les valeurs correspondent à votre scénario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Exécutez la AWS CLI commande suivante dans le dossier où vous avez enregistré le JSON fichier à l'étape précédente.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Voici un exemple du résultat que vous obtenez lorsque vous exécutez la commande.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Pour un exemple de connecteur Debezium avec des étapes détaillées, consultez Présentation d'Amazon MSK Connect - Diffusez des données vers et depuis vos clusters Apache Kafka à l'aide de connecteurs gérés.