Konektor sumber Debezium dengan penyedia konfigurasi - Amazon Managed Streaming untuk Apache Kafka

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Konektor sumber Debezium dengan penyedia konfigurasi

Contoh ini menunjukkan cara menggunakan plugin SQL konektor Debezium My dengan database Amazon SQL Aurora yang kompatibel dengan Saya sebagai sumbernya. Dalam contoh ini, kami juga menyiapkan Penyedia Config AWS Secrets Manager open-source untuk mengeksternalisasi kredenal database di. AWS Secrets Manager Untuk mempelajari lebih lanjut tentang penyedia konfigurasi, lihatEksternalisasi informasi sensitif menggunakan penyedia konfigurasi.

penting

Plugin SQL konektor Debezium My hanya mendukung satu tugas dan tidak berfungsi dengan mode kapasitas berskala otomatis untuk Amazon Connect. MSK Sebagai gantinya, Anda harus menggunakan mode kapasitas yang disediakan dan menyetel workerCount sama dengan satu di konfigurasi konektor Anda. Untuk mempelajari lebih lanjut tentang mode kapasitas MSK Connect, lihatKapasitas konektor.

Sebelum Anda mulai

Konektor Anda harus dapat mengakses internet sehingga dapat berinteraksi dengan layanan seperti AWS Secrets Manager yang berada di luar Anda Amazon Virtual Private Cloud. Langkah-langkah di bagian ini membantu Anda menyelesaikan tugas-tugas berikut untuk mengaktifkan akses internet.

  • Siapkan subnet publik yang menjadi tuan rumah NAT gateway dan rute lalu lintas ke gateway internet di AndaVPC.

  • Buat rute default yang mengarahkan lalu lintas subnet pribadi Anda ke gateway AndaNAT.

Untuk informasi selengkapnya, lihat Mengaktifkan akses internet untuk Amazon MSK Connect.

Prasyarat

Sebelum Anda dapat mengaktifkan akses internet, Anda memerlukan item berikut:

  • ID dari Amazon Virtual Private Cloud (VPC) yang terkait dengan cluster Anda. Misalnya, vpc-123456ab.

  • Subnet pribadi di AndaVPC. IDs Misalnya, subnet-a1b2c3de, subnet-f4g5h6ij, dll. Anda harus mengkonfigurasi konektor Anda dengan subnet pribadi.

Untuk mengaktifkan akses internet untuk konektor Anda
  1. Buka Amazon Virtual Private Cloud konsol di https://console.aws.amazon.com/vpc/.

  2. Buat subnet publik untuk NAT gateway Anda dengan nama deskriptif, dan catat ID subnet. Untuk petunjuk terperinci, lihat Membuat subnet di bagian Anda VPC.

  3. Buat gateway internet sehingga Anda VPC dapat berkomunikasi dengan internet, dan catat ID gateway. Lampirkan gateway internet ke AndaVPC. Untuk petunjuk, lihat Membuat dan melampirkan gateway internet.

  4. Menyediakan NAT gateway publik sehingga host di subnet pribadi Anda dapat mencapai subnet publik Anda. Saat Anda membuat NAT gateway, pilih subnet publik yang Anda buat sebelumnya. Untuk petunjuk, lihat Membuat NAT gateway.

  5. Konfigurasikan tabel rute Anda. Anda harus memiliki dua tabel rute secara total untuk menyelesaikan pengaturan ini. Anda seharusnya sudah memiliki tabel rute utama yang secara otomatis dibuat pada saat yang sama dengan AndaVPC. Pada langkah ini Anda membuat tabel rute tambahan untuk subnet publik Anda.

    1. Gunakan pengaturan berikut untuk memodifikasi tabel rute utama Anda VPC sehingga subnet pribadi Anda merutekan lalu lintas ke NAT gateway Anda. Untuk petunjuk, lihat Bekerja dengan tabel rute di Panduan Amazon Virtual Private CloudPengguna.

      Tabel MSKC rute pribadi
      Properti Nilai
      Tag nama Kami menyarankan Anda memberikan tabel rute ini tag nama deskriptif untuk membantu Anda mengidentifikasinya. Misalnya, Private MSKC.
      Subnet terkait Subnet pribadi Anda
      Rute untuk mengaktifkan akses internet untuk MSK Connect
      • Tujuan: 0.0.0.0/0

      • Target: ID NAT gateway Anda. Misalnya, nat-12a345bc6789efg1h.

      Rute lokal untuk lalu lintas internal
      • Tujuan: 10.0.0.0/16. Nilai ini mungkin berbeda tergantung pada CIDR blok AndaVPC.

      • Target: Lokal

    2. Ikuti petunjuk di Buat tabel rute kustom untuk membuat tabel rute untuk subnet publik Anda. Saat Anda membuat tabel, masukkan nama deskriptif di kolom Tag nama untuk membantu Anda mengidentifikasi subnet mana yang terkait dengan tabel tersebut. Misalnya, Publik MSKC.

    3. Konfigurasikan tabel MSKC rute Publik Anda menggunakan pengaturan berikut.

      Properti Nilai
      Tag nama Publik MSKC atau nama deskriptif berbeda yang Anda pilih
      Subnet terkait Subnet publik Anda dengan gateway NAT
      Rute untuk mengaktifkan akses internet untuk MSK Connect
      • Tujuan: 0.0.0.0/0

      • Target: ID gateway internet Anda. Misalnya, igw-1a234bc5.

      Rute lokal untuk lalu lintas internal
      • Tujuan: 10.0.0.0/16. Nilai ini mungkin berbeda tergantung pada CIDR blok AndaVPC.

      • Target: Lokal

