Pekerja - Amazon Managed Streaming untuk Apache Kafka

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Pekerja

Seorang pekerja adalah proses mesin virtual Java (JVM) yang menjalankan logika konektor. Setiap pekerja membuat serangkaian tugas yang berjalan di thread paralel dan melakukan pekerjaan menyalin data. Tugas tidak menyimpan status, dan karenanya dapat dimulai, dihentikan, atau dimulai ulang kapan saja untuk menyediakan pipeline data yang tangguh dan dapat diskalakan. Perubahan jumlah pekerja, baik karena peristiwa penskalaan atau karena kegagalan yang tidak terduga, secara otomatis terdeteksi oleh pekerja yang tersisa. Mereka berkoordinasi untuk menyeimbangkan kembali tugas di seluruh set pekerja yang tersisa. Connect workers menggunakan kelompok konsumen Apache Kafka untuk berkoordinasi dan menyeimbangkan kembali.

Jika persyaratan kapasitas konektor bervariasi atau sulit diperkirakan, Anda dapat membiarkan MSK Connect menskalakan jumlah pekerja sesuai kebutuhan antara batas bawah dan batas atas yang Anda tentukan. Atau, Anda dapat menentukan jumlah pasti pekerja yang ingin Anda jalankan logika konektor Anda. Untuk informasi selengkapnya, lihat Kapasitas konektor.

MSKConnect worker menggunakan alamat IP

MSKConnect workers menggunakan alamat IP di subnet yang disediakan pelanggan. Setiap pekerja menggunakan satu alamat IP dari salah satu subnet yang disediakan pelanggan. Anda harus memastikan bahwa Anda memiliki cukup alamat IP yang tersedia di subnet yang disediakan untuk CreateConnector permintaan untuk memperhitungkan kapasitas yang ditentukan, terutama ketika konektor penskalaan otomatis di mana jumlah pekerja dapat berfluktuasi.

Konfigurasi pekerja default

MSKConnect menyediakan konfigurasi pekerja default berikut:

key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter

Properti konfigurasi pekerja yang didukung

MSKConnect menyediakan konfigurasi pekerja default. Anda juga memiliki opsi untuk membuat konfigurasi pekerja khusus untuk digunakan dengan konektor Anda. Daftar berikut mencakup informasi tentang properti konfigurasi pekerja yang didukung atau tidak didukung Amazon MSK Connect.

  • value.converterProperti key.converter dan diperlukan.

  • MSKConnect mendukung properti producer. konfigurasi berikut.

    producer.acks producer.batch.size producer.buffer.memory producer.compression.type producer.enable.idempotence producer.key.serializer producer.linger.ms producer.max.request.size producer.metadata.max.age.ms producer.metadata.max.idle.ms producer.partitioner.class producer.reconnect.backoff.max.ms producer.reconnect.backoff.ms producer.request.timeout.ms producer.retry.backoff.ms producer.value.serializer
  • MSKConnect mendukung properti consumer. konfigurasi berikut.

    consumer.allow.auto.create.topics consumer.auto.offset.reset consumer.check.crcs consumer.fetch.max.bytes consumer.fetch.max.wait.ms consumer.fetch.min.bytes consumer.heartbeat.interval.ms consumer.key.deserializer consumer.max.partition.fetch.bytes consumer.max.poll.records consumer.metadata.max.age.ms consumer.partition.assignment.strategy consumer.reconnect.backoff.max.ms consumer.reconnect.backoff.ms consumer.request.timeout.ms consumer.retry.backoff.ms consumer.session.timeout.ms consumer.value.deserializer
  • Semua properti konfigurasi lain yang tidak dimulai dengan consumer. awalan producer. atau didukung kecuali untuk properti berikut.

    access.control. admin. admin.listeners.https. client. connect. inter.worker. internal. listeners.https. metrics. metrics.context. rest. sasl. security. socket. ssl. topic.tracking. worker. bootstrap.servers config.storage.topic connections.max.idle.ms connector.client.config.override.policy group.id listeners metric.reporters plugin.path receive.buffer.bytes response.http.headers.config scheduled.rebalance.max.delay.ms send.buffer.bytes status.storage.topic

