Debezium-Quell-Konnektor mit Konfigurationsanbieter - Amazon Managed Streaming für Apache Kafka

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Debezium-Quell-Konnektor mit Konfigurationsanbieter

Dieses Beispiel zeigt, wie das Debezium My SQL Connector-Plugin mit einer My SQL -kompatiblen Amazon Aurora Aurora-Datenbank als Quelle verwendet wird. In diesem Beispiel haben wir auch den Open-Source AWS Secrets Manager Config Provider für die Externalisierung von Datenbank-Anmeldeinformationen in AWS Secrets Manager eingerichtet. Weitere Informationen zu Konfigurationsanbietern finden Sie unter Externalisierung vertraulicher Informationen mithilfe von Konfigurationsanbietern.

Wichtig

Das Debezium My SQL Connector-Plugin unterstützt nur eine Aufgabe und funktioniert nicht mit dem automatisch skalierten Kapazitätsmodus für Amazon Connect. MSK Sie sollten stattdessen den Modus Bereitgestellte Kapazität verwenden und in der Konnektor-Konfiguration den Wert workerCount auf Eins festlegen. Weitere Informationen zu den Kapazitätsmodi für MSK Connect finden Sie unterKapazität des Konnektors.

Bevor Sie beginnen

Ihr Connector muss auf das Internet zugreifen können, damit er mit Diensten interagieren kann AWS Secrets Manager , die sich beispielsweise außerhalb Ihres befinden Amazon Virtual Private Cloud. Die Schritte in diesem Abschnitt helfen Ihnen dabei, die folgenden Aufgaben auszuführen, um den Internetzugang zu aktivieren.

  • Richten Sie ein öffentliches Subnetz ein, das ein NAT Gateway hostet und den Datenverkehr an ein Internet-Gateway in Ihrem VPC weiterleitet.

  • Erstellen Sie eine Standardroute, die Ihren privaten Subnetzverkehr an Ihr NAT Gateway weiterleitet.

Weitere Informationen finden Sie unter Aktivieren des Internetzugangs für Amazon MSK Connect.

Voraussetzungen

Bevor Sie den Internetzugang aktivieren können, benötigen Sie die folgenden Elemente:

  • Die ID der Amazon Virtual Private Cloud (VPC), die Ihrem Cluster zugeordnet ist. Zum Beispiel vpc-123456ab.

  • Die IDs der privaten Subnetze in IhremVPC. Zum Beispiel subnet-a1b2c3de, subnet-f4g5h6ij usw. Sie müssen Ihren Konnektor mit privaten Subnetzen konfigurieren.

