選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

Amazon Athena MSK 連接器

焦點模式
Amazon Athena MSK 連接器 - Amazon Athena

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

適用於 Amazon MSK 的 Amazon Athena 連接器可讓 Amazon Athena 能夠對 Apache Kafka 主題執行 SQL 查詢。使用此連接器可以在 Athena 中以資料表的形式檢視 Apache Kafka 主題,並以資料列的形式檢視訊息。如需詳細資訊,請參閱 AWS 大數據部落格中的使用 Amazon Athena 分析 Amazon MSK 中的即時串流資料。

此連接器不會使用 Glue Connections 集中 Glue 中的組態屬性。連線組態是透過 Lambda 完成。

先決條件

使用 Athena 主控台或 AWS Serverless Application Repository,將連接器部署到您的 AWS 帳戶 。如需詳細資訊,請參閱 建立資料來源連線使用 AWS Serverless Application Repository 部署資料來源連接器

限制

  • 不支援寫入 DDL 操作。

  • 任何相關的 Lambda 限制。如需詳細資訊,請參閱《AWS Lambda 開發人員指南》中的 Lambda 配額

  • 篩選條件中的日期和時間戳記資料類型必須轉換為適當的資料類型。

  • 日期和時間戳記資料類型不受 CSV 檔案類型支援,且會被視為 varchar 值。

  • 不支援映射至巢狀 JSON 欄位。連接器僅映射最上層欄位。

  • 連接器不支援複雜類型。複雜類型會轉譯為字串。

  • 若要擷取或使用複雜的 JSON 值,請使用 Athena 中提供的 JSON 相關函數。如需詳細資訊,請參閱從字串擷取 JSON 資料

  • 連接器不支援存取 Kafka 訊息中繼資料。

條款

  • 中繼資料處理常式 - 從資料庫執行個體中擷取中繼資料的 Lambda 處理常式。

  • 記錄處理常式 - 從資料庫執行個體中擷取資料記錄的 Lambda 處理常式。

  • 複合處理常式 - 從資料庫執行個體中擷取中繼資料和資料記錄的 Lambda 處理常式。

  • Kafka 端點 – 與 Kafka 執行個體建立連線的文字字串。

叢集相容性

MSK 連接器可搭配下列叢集類型使用。

  • MSK 佈建的叢集 – 您可以手動指定、監控和調整叢集容量。

  • MSK 無伺服器叢集 – 提供隨需容量,可隨應用程式 I/O 擴展而自動擴展。

  • 獨立 Kafka – 直接連接至 Kafka (已進行身分驗證或未進行身分驗證)。

支援的身分驗證方法

連接器支援下列身分驗證方法。

支援的輸入資料格式

連接器支援以下輸入資料格式。

  • JSON

  • CSV

參數