Sekarang setelah Anda mengaktifkan akses internet untuk Amazon MSK Connect, Anda siap membuat konektor.

Membuat konektor sumber Debezium

  1. Buat plugin khusus
    1. Unduh plugin SQL Konektor saya untuk rilis stabil terbaru dari situs Debezium. Catat versi rilis Debezium yang Anda unduh (versi 2.x, atau seri 1.x yang lebih lama). Kemudian dalam prosedur ini, Anda akan membuat konektor berdasarkan versi Debezium Anda.

    2. Unduh dan ekstrak Penyedia Config AWS Secrets Manager.

    3. Tempatkan arsip berikut ke dalam direktori yang sama:

      • debezium-connector-mysqlFolder

      • jcusten-border-kafka-config-provider-aws-0.1.1Folder

    4. Kompres direktori yang Anda buat pada langkah sebelumnya ke dalam ZIP file dan kemudian unggah ZIP file ke bucket S3. Untuk petunjuk, lihat Mengunggah objek di Panduan Pengguna Amazon S3.

    5. Salin berikut ini JSON dan tempel dalam file. Misalnya, debezium-source-custom-plugin.json. Ganti <example-custom-plugin-name> dengan nama yang Anda inginkan plugin untuk memiliki, <arn-of-your-s3-bucket> dengan bucket S3 tempat Anda mengunggah ZIP file, dan <file-key-of-ZIP-object> dengan kunci file ZIP objek yang Anda unggah ke S3. ARN

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Jalankan AWS CLI perintah berikut dari folder tempat Anda menyimpan JSON file untuk membuat plugin.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Anda akan melihat output yang mirip dengan contoh berikut.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Jalankan perintah berikut untuk memeriksa status plugin. Negara harus berubah dari CREATING keACTIVE. Ganti ARN placeholder dengan ARN yang Anda dapatkan di output dari perintah sebelumnya.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Konfigurasikan AWS Secrets Manager dan buat rahasia untuk kredensi database Anda
    1. Buka konsol Secrets Manager di https://console.aws.amazon.com/secretsmanager/.

    2. Buat rahasia baru untuk menyimpan kredensi login database Anda. Untuk petunjuk, lihat Membuat rahasia di Panduan AWS Secrets ManagerPengguna.

    3. Salin rahasia AndaARN.

    4. Tambahkan izin Secrets Manager dari kebijakan contoh berikut ke kebijakan AndaPeran eksekusi layanan. Ganti <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> dengan ARN rahasiamu.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Untuk petunjuk tentang cara menambahkan IAM izin, lihat Menambahkan dan menghapus izin IAM identitas di IAMPanduan Pengguna.

  3. Buat konfigurasi pekerja khusus dengan informasi tentang penyedia konfigurasi Anda
    1. Salin properti konfigurasi pekerja berikut ke dalam file, ganti string placeholder dengan nilai yang sesuai dengan skenario Anda. Untuk mempelajari lebih lanjut tentang properti konfigurasi untuk Penyedia Config AWS Secrets Manager, lihat SecretsManagerConfigProviderdi dokumentasi plugin.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Jalankan AWS CLI perintah berikut untuk membuat konfigurasi pekerja kustom Anda.

      Ganti nilai-nilai berikut:

      • <my-worker-config-name> - nama deskriptif untuk konfigurasi pekerja kustom Anda

      • <encoded-properties-file-content-string> - versi base64 yang dikodekan dari properti plaintext yang Anda salin pada langkah sebelumnya

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Buat konektor
    1. Salin berikut ini JSON yang sesuai dengan versi Debezium Anda (2.x atau 1.x) dan tempel di file baru. Ganti <placeholder> string dengan nilai yang sesuai dengan skenario Anda. Untuk informasi tentang cara menyiapkan peran eksekusi layanan, lihatPeran dan kebijakan IAM untuk MSK Connect.

      Perhatikan bahwa konfigurasi menggunakan variabel seperti ${secretManager:MySecret-1234:dbusername} bukan plaintext untuk menentukan kredenal database. Ganti MySecret-1234 dengan nama rahasia Anda dan kemudian sertakan nama kunci yang ingin Anda ambil. Anda juga harus mengganti <arn-of-config-provider-worker-configuration> dengan konfigurasi pekerja kustom Anda. ARN

      Debezium 2.x

      Untuk versi Debezium 2.x, salin berikut ini JSON dan tempel di file baru. Ganti <placeholder> string dengan nilai yang sesuai dengan skenario Anda.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Untuk versi Debezium 1.x, salin yang berikut ini JSON dan tempel di file baru. Ganti <placeholder> string dengan nilai yang sesuai dengan skenario Anda.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Jalankan AWS CLI perintah berikut di folder tempat Anda menyimpan JSON file di langkah sebelumnya.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Berikut ini adalah contoh output yang Anda dapatkan ketika Anda menjalankan perintah dengan sukses.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Untuk contoh konektor Debezium dengan langkah-langkah terperinci, lihat Memperkenalkan Amazon MSK Connect - Streaming Data ke dan dari Cluster Apache Kafka Anda Menggunakan Konektor Terkelola.