Untuk informasi selengkapnya tentang properti konfigurasi pekerja dan apa yang diwakilinya, lihat Konfigurasi Kafka Connect di dokumentasi Apache Kafka.

Membuat konfigurasi pekerja khusus

Membuat konfigurasi pekerja khusus menggunakan AWS Management Console
  1. Buka MSK konsol Amazon dihttps://console.aws.amazon.com/msk/.

  2. Di panel kiri, di bawah MSKConnect, pilih konfigurasi Worker.

  3. Pilih Buat konfigurasi pekerja.

  4. Masukkan nama dan deskripsi opsional, lalu tambahkan properti dan nilai yang ingin Anda atur.

  5. Pilih Buat konfigurasi pekerja.

Untuk menggunakan MSK Connect API untuk membuat konfigurasi pekerja, lihat CreateWorkerConfiguration.

Mengelola offset konektor sumber menggunakan offset.storage.topic

Bagian ini memberikan informasi untuk membantu Anda mengelola offset konektor sumber menggunakan topik penyimpanan offset. Topik penyimpanan offset adalah topik internal yang digunakan Kafka Connect untuk menyimpan konektor dan offset konfigurasi tugas.

Menggunakan topik penyimpanan offset default

Secara default, Amazon MSK Connect menghasilkan topik penyimpanan offset baru di cluster Kafka Anda untuk setiap konektor yang Anda buat. MSKmembangun nama topik default menggunakan bagian konektorARN. Misalnya, __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

Menentukan topik penyimpanan offset Anda sendiri

Untuk memberikan kontinuitas offset antara konektor sumber, Anda dapat menggunakan topik penyimpanan offset pilihan Anda alih-alih topik default. Menentukan topik penyimpanan offset membantu Anda menyelesaikan tugas seperti membuat konektor sumber yang melanjutkan pembacaan dari offset terakhir konektor sebelumnya.

Untuk menentukan topik penyimpanan offset, Anda memberikan nilai untuk offset.storage.topic properti dalam konfigurasi pekerja sebelum membuat konektor. Jika Anda ingin menggunakan kembali topik penyimpanan offset untuk menggunakan offset dari konektor yang dibuat sebelumnya, Anda harus memberi konektor baru nama yang sama dengan konektor lama. Jika Anda membuat topik penyimpanan offset kustom, Anda harus menyetel cleanup.policyke compact dalam konfigurasi topik Anda.

catatan

Jika Anda menentukan topik penyimpanan offset saat membuat konektor sink, MSK Connect akan membuat topik jika belum ada. Namun, topik tersebut tidak akan digunakan untuk menyimpan offset konektor.

Offset konektor sink malah dikelola menggunakan protokol grup konsumen Kafka. Setiap konektor wastafel membuat grup bernamaconnect-{CONNECTOR_NAME}. Selama grup konsumen ada, konektor wastafel berturut-turut yang Anda buat dengan CONNECTOR_NAME nilai yang sama akan berlanjut dari offset komitmen terakhir.

contoh : Menentukan topik penyimpanan offset untuk membuat ulang konektor sumber dengan konfigurasi yang diperbarui

