連接器 - Amazon Managed Streaming for Apache Kafka

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

連接器

連接器會將外部系統和 Amazon 服務與 Apache Kafka 整合,方法是持續將資料來源的串流資料複製到 Apache Kafka 叢集中,或持續將叢集中的資料複製到資料目的地中。在將資料傳送至目的地之前,連接器也可執行輕量型邏輯,例如轉換、格式轉換或篩選資料。來源連接器會從資料來源提取資料,並將此資料推送至叢集中,同時,目的地連接器則會從叢集提取資料,並將此資料推送至資料目的地中。

以下圖表說明連接器的架構。工作程序是執行連接器邏輯的 Java 虛擬機器 (JVM) 程序。每個工作程序皆會建立一組在平行執行緒中執行的任務,並執行複製資料的工作。任務不會存放狀態,因此可以隨時啟動、停止或重新啟動,以提供彈性且可擴展的資料管道。

顯示連接器叢集之架構的圖表。

連接器容量

連接器的總容量將依該連接器具有的工作程序數量,以及每個工作程序的 MSK Connect 單位 (MCU) 數量而定。一個 MCU 代表 1 個 vCPU 的運算和 4 GiB 的記憶體。MCU 記憶體與工作程序執行個體的總記憶體有關,而非與使用中的堆積記憶體相關。

MSK Connect 工作者會使用客戶提供的子網路中的 IP 位址。每個 Worker 都會使用其中一個客戶提供的子網路中的一個 IP 位址。您應該確保提供的子網路中有足夠的 CreateConnector 可用 IP 位址來說明其指定容量,尤其是當 Worker 數量可能會波動的自動調度資源連接器時。

若要建立連接器,您必須選擇以下兩種容量模式之一。

  • 佈建 - 若您知道連接器的容量需求,請選擇此模式。您可以指定兩個值:

    • 工作程序數量。

    • 每個工作程序的 MCU 數目。

  • 自動擴展 - 若連接器的容量需求可變,或者您事先不知道容量需求,請選擇此模式。在您使用自動擴展模式時,Amazon MSK Connect 會使用與連接器中執行的工作程序數量以及每個工作程序的 MCU 數量成比例的值,來覆寫連接器的 tasks.max 屬性。

    您可以指定三組值:

    • 工作程序數量下限和上限。

    • CP 使用率的縮減和橫向擴展百分比,依據 CpuUtilization 指標決定。在連接器的 CpuUtilization 指標超過橫向擴展百分比時,MSK Connect 會增加連接器中執行的工作程序數量。在 CpuUtilization 指標低於縮減百分比時,MSK Connect 會減少工作程序的數量。工作程序的數目會永遠維持在您建立連接器時指定的數量下限和上限之間。

    • 每個工作程序的 MCU 數目。

如需有關工作程序的詳細資訊,請參閱 工作程序。若要了解 MSK Connect 指標,請參閱 監控 MSK Connect

建立連接器

使用建立連接器 AWS Management Console
  1. 開啟位於 https://console.aws.amazon.com/msk/ 的 Amazon MSK 主控台。

  2. 在左窗格的 MSK Connect 下,選擇連接器

  3. 選擇 Create connector (建立連接器)

  4. 您可選擇使用現有的自訂外掛程式來建立連接器,或先建立新的自訂外掛程式。如需有關自訂外掛程式以及如何建立的相關資訊,請參閱 外掛程式。在此過程中,請假設您具有要使用的自訂外掛程式。在自訂外掛程式清單中,找到您要使用的外掛程式,然後選取其左側的方塊,然後選擇下一步

  5. 輸入名稱,以及描述 (非必要)。

  6. 選擇您要連線到的叢集。

  7. 指定連接器組態。您需要指定的組態參數會依據您要建立的連接器類型而定。然而,有部分參數是所有連接器都要指定的,例如 connector.classtasks.max 參數。以下是 Confluent Amazon S3 目的地連接器的範例組態。

    connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=2 topics=my-example-topic s3.region=us-east-1 s3.bucket.name=my-destination-bucket flush.size=1 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.json.JsonFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter schema.compatibility=NONE
  8. 接下來,您需設定連接器容量。您可在兩種容量模式中選擇其一:佈建和自動擴展。如需關於這兩個選項的詳細資訊,請參閱連接器容量

  9. 在預設工作程序組態或自訂工作程序組態中選擇其一。如需有關建立自訂工作程序組態的詳細資訊,請參閱 工作程序

  10. 接下來,您需指定服務執行角色。這必須是 MSK Connect 可以承擔的 IAM 角色,並授予連接器存取必要 AWS 資源所需的所有權限。這些許可會依連接器的邏輯而定。如需如何建立此角色的資訊,請參閱 服務執行角色

  11. 選擇下一步,檢閱安全性資訊,然後再次選擇下一步

  12. 指定所需的記錄選項,然後選擇下一步。如需日誌記錄的相關資訊,請參閱MSK Connect 的日誌

  13. 選擇 Create connector (建立連接器)

若要使用 MSK 連線 API 建立 Connect 器,請參閱CreateConnector