ワーカー - Amazon Managed Streaming for Apache Kafka

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ワーカー

ワーカーは、コネクタロジックを実行する Java 仮想マシン (JVM) プロセスです。各ワーカーは、並列スレッドで実行され、データをコピーする作業を行う一連のタスクを作成します。タスクは状態を保存しないため、復元力のあるスケーラブルな Data Pipeline を提供するために、いつでも開始、停止、または再開できます。スケーリングイベントまたは予期しない障害によるワーカー数の変更は、残りのワーカーによって自動的に検出され、残りのワーカーのセット全体でタスクのバランスを取り直すように調整されます。Connect ワーカーは、Apache Kafka のコンシューマーグループを使用して、調整とリバランスを行います。

コネクタの容量要件が可変であるか、推定が難しい場合は、指定した下限と上限の間で必要に応じて MSK Connect でワーカー数をスケーリングできます。または、コネクタロジックを実行するワーカーの正確な数を指定することもできます。詳細については、「コネクタ容量」を参照してください。

MSK Connect ワーカーが IP アドレスを使用する

MSK Connect ワーカーは、お客様が用意したサブネットで IP アドレスを使用します。各ワーカーは、お客様が用意したサブネットの 1 つから 1 つの IP アドレスを使用します。特にワーカー数が変動する可能性のあるコネクタを自動スケーリングする場合は、指定された容量を考慮して CreateConnector 、リクエストに提供されるサブネットに十分な使用可能な IP アドレスがあることを確認する必要があります。

デフォルトのワーカー構成

MSK Connect には、次のデフォルトのワーカー設定があります。

key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter

サポートされているワーカー設定プロパティ

MSK Connect はデフォルトのワーカー設定を提供します。コネクタで使用するカスタムワーカー設定を作成することもできます。次のリストには、Amazon MSK Connect がサポートするワーカー設定プロパティまたはサポートしないワーカー設定プロパティに関する情報が含まれています。

  • key.converter プロパティと value.converter プロパティが必要です。

  • MSK Connect では、次のproducer.設定プロパティがサポートされています。

    producer.acks producer.batch.size producer.buffer.memory producer.compression.type producer.enable.idempotence producer.key.serializer producer.linger.ms producer.max.request.size producer.metadata.max.age.ms producer.metadata.max.idle.ms producer.partitioner.class producer.reconnect.backoff.max.ms producer.reconnect.backoff.ms producer.request.timeout.ms producer.retry.backoff.ms producer.value.serializer
  • MSK Connect では、次のconsumer.設定プロパティがサポートされています。

    consumer.allow.auto.create.topics consumer.auto.offset.reset consumer.check.crcs consumer.fetch.max.bytes consumer.fetch.max.wait.ms consumer.fetch.min.bytes consumer.heartbeat.interval.ms consumer.key.deserializer consumer.max.partition.fetch.bytes consumer.max.poll.records consumer.metadata.max.age.ms consumer.partition.assignment.strategy consumer.reconnect.backoff.max.ms consumer.reconnect.backoff.ms consumer.request.timeout.ms consumer.retry.backoff.ms consumer.session.timeout.ms consumer.value.deserializer
  • 次のプロパティを除いて、producer. または consumer. プレフィックスでスタートしないすべての設定プロパティが許可されます。

    access.control. admin. admin.listeners.https. client. connect. inter.worker. internal. listeners.https. metrics. metrics.context. rest. sasl. security. socket. ssl. topic.tracking. worker. bootstrap.servers config.storage.topic connections.max.idle.ms connector.client.config.override.policy group.id listeners metric.reporters plugin.path receive.buffer.bytes response.http.headers.config scheduled.rebalance.max.delay.ms send.buffer.bytes status.storage.topic

ワーカー設定プロパティとその表現については、Apache Kafka ドキュメントの「Kafka Connect Config」を参照してください。

カスタムワーカー設定を作成する

を使用したカスタムワーカー設定の作成 AWS Management Console
  1. で Amazon MSKコンソールを開きますhttps://console.aws.amazon.com/msk/

  2. 左側のペインの MSK Connect で、ワーカー設定 を選択します。

  3. Create worker configuration (ワーカー構成の作成) を選択します。

  4. 名前とオプションの説明を入力し、設定するプロパティと値を追加します。

  5. Create worker configuration (ワーカー構成の作成) を選択します。

MSK Connect を使用してワーカー設定APIを作成するには、「」を参照してくださいCreateWorkerConfiguration

offset.storage.topic を使用してソースコネクタオフセットを管理する

このセクションでは、「オフセットストレージトピック」を使用してソースコネクタオフセットを管理するのに役立つ情報を提供します。オフセットストレージトピックは、Kafka Connect がコネクタとタスク設定のオフセットを保存するために使用する内部トピックです。

デフォルトのオフセットストレージトピックを使用する

デフォルトでは、Amazon MSK Connect は作成するコネクタごとに Kafka クラスターに新しいオフセットストレージトピックを生成します。MSK は、コネクタ の一部を使用してデフォルトのトピック名を作成しますARN。例えば、__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 と指定します。

独自のオフセットストレージトピックを指定する

ソースコネクタ間のオフセットの連続性を提供するために、デフォルトトピックの代わりに任意のオフセットストレージトピックを使用できます。オフセットストレージトピックを指定すると、前のコネクタの最後のオフセットから読み取りを再開するソースコネクタを作成するといったタスクを実行しやすくなります。