Misalkan Anda memiliki konektor change data capture (CDC) dan Anda ingin memodifikasi konfigurasi konektor tanpa kehilangan tempat Anda di CDC aliran. Anda tidak dapat memperbarui konfigurasi konektor yang ada, tetapi Anda dapat menghapus konektor dan membuat yang baru dengan nama yang sama. Untuk memberi tahu konektor baru di mana harus mulai membaca di CDC aliran, Anda dapat menentukan topik penyimpanan offset konektor lama dalam konfigurasi pekerja Anda. Langkah-langkah berikut menunjukkan bagaimana menyelesaikan tugas ini.

  1. Pada mesin klien Anda, jalankan perintah berikut untuk menemukan nama topik penyimpanan offset konektor Anda. Ganti <bootstrapBrokerString> dengan string broker bootstrap cluster Anda. Untuk petunjuk tentang mendapatkan string broker bootstrap Anda, lihatMendapatkan broker bootstrap untuk MSK cluster Amazon.

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    Output berikut menunjukkan daftar semua topik cluster, termasuk topik konektor internal default. Dalam contoh ini, CDC konektor yang ada menggunakan topik penyimpanan offset default yang dibuat oleh MSK Connect. Inilah sebabnya mengapa topik penyimpanan offset disebut__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. Buka MSK konsol Amazon di https://console.aws.amazon.com/msk/.

  3. Pilih konektor Anda dari daftar Konektor. Salin dan simpan konten bidang konfigurasi Konektor sehingga Anda dapat memodifikasinya dan menggunakannya untuk membuat konektor baru.

  4. Pilih Hapus untuk menghapus konektor. Kemudian masukkan nama konektor di bidang input teks untuk mengonfirmasi penghapusan.

  5. Buat konfigurasi pekerja khusus dengan nilai yang sesuai dengan skenario Anda. Untuk petunjuk, silakan lihat Membuat konfigurasi pekerja khusus.

    Dalam konfigurasi pekerja Anda, Anda harus menentukan nama topik penyimpanan offset yang sebelumnya Anda ambil sebagai nilai untuk offset.storage.topic like dalam konfigurasi berikut.

    config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. penting

    Anda harus memberikan konektor baru Anda nama yang sama dengan konektor lama.

    Buat konektor baru menggunakan konfigurasi pekerja yang Anda atur di langkah sebelumnya. Untuk petunjuk, silakan lihat Membuat konektor.

Pertimbangan

Pertimbangkan hal berikut ketika Anda mengelola offset konektor sumber.

  • Untuk menentukan topik penyimpanan offset, berikan nama topik Kafka tempat offset konektor disimpan sebagai nilai untuk konfigurasi pekerja offset.storage.topic Anda.

  • Berhati-hatilah saat Anda membuat perubahan pada konfigurasi konektor. Mengubah nilai konfigurasi dapat mengakibatkan perilaku konektor yang tidak diinginkan jika konektor sumber menggunakan nilai dari konfigurasi ke catatan offset kunci. Kami menyarankan Anda merujuk ke dokumentasi plugin Anda untuk panduan.

  • Sesuaikan jumlah partisi default — Selain menyesuaikan konfigurasi pekerja dengan menambahkanoffset.storage.topic, Anda dapat menyesuaikan jumlah partisi untuk topik penyimpanan offset dan status. Partisi default untuk topik internal adalah sebagai berikut.

    • config.storage.topic: 1, tidak dapat dikonfigurasi, harus topik partisi tunggal

    • offset.storage.topic: 25, dapat dikonfigurasi dengan menyediakan offset.storage.partitions

    • status.storage.topic: 5, dapat dikonfigurasi dengan menyediakan status.storage.partitions

  • Menghapus topik secara manual - Amazon MSK Connect membuat topik internal Kafka connect baru (nama topik dimulai dengan__amazon_msk_connect) pada setiap penerapan konektor. Topik lama yang dilampirkan ke konektor yang dihapus tidak dihapus secara otomatis karena topik internal, sepertioffset.storage.topic, dapat digunakan kembali di antara konektor. Namun, Anda dapat secara manual menghapus topik internal yang tidak digunakan yang dibuat oleh MSK Connect. Topik internal diberi nama mengikuti format__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id.

    Ekspresi reguler __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id dapat digunakan untuk menghapus topik internal. Anda tidak boleh menghapus topik internal yang saat ini digunakan oleh konektor yang sedang berjalan.

  • Menggunakan nama yang sama untuk topik internal yang dibuat oleh MSK Connect — Jika Anda ingin menggunakan kembali topik penyimpanan offset untuk menggunakan offset dari konektor yang dibuat sebelumnya, Anda harus memberikan konektor baru nama yang sama dengan konektor lama. offset.storage.topicProperti dapat diatur menggunakan konfigurasi pekerja untuk menetapkan nama yang sama ke offset.storage.topic dan digunakan kembali di antara konektor yang berbeda. Konfigurasi ini dijelaskan dalam Mengelola offset konektor. MSKConnect tidak mengizinkan konektor yang berbeda untuk berbagi config.storage.topic danstatus.storage.topic. Topik-topik tersebut dibuat setiap kali Anda membuat konektor baru diMSKC. Mereka secara otomatis dinamai mengikuti format__amazon_msk_connect_<status|configs>_connector_name_connector_id, dan begitu juga berbeda di berbagai konektor yang Anda buat.