Debezium ソースコネクタ (設定プロバイダー付き) - Amazon Managed Streaming for Apache Kafka

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Debezium ソースコネクタ (設定プロバイダー付き)

この例では、 SQL互換のSQL Amazon Aurora データベースをソースとして Debezium My コネクタプラグインを使用する方法を示します。この例では、 AWS Secrets Managerのデータベースの認証情報を外部化するために、オープンソースの AWS Secrets Manager Config プロバイダーも設定しています。設定プロバイダーの詳細については、「設定プロバイダーを用いた機密情報の外部化」を参照してください。

重要

Debezium MySQL コネクタプラグインは 1 つのタスクのみをサポートし、Amazon MSK Connect の自動スケーリングされたキャパシティモードでは機能しません。代わりにプロビジョニングキャパシティモードを使用し、workerCount をコネクタ設定の値と等しい値に設定してください。MSK Connect の容量モードの詳細については、「」を参照してくださいコネクタ容量

開始する前に

コネクタは、 の AWS Secrets Manager 外部にある などのサービスとやり取りできるように、インターネットにアクセスできる必要があります Amazon Virtual Private Cloud。このセクションの手順は、インターネットアクセスを有効にするための次のタスクを実行するのに役立ちます。

  • NAT ゲートウェイをホストし、 のインターネットゲートウェイにトラフィックをルーティングするパブリックサブネットを設定しますVPC。

  • プライベートサブネットトラフィックをNATゲートウェイに送信するデフォルトルートを作成します。

詳細については、「Amazon MSK Connect のインターネットアクセスを有効にする」を参照してください。

前提条件

インターネットアクセスを有効にするには、以下のものが必要です。

  • クラスターに関連付けられている Amazon Virtual Private Cloud (VPC) の ID。例えば、vpc-123456ab などです。

  • 内のプライベートサブネットIDsの VPC。例えば、subnet-a1b2c3desubnet-f4g5h6ij などです。コネクタにはプライベートサブネットを設定する必要があります。

コネクタのインターネットアクセスを有効にするには
  1. で Amazon Virtual Private Cloud コンソールを開きますhttps://console.aws.amazon.com/vpc/

  2. NAT ゲートウェイのパブリックサブネットをわかりやすい名前で作成し、サブネット ID を書き留めます。詳細な手順については、「 でサブネットを作成するVPC」を参照してください。

  3. がインターネットとVPC通信できるようにインターネットゲートウェイを作成し、ゲートウェイ ID を書き留めます。インターネットゲートウェイを にアタッチしますVPC。手順については、「インターネットゲートウェイの作成とアタッチ」を参照してください。

  4. プライベートサブネットのホストがパブリックサブネットに到達できるように、パブリックNATゲートウェイをプロビジョニングします。NAT ゲートウェイを作成するときに、前に作成したパブリックサブネットを選択します。手順については、NAT「ゲートウェイの作成」を参照してください。

  5. ルートテーブルを設定します。この設定を完了するには、合計で 2 つのルートテーブルが必要です。と同時に自動的に作成されたメインルートテーブルが既にあるはずですVPC。このステップでは、パブリックサブネット用の追加のルートテーブルを作成します。

    1. 以下の設定を使用して、プライベートサブネットVPCがトラフィックをNATゲートウェイにルーティングするように のメインルートテーブルを変更します。手順については、「Amazon Virtual Private Cloudユーザーガイド」の「ルートテーブルの使用」を参照してください。

      プライベートMSKCルートテーブル
      プロパティ
      名前タグ このルートテーブルには、識別しやすいようにわかりやすい名前タグを付けることをお勧めします。例えば、プライベート などMSKCです。
      関連付けられたサブネット プライベートサブネット
      MSK Connect のインターネットアクセスを有効にするルート
      • 送信先: 0.0.0.0/0

      • ターゲット: NATゲートウェイ ID。例えば、nat-12a345bc6789efg1h です。

      内部トラフィックのルート
      • 送信先: 10.0.0.0/16 この値は、 VPCの CIDRブロックによって異なる場合があります。

      • ターゲット: ローカル

    2. カスタムルートテーブルを作成する」の手順に従って、パブリックサブネットのルートテーブルを作成します。テーブルを作成するときは、そのテーブルがどのサブネットに関連付けられているかを識別しやすいように、[名前タグ] フィールドにわかりやすい名前を入力します。例えば、パブリック MSKCなどです。

    3. 次の設定を使用して、パブリックMSKCルートテーブルを設定します。

      プロパティ
      名前タグ パブリックMSKC名または選択した別のわかりやすい名前
      関連付けられたサブネット NAT ゲートウェイを使用するパブリックサブネット
      MSK Connect のインターネットアクセスを有効にするルート
      • 送信先: 0.0.0.0/0

      • ターゲット: インターネットゲートウェイ ID。例えば igw-1a234bc5 です。

      内部トラフィックのルート
      • 送信先: 10.0.0.0/16 この値は、 VPCの CIDRブロックによって異なる場合があります。

      • ターゲット: ローカル