Internetzugang für Ihren Konnektor aktivieren
  1. Öffnen Sie die Amazon Virtual Private Cloud Konsole unter. https://console.aws.amazon.com/vpc/

  2. Erstellen Sie ein öffentliches Subnetz für Ihr NAT Gateway mit einem aussagekräftigen Namen und notieren Sie sich die Subnetz-ID. Detaillierte Anweisungen finden Sie unter Erstellen Sie ein Subnetz in Ihrem. VPC

  3. Erstellen Sie ein Internet-Gateway, damit Sie mit dem Internet kommunizieren VPC können, und notieren Sie sich die Gateway-ID. Verbinden Sie das Internet-Gateway mit IhremVPC. Anweisungen finden Sie unter Erstellen und Anfügen eines Internet-Gateway.

  4. Stellen Sie ein öffentliches NAT Gateway bereit, damit Hosts in Ihren privaten Subnetzen Ihr öffentliches Subnetz erreichen können. Wenn Sie das NAT Gateway erstellen, wählen Sie das öffentliche Subnetz aus, das Sie zuvor erstellt haben. Anweisungen finden Sie unter NATGateway erstellen.

  5. Konfigurieren Sie Ihre Routing-Tabellen. Sie benötigen insgesamt zwei Routing-Tabellen, um diese Einrichtung abzuschließen. Sie sollten bereits über eine Haupt-Routing-Tabelle verfügen, die automatisch zur gleichen Zeit wie Ihre erstellt wurdeVPC. In diesem Schritt erstellen Sie eine zusätzliche Routing-Tabelle für Ihr öffentliches Subnetz.

    1. Verwenden Sie die folgenden Einstellungen, um Ihre VPC Haupt-Routing-Tabelle so zu ändern, dass Ihre privaten Subnetze den Verkehr an Ihr NAT Gateway weiterleiten. Anweisungen finden Sie im Benutzerhandbuch für Amazon Virtual Private Cloud unter Arbeiten mit Routing-Tabellen.

      Private MSKC Routing-Tabelle
      Eigenschaft Wert
      Namens-Tag Es wird empfohlen, dieser Routing-Tabelle einen aussagekräftigen Namen zu geben, damit Sie sie leichter identifizieren können. Zum Beispiel Privat MSKC.
      Assoziierte Subnetze Ihre privaten Subnetze
      Eine Route, um den Internetzugang für MSK Connect zu aktivieren
      • Ziel: 0.0.0.0/0

      • Ziel: Ihre NAT Gateway-ID. Zum Beispiel nat-12a345bc6789efg1h.

      Еine lokale Route für internen Datenverkehr
      • Ziel: 10.0.0.0/16. Dieser Wert kann je nach Ihrem VPC CIDR Block unterschiedlich sein.

      • Ziel: Lokal

    2. Folgen Sie den Anweisungen unter Erstellen einer benutzerdefinierten Routing-Tabelle, um eine Routing-Tabelle für Ihr öffentliches Subnetz zu erstellen. Geben Sie beim Erstellen der Tabelle einen aussagekräftigen Namen in das Feld Namens-Tag ein, damit Sie leichter erkennen können, mit welchem Subnetz die Tabelle verknüpft ist. Zum Beispiel Öffentlich MSKC.

    3. Konfigurieren Sie Ihre öffentliche MSKC Routing-Tabelle mit den folgenden Einstellungen.

      Eigenschaft Wert
      Namens-Tag Öffentlich MSKC oder ein anderer beschreibender Name, den Sie wählen
      Assoziierte Subnetze Ihr öffentliches Subnetz mit Gateway NAT
      Eine Route, um den Internetzugang für MSK Connect zu aktivieren
      • Ziel: 0.0.0.0/0

      • Ziel: Ihre Internet-Gateway-ID. Zum Beispiel igw-1a234bc5.

      Еine lokale Route für internen Datenverkehr
      • Ziel: 10.0.0.0/16. Dieser Wert kann je nach Ihrem VPC CIDR Block unterschiedlich sein.

      • Ziel: Lokal

Nachdem Sie den Internetzugang für Amazon MSK Connect aktiviert haben, können Sie einen Connector erstellen.