オフセットストレージトピックを指定するには、コネクタを作成する前にワーカー設定で offset.storage.topic プロパティの値を指定します。オフセットストレージトピックを再利用して以前に作成したコネクタのオフセットを利用する場合は、新しいコネクタに古いコネクタと同じ名前を付ける必要があります。カスタムオフセットストレージトピックを作成する場合は、トピック設定で cleanup.policycompact に設定する必要があります。

注記

シンクコネクタの作成時にオフセットストレージトピックを指定すると、Connect MSK はトピックが存在しない場合にトピックを作成します。ただし、このトピックはコネクタオフセットの保存には使用されません。

代わりに、シンクコネクタオフセットは Kafka コンシューマーグループプロトコルを使用して管理されます。各シンクコネクタは connect-{CONNECTOR_NAME} という名前のグループを作成します。コンシューマーグループが存在する限り、同じ CONNECTOR_NAME 値で連続して作成されるシンクコネクタは、最後にコミットされたオフセットから継続されます。

例 : オフセットストレージトピックを指定し、更新された設定を使用してソースコネクタを再作成する

変更データキャプチャ (CDC) コネクタがあり、CDCストリーム内の場所を失うことなくコネクタ設定を変更するとします。既存のコネクタ設定を更新することはできませんが、そのコネクタを削除して同じ名前で新しいコネクタ設定を作成することはできます。新しいコネクタにCDCストリーム内の読み取りを開始する場所を指定するには、ワーカー設定で古いコネクタのオフセットストレージトピックを指定します。次のステップでこのタスクのやり方を説明します。

  1. クライアントマシンで、次のコマンドを実行してコネクタのオフセットストレージトピックの名前を検索します。<bootstrapBrokerString> をクラスターのブートストラップブローカー文字列に置き換えます。ブートストラップブローカー文字列を取得する手順については、「Amazon MSKクラスターのブートストラップブローカーの取得」を参照してください。

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    次の出力は、デフォルトの内部コネクタトピックを含むすべてのクラスタートピックのリストを示しています。この例では、既存のCDCコネクタは MSK Connect によって作成されたデフォルトのオフセットストレージトピックを使用します。オフセットストレージトピックが __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 と呼ばれるのはこれが理由です。

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. で Amazon MSKコンソールを開きますhttps://console.aws.amazon.com/msk/

  3. [コネクタ] リストからコネクタを選択します。[コネクタ設定] フィールドの内容をコピーして保存し、内容を変更して新しいコネクタを作成できるようにします。

  4. コネクタを削除するには、[削除] を選択します。テキスト入力フィールドにコネクタ名を入力して、削除を確定します。

  5. 実際のシナリオに合った値を使用してカスタムワーカー設定を作成します。手順については、カスタムワーカー設定を作成する を参照してください。

    ワーカー設定では、以下の設定のように、以前に offset.storage.topic の値として取得したオフセットストレージトピックの名前を指定する必要があります。

    config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. 重要

    新しいコネクタには古いコネクタと同じ名前を付ける必要があります。

    前のステップで設定したワーカー設定を使用して、新しいコネクタを作成します。手順については、コネクタの作成 を参照してください。

考慮事項

ソースコネクタオフセットを管理するときは、次の点を考慮してください。

  • オフセットストレージトピックを指定するには、ワーカー設定の offset.storage.topic の値として、コネクタオフセットが保存される Kafka トピックの名前を指定します。

  • コネクタ設定を変更する場合は注意が必要です。ソースコネクタがキーオフセットレコードに対して設定の値を使用している場合、設定値を変更すると、コネクタが意図しない動作をする可能性があります。プラグインのドキュメントを参照することをお勧めします。

  • デフォルトのパーティション数のカスタマイズoffset.storage.topic の追加によるワーカー設定のカスタマイズに加えて、オフセットとステータスストレージのトピックのパーティション数をカスタマイズできます。内部トピックのデフォルトパーティションは次のとおりです。

    • config.storage.topic: 1、設定不可、単一パーティションのトピックである必要がある

    • offset.storage.topic: 25、offset.storage.partitions を指定することで設定可能

    • status.storage.topic: 5、status.storage.partitions を指定することで設定可能

  • トピックの手動削除 – Amazon MSK Connect は、コネクタのデプロイごとに新しい Kafka Connect 内部トピック (トピック名は で始まる__amazon_msk_connect) を作成します。offset.storage.topic などの内部トピックはコネクタ間で再利用される可能性があるため、削除されたコネクタにアタッチされた古いトピックは自動的に削除されません。ただし、MSKConnect によって作成された未使用の内部トピックは手動で削除できます。内部トピックには __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id 形式に従って名前が付けられます。

    正規表現 __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id を使用して内部トピックを削除できます。実行中のコネクタによって現在使用されている内部トピックは削除しないでください。

  • MSK Connect によって作成された内部トピックに同じ名前を使用する – オフセットストレージトピックを再利用して、以前に作成したコネクタからのオフセットを消費する場合は、新しいコネクタに古いコネクタと同じ名前を付ける必要があります。ワーカー設定を使用して offset.storage.topic プロパティを設定し、offset.storage.topic に同じ名前を割り当て、異なるコネクタ間で再利用することができます。この設定については、「コネクタオフセットを管理する」で説明しています。MSK Connect では、異なるコネクタで config.storage.topicと を共有することはできませんstatus.storage.topic。これらのトピックは、 で新しいコネクタを作成するたびに作成されますMSKC。__amazon_msk_connect_<status|configs>_connector_name_connector_id 形式に従って自動的に名前が付けられるため、作成するコネクタによって名前が異なります。