使用本節中的參數來設定 Athena MSK 連接器。

  • auth_type – 指定叢集的身分驗證類型。連接器支援下列身分驗證類型:

    • NO_AUTH – 直接連接至 Kafka,而不需要身分驗證 (例如,連接至透過無需使用身分驗證之 EC2 執行個體部署的 Kafka 叢集)。

    • SASL_SSL_PLAIN – 此方法使用 SASL_SSL 安全通訊協定和 PLAIN SASL 機制。

    • SASL_PLAINTEXT_PLAIN – 此方法使用 SASL_PLAINTEXT 安全通訊協定和 PLAIN SASL 機制。

      注意

      Apache Kafka 支援 SASL_SSL_PLAINSASL_PLAINTEXT_PLAIN 身分驗證類型,但 Amazon MSK 不支援。

    • SASL_SSL_AWS_MSK_IAM – 適用於 Amazon MSK 的 IAM 存取控制可讓您能夠處理 MSK 叢集的身分驗證和授權。您使用者的 AWS 登入資料 (私密金鑰和存取金鑰) 用於與叢集連線。如需詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的 IAM 存取控制

    • SASL_SSL_SCRAM_SHA512 – 您可以使用此身分驗證類型來控制對 Amazon MSK 叢集的存取。此方法將使用者名稱和密碼存放在 上 AWS Secrets Manager。祕密必須與 Amazon MSK 叢集相關聯。如需詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的設定 Amazon MSK 叢集的 SASL/SCRAM 身分驗證

    • SSL – SSL 身分驗證使用金鑰存放區和信任存放區檔案來連接 Amazon MSK 叢集。您必須產生信任存放區和金鑰存放區檔案,將其上傳至 Amazon S3 儲存貯體,並在部署連接器時提供 Amazon S3 參考。金鑰存放區、信任存放區和 SSL 金鑰會儲存在 AWS Secrets Manager中。部署連接器時,您的用戶端必須提供 AWS 私密金鑰。如需詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的相互 TLS 身分驗證

      如需詳細資訊,請參閱設定 Athena MSK 連接器的身分驗證

  • certificates_s3_reference – 包含憑證 (金鑰存放區和信任存放區檔案) 的 Amazon S3 位置。

  • disable_spill_encryption - (選用) 當設定為 True 時,停用溢出加密。預設為 False,因此溢出 S3 的資料會使用 AES-GCM 進行加密 — 使用隨機產生的金鑰或 KMS 來產生金鑰。停用溢出加密可以提高效能,尤其是如果溢出位置使用伺服器端加密

  • kafka_endpoint – 提供給 Kafka 的端點詳細資訊。例如,對於 Amazon MSK 叢集,您可以為叢集提供引導 URL

  • secrets_manager_secret – 用來儲存憑證的 AWS 祕密的名稱。IAM 身分驗證不需要此參數。

  • 溢出參數 –Lambda 函數會將不適用記憶體的資料暫時存放 (「溢出」) 至 Amazon S3。由相同 Lambda 函數存取的所有資料庫執行個體溢出到相同的位置。請使用以下資料表中的參數來指定溢出位置。

    參數 描述
    spill_bucket 必要。Lambda 函數可在其中溢出資料的 Amazon S3 儲存貯體的名稱。
    spill_prefix 必要。Lambda 函數可在其中溢出資料的溢出儲存貯體的字首。
    spill_put_request_headers (選用) 用於溢出的 Amazon S3 putObject 請求的請求標頭和值的 JSON 編碼映射 (例如,{"x-amz-server-side-encryption" : "AES256"})。如需了解其他可能的標頭,請參閱《Amazon Simple Storage Service API 參考》中的 PutObject

支援的資料類型

下表顯示 Kafka 和 Apache Arrow 支援的相應資料類型。

Kafka Arrow
CHAR VARCHAR
VARCHAR VARCHAR
TIMESTAMP 毫秒
DATE DAY
BOOLEAN BOOL
SMALLINT SMALLINT
INTEGER INT
BIGINT BIGINT
DECIMAL FLOAT8
DOUBLE FLOAT8

分割區和分隔

Kafka 主題會分為多個分割區。每個分割區都已排序。分割區中的每個訊息都有一個稱為位移的增量 ID。每個 Kafka 分割區可再細分為多個分隔,用於並行處理。資料在 Kafka 叢集中設定的保留期間內可供使用。

最佳實務

最佳實務是在您查詢 Athena 時使用述詞下推,如以下範例所示。

SELECT * FROM "msk_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE integercol = 2147483647
SELECT * FROM "msk_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'

設定 MSK 連接器

您必須先設定 Amazon MSK 叢集、使用 AWS Glue 結構描述登錄檔來定義結構描述,以及為連接器設定身分驗證,才能使用連接器。

注意

如果您將連接器部署到 VPC 中以存取私有資源,並且還想要連接到可公開存取的服務 (例如 Confluent),則必須將連接器與具有 NAT 閘道的私有子網路建立關聯。如需詳細資訊,請參閱《Amazon VPC 使用者指南》中的 NAT 閘道

使用 AWS Glue 結構描述登錄檔時,請注意下列事項:

  • 請確定 AWS Glue 結構描述登錄檔的 Description (描述) 欄位中的文字包含字串 {AthenaFederationMSK}。您搭配 Amazon Athena MSK 連接器使用的 AWS Glue 登錄檔需要此標記字串。

  • 為了獲得最佳效能,請僅使用小寫作為資料庫名稱和資料表名稱。使用混合大小寫會導致連接器執行運算密集程度較高的不區分大小寫搜尋。

設定 Amazon MSK 環境和 AWS Glue 結構描述登錄檔
  1. 設定您的 Amazon MSK 環境。如需相關資訊和步驟,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的設定 Amazon MSK開始使用 Amazon MSK

  2. 將 JSON 格式的 Kafka 主題描述檔案 (即其結構描述) 上傳至 AWS Glue 結構描述登錄檔。如需詳細資訊,請參閱《 AWS Glue 開發人員指南》中的與 AWS Glue 結構描述登錄檔整合。如需範例結構描述,請參閱下一節。