Erstellen eines Debezium-Quell-Konnektors

  1. Ein benutzerdefiniertes Plugin erstellen
    1. Laden Sie das My SQL Connector-Plugin für die neueste stabile Version von der Debezium-Website herunter. Notieren Sie sich die Debezium-Release-Version, die Sie herunterladen (Version 2.x oder die ältere Serie 1.x). Später in diesem Verfahren werden Sie einen Konnektor erstellen, der auf Ihrer Debezium-Version basiert.

    2. Laden Sie den AWS Secrets Manager Config Provider herunter und extrahieren Sie ihn.

    3. Platzieren Sie die folgenden Archive in das gleiche Verzeichnis:

      • Den Ordner debezium-connector-mysql

      • Den Ordner jcusten-border-kafka-config-provider-aws-0.1.1

    4. Komprimieren Sie das Verzeichnis, das Sie im vorherigen Schritt erstellt haben, in eine ZIP Datei und laden Sie die ZIP Datei dann in einen S3-Bucket hoch. Eine Anleitung finden Sie unter Hochladen von Objekten im Amazon-S3-Benutzerhandbuch.

    5. Kopieren Sie den folgenden Text JSON und fügen Sie ihn in eine Datei ein. z. B. debezium-source-custom-plugin.json. Ersetzen <example-custom-plugin-name> mit dem Namen, den das Plugin haben soll, <arn-of-your-s3-bucket> mit dem ARN des S3-Buckets, in den Sie die ZIP Datei hochgeladen haben, und <file-key-of-ZIP-object> mit dem Dateischlüssel des ZIP Objekts, das Sie auf S3 hochgeladen haben.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Führen Sie den folgenden AWS CLI Befehl in dem Ordner aus, in dem Sie die JSON Datei gespeichert haben, um ein Plugin zu erstellen.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Die Ausgabe sollte in etwa wie folgt aussehen.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Führen Sie den folgenden Befehl aus, um den Plugin-Status zu überprüfen. Der Status sollte von CREATING zu ACTIVE wechseln. Ersetzen Sie den ARN Platzhalter durch den PlatzhalterARN, den Sie in der Ausgabe des vorherigen Befehls erhalten haben.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Konfigurieren AWS Secrets Manager und erstellen Sie ein Geheimnis für Ihre Datenbankanmeldedaten
    1. Öffnen Sie die Secrets Manager Manager-Konsole unter https://console.aws.amazon.com/secretsmanager/.

    2. Erstellen Sie ein neues Secret, um Ihre Datenbank-Anmeldeinformationen zu speichern. Anweisungen finden Sie unter Ein Secret erstellen im Benutzerhandbuch für AWS Secrets Manager.

    3. Kopieren Sie Ihre GeheimnisseARN.

    4. Fügen Sie die Secrets-Manager-Berechtigungen aus der folgenden Beispielrichtlinie zu der Service-Ausführungsrolle hinzu. Ersetzen <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> mit dem ARN deines Geheimnisses.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Anweisungen zum Hinzufügen von IAM Berechtigungen finden Sie unter Hinzufügen und Entfernen von IAM Identitätsberechtigungen im IAMBenutzerhandbuch.

  3. Eine benutzerdefinierte Worker-Konfiguration mit Informationen zu Ihrem Konfigurationsanbieter erstellen
    1. Kopieren Sie die folgenden Eigenschaften der Worker-Konfiguration in eine Datei und ersetzen Sie die Platzhalterzeichenfolgen durch Werte, die Ihrem Szenario entsprechen. Weitere Informationen zu den Konfigurationseigenschaften für den AWS Secrets Manager Config Provider finden Sie SecretsManagerConfigProviderin der Dokumentation des Plugins.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Führen Sie den folgenden AWS CLI Befehl aus, um Ihre benutzerdefinierte Worker-Konfiguration zu erstellen.

      Ersetzen Sie die folgenden Werte:

      • <my-worker-config-name> - ein beschreibender Name für Ihre benutzerdefinierte Worker-Konfiguration

      • <encoded-properties-file-content-string> - eine Base64-kodierte Version der Klartext-Eigenschaften, die Sie im vorherigen Schritt kopiert haben

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Erstellen eines Konnektors
    1. Kopieren Sie das FolgendeJSON, das Ihrer Debezium-Version (2.x oder 1.x) entspricht, und fügen Sie es in eine neue Datei ein. Ersetzen Sie die Zeichenfolge <placeholder> durch Werte, die Ihrem Szenario entsprechen. Informationen zum Einrichten einer Service-Ausführungsrolle finden Sie unter IAM-Rollen und -Richtlinien für MSK Connect.

      Beachten Sie, dass die Konfiguration Variablen wie ${secretManager:MySecret-1234:dbusername} anstelle von Klartext verwendet, um Datenbank-Anmeldeinformationen anzugeben. Ersetzen Sie MySecret-1234 durch den Namen Ihres Secrets und geben Sie dann den Namen des Schlüssels an, den Sie abrufen möchten. Sie müssen es auch durch die Ihrer benutzerdefinierten <arn-of-config-provider-worker-configuration> Worker-Konfiguration ersetzenARN.

      Debezium 2.x

      Kopieren Sie für Debezium 2.x-Versionen Folgendes JSON und fügen Sie es in eine neue Datei ein. Ersetzen Sie die <placeholder> Zeichenketten mit Werten, die Ihrem Szenario entsprechen.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Kopieren Sie für Debezium 1.x-Versionen Folgendes JSON und fügen Sie es in eine neue Datei ein. Ersetzen Sie die <placeholder> Zeichenketten mit Werten, die Ihrem Szenario entsprechen.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Führen Sie den folgenden AWS CLI Befehl in dem Ordner aus, in dem Sie die JSON Datei im vorherigen Schritt gespeichert haben.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Das Folgende ist ein Beispiel für die Ausgabe, die Sie erhalten, wenn Sie den Befehl erfolgreich ausführen.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Ein Beispiel für einen Debezium-Connector mit detaillierten Schritten finden Sie unter Einführung in Amazon MSK Connect — Streamen Sie Daten mithilfe von Managed Connectors zu und von Ihren Apache Kafka-Clustern.