Amazon MSK Connect のインターネットアクセスを有効にしたので、コネクタを作成する準備が整いました。

Debezium ソースコネクタを作成する

  1. カスタムプラグインを作成する
    1. Debezium サイトから最新の安定版リリース用のマイSQLコネクタプラグインをダウンロードします。ダウンロードした Debezium リリースバージョン (バージョン 2.x、または古いシリーズ 1.x) を書き留めます。この手順の後半で、Debezium のバージョンに基づいてコネクタを作成します。

    2. AWS Secrets Manager 設定プロバイダーをダウンロードして解凍します。

    3. 以下のアーカイブを同じディレクトリに置きます。

      • debezium-connector-mysql フォルダ

      • jcusten-border-kafka-config-provider-aws-0.1.1 フォルダ

    4. 前のステップで作成したディレクトリをZIPファイルに圧縮し、ZIPそのファイルを S3 バケットにアップロードします。手順については、「Amazon S3 ユーザーガイド」の「オブジェクトのアップロード」を参照してください。

    5. 以下をコピーJSONしてファイルに貼り付けます。例えば debezium-source-custom-plugin.json です。置換 <example-custom-plugin-name> プラグインに付ける名前、<arn-of-your-s3-bucket> をZIPファイルをアップロードした S3 バケットARNの <file-key-of-ZIP-object>に、 を S3 にアップロードしたZIPオブジェクトのファイルキーに。

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. JSON ファイルを保存したフォルダから次の AWS CLI コマンドを実行して、プラグインを作成します。

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      以下のような出力が表示されます。

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. 次のコマンドを実行して、プラグインの状態を確認します。状態は CREATING から ACTIVE に変わります。ARN プレースホルダーをARN、前のコマンドの出力で取得した に置き換えます。

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. データベース認証情報のシークレットを設定 AWS Secrets Manager して作成する
    1. で Secrets Manager コンソールを開きますhttps://console.aws.amazon.com/secretsmanager/

    2. データベースのサインイン認証情報を保存する新しいシークレットを作成します。手順については、「AWS Secrets Managerユーザーガイド」の「シークレットを作成する」を参照してください。

    3. シークレットの をコピーしますARN。

    4. 以下のサンプルポリシーの Secrets Manager のアクセス許可を サービス実行のロール に追加します。置換 <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> シークレットARNの を使用します。

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      アクセスIAM許可を追加する方法については、「 ユーザーガイド」のIAM「ID アクセス許可の追加と削除IAM」を参照してください。

  3. 設定プロバイダーに関する情報を使用してカスタムワーカー設定を作成します。
    1. 次のワーカー設定プロパティをファイルにコピーして、プレースホルダー文字列をシナリオに対応する値に置き換えます。Secrets Manager Config プロバイダーの設定プロパティ AWS の詳細については、プラグインのドキュメントSecretsManagerConfigProviderの「」を参照してください。

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. 次の AWS CLI コマンドを実行して、カスタムワーカー設定を作成します。

      以下の値を置き換えます:

      • <my-worker-config-name> - カスタムワーカー設定のわかりやすい名前

      • <encoded-properties-file-content-string> - 前のステップでコピーしたプレーンテキストプロパティの base64 エンコードバージョン

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. コネクタを作成する
    1. Debezium のバージョン (2.x または 1.x) JSONに対応する以下をコピーし、新しいファイルに貼り付けます。<placeholder> 文字列をシナリオに対応する値に置き換えます。サービス実行ロールの設定方法については、「MSK Connect の IAM のロールとポリシー」を参照してください。

      この設定では、データベースの認証情報を指定するのにプレーンテキストではなく ${secretManager:MySecret-1234:dbusername} のような変数を使用していることに注意してください。MySecret-1234 をシークレットの名前に置き換えてから、取得したいキーの名前を入力します。また、 をカスタムワーカー設定ARNの <arn-of-config-provider-worker-configuration>に置き換える必要があります。

      Debezium 2.x

      Debezium 2.x バージョンの場合は、以下をコピーJSONして新しいファイルに貼り付けます。を置き換える <placeholder> シナリオに対応する値を持つ文字列。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Debezium 1.x バージョンの場合は、以下をコピーJSONして新しいファイルに貼り付けます。を置き換える <placeholder> シナリオに対応する値を持つ文字列。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. 前のステップでJSONファイルを保存したフォルダで、次の AWS CLI コマンドを実行します。

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      以下は、コマンドを正常に実行したときに得られる出力の例です。

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

詳細な手順を含む Debezium コネクタの例については、「 マネージドコネクタを使用して Apache Kafka クラスターとの間でデータをMSKストリーミングする」を参照してください。