將結構描述上傳至 AWS Glue 結構描述登錄檔時,請使用本節中的範例格式。

JSON 類型結構描述範例

在下列範例中,要在結構描述登錄檔中建立的 AWS Glue 結構描述指定 json 做為 的值dataFormat,並datatypejson用於 topicName

注意

topicName 的值應使用與 Kafka 中的主題名稱相同的大小寫。

{ "topicName": "datatypejson", "message": { "dataFormat": "json", "fields": [ { "name": "intcol", "mapping": "intcol", "type": "INTEGER" }, { "name": "varcharcol", "mapping": "varcharcol", "type": "VARCHAR" }, { "name": "booleancol", "mapping": "booleancol", "type": "BOOLEAN" }, { "name": "bigintcol", "mapping": "bigintcol", "type": "BIGINT" }, { "name": "doublecol", "mapping": "doublecol", "type": "DOUBLE" }, { "name": "smallintcol", "mapping": "smallintcol", "type": "SMALLINT" }, { "name": "tinyintcol", "mapping": "tinyintcol", "type": "TINYINT" }, { "name": "datecol", "mapping": "datecol", "type": "DATE", "formatHint": "yyyy-MM-dd" }, { "name": "timestampcol", "mapping": "timestampcol", "type": "TIMESTAMP", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS" } ] } }

CSV 類型結構描述範例

在下列範例中,要在結構描述登錄檔中建立的 AWS Glue 結構描述指定 csv 做為 的值dataFormat,並datatypecsvbulk用於 topicNametopicName 的值應使用與 Kafka 中的主題名稱相同的大小寫。

{ "topicName": "datatypecsvbulk", "message": { "dataFormat": "csv", "fields": [ { "name": "intcol", "type": "INTEGER", "mapping": "0" }, { "name": "varcharcol", "type": "VARCHAR", "mapping": "1" }, { "name": "booleancol", "type": "BOOLEAN", "mapping": "2" }, { "name": "bigintcol", "type": "BIGINT", "mapping": "3" }, { "name": "doublecol", "type": "DOUBLE", "mapping": "4" }, { "name": "smallintcol", "type": "SMALLINT", "mapping": "5" }, { "name": "tinyintcol", "type": "TINYINT", "mapping": "6" }, { "name": "floatcol", "type": "DOUBLE", "mapping": "7" } ] } }

將結構描述上傳至 AWS Glue 結構描述登錄檔時,請使用本節中的範例格式。

JSON 類型結構描述範例

在下列範例中,要在結構描述登錄檔中建立的 AWS Glue 結構描述指定 json 做為 的值dataFormat,並datatypejson用於 topicName

注意

topicName 的值應使用與 Kafka 中的主題名稱相同的大小寫。

{ "topicName": "datatypejson", "message": { "dataFormat": "json", "fields": [ { "name": "intcol", "mapping": "intcol", "type": "INTEGER" }, { "name": "varcharcol", "mapping": "varcharcol", "type": "VARCHAR" }, { "name": "booleancol", "mapping": "booleancol", "type": "BOOLEAN" }, { "name": "bigintcol", "mapping": "bigintcol", "type": "BIGINT" }, { "name": "doublecol", "mapping": "doublecol", "type": "DOUBLE" }, { "name": "smallintcol", "mapping": "smallintcol", "type": "SMALLINT" }, { "name": "tinyintcol", "mapping": "tinyintcol", "type": "TINYINT" }, { "name": "datecol", "mapping": "datecol", "type": "DATE", "formatHint": "yyyy-MM-dd" }, { "name": "timestampcol", "mapping": "timestampcol", "type": "TIMESTAMP", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS" } ] } }

CSV 類型結構描述範例

在下列範例中,要在結構描述登錄檔中建立的 AWS Glue 結構描述指定 csv 做為 的值dataFormat,並datatypecsvbulk用於 topicNametopicName 的值應使用與 Kafka 中的主題名稱相同的大小寫。

{ "topicName": "datatypecsvbulk", "message": { "dataFormat": "csv", "fields": [ { "name": "intcol", "type": "INTEGER", "mapping": "0" }, { "name": "varcharcol", "type": "VARCHAR", "mapping": "1" }, { "name": "booleancol", "type": "BOOLEAN", "mapping": "2" }, { "name": "bigintcol", "type": "BIGINT", "mapping": "3" }, { "name": "doublecol", "type": "DOUBLE", "mapping": "4" }, { "name": "smallintcol", "type": "SMALLINT", "mapping": "5" }, { "name": "tinyintcol", "type": "TINYINT", "mapping": "6" }, { "name": "floatcol", "type": "DOUBLE", "mapping": "7" } ] } }

設定 Athena MSK 連接器的身分驗證

您可以使用各種方法對 Amazon MSK 叢集進行身分驗證,包括 IAM、SSL、SCRAM 和獨立 Kafka。

下表顯示了連接器的身分驗證類型,以及每種連接器的安全通訊協定和 SASL 機制。如需詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的Apache Kafka API 的身分驗證和授權

auth_type security.protocol sasl.mechanism
SASL_SSL_PLAIN SASL_SSL PLAIN
SASL_PLAINTEXT_PLAIN SASL_PLAINTEXT PLAIN
SASL_SSL_AWS_MSK_IAM SASL_SSL AWS_MSK_IAM
SASL_SSL_SCRAM_SHA512 SASL_SSL SCRAM-SHA-512
SSL SSL N/A
注意

Apache Kafka 支援 SASL_SSL_PLAINSASL_PLAINTEXT_PLAIN 身分驗證類型,但 Amazon MSK 不支援。

SASL/IAM

如果叢集使用 IAM 身分驗證,則必須在設定叢集時為使用者設定 IAM 政策。如需詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的 IAM 存取控制

若要使用此身分驗證類型,請將連接器的 auth_type Lambda 環境變數設定為 SASL_SSL_AWS_MSK_IAM

SSL

如果叢集經過 SSL 身分驗證,您必須產生信任存放區和金鑰存放區檔案,並將其上傳到 Amazon S3 儲存貯體。部署連接器時,您必須提供此 Amazon S3 參考。金鑰存放區、信任存放區和 SSL 金鑰會儲存在 AWS Secrets Manager中。部署連接器時,您會提供 AWS 私密金鑰。

如需有關在 Secrets Manager 中建立祕密的詳細資訊,請參閱建立 AWS Secrets Manager 祕密

若要使用此身分驗證類型,請設定環境變數,如下表所示。

參數 Value
auth_type SSL
certificates_s3_reference 包含憑證的 Amazon S3 位置。
secrets_manager_secret AWS 私密金鑰的名稱。

在 Secrets Manager 中建立祕密之後,您可以在 Secrets Manager 主控台中進行檢視。

若要檢視 Secrets Manager 中的祕密
  1. 前往以下位置開啟機密管理員控制台:https://console.aws.amazon.com/secretsmanager/

  2. 在導覽窗格中,選擇 Secrets (祕密)。

  3. Secrets (祕密) 頁面中,選擇祕密的連結。

  4. 在祕密的詳細資訊頁面上,選擇 Retrieve secret value (擷取祕密值)。

    下圖顯示了具有三個金鑰/值對的祕密範例:keystore_passwordtruststore_passwordssl_key_password

    在 Secrets Manager 中擷取 SSL 祕密。

SASL/SCRAM

如果您的叢集使用 SCRAM 身分驗證,則請在部署連接器時提供與叢集相關聯的 Secrets Manager 金鑰。使用者的 AWS 憑證 (祕密金鑰和存取金鑰) 可用於向叢集進行身分驗證。

設定環境變數,如下表所示。

參數 Value
auth_type SASL_SSL_SCRAM_SHA512
secrets_manager_secret AWS 私密金鑰的名稱。

下圖顯示了 Secrets Manager 主控台中含有兩個金鑰/值對的祕密範例:一個用於 username,另一個用於 password

在 Secrets Manager 中擷取 SCRAM 祕密。

授權資訊

使用此連接器,即表示您確認已包含第三方元件,可在此連接器的 pom.xml 檔案中找到其清單,並同意 GitHub.com 上 LICENSE.txt 檔案中提供的相應第三方授權中的條款。

其他資源

如需此連接器的其他資訊,請造訪 GitHub.com 上的相應網站

下一個主題:

MySQL

上一個主題:

Kafka
隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。