Apache Kafka - AWS IoT Core

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Apache Kafka

Apache Kafka (Kafka) 動作會將訊息直接傳送到您的 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、由 Confluent Cloud 等第三方供應商管理的 Apache Kafka 叢集,或自我管理的 Apache Kafka 叢集。透過 Kafka 規則動作,您可以將 IoT 資料路由至 Kafka 叢集。這可讓您為各種目的建置高效能資料管道,例如串流分析、資料整合、視覺化和任務關鍵型業務應用程式。

注意

本主題假設您熟悉 Apache Kafka 平台及相關概念。如需有關 Apache Kafka 的詳細資訊,請參閱 Apache Kafka。不支援MSK無伺服器。MSK 無伺服器叢集只能透過 Apache Kafka 規則動作目前不支援的IAM身分驗證來完成。如需如何使用 AWS IoT Core Confluent 設定的詳細資訊,請參閱利用 Confluent 和 AWS 解決 IoT 裝置和資料管理挑戰

要求

此規則動作具有下列需求:

  • AWS IoT 可以擔任IAM的角色,以執行 ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfacesec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterfaceec2:DescribeSubnetsec2:DescribeVpcsec2:DescribeVpcAttributeec2:DescribeSecurityGroups操作。此角色會建立並管理您 Amazon Virtual Private Cloud 的彈性網路介面,以達到您的 Kafka 代理程式。如需詳細資訊,請參閱授予 AWS IoT 規則所需的存取權

    在 AWS IoT 主控台中,您可以選擇或建立允許 AWS IoT Core 執行此規則動作的角色。

    如需網路介面的詳細資訊,請參閱 Amazon EC2使用者指南 中的彈性網路介面

    連接至您指定之角色的政策應如下列範例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 如果您使用 AWS Secrets Manager 來存放連線至 Kafka 代理程式所需的憑證,則必須建立 AWS IoT Core 可擔任IAM的角色,以執行 secretsmanager:GetSecretValuesecretsmanager:DescribeSecret操作。

    連接至您指定之角色的政策應如下列範例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • 您可以在 Amazon Virtual Private Cloud (Amazon ) 內執行 Apache Kafka 叢集VPC。您必須建立 Amazon VPC目的地,並在子網路中使用NAT閘道,將訊息從 轉送 AWS IoT 到公有 Kafka 叢集。 AWS IoT 規則引擎會在VPC目的地中列出的每個子網路中建立網路介面,以將流量直接路由至 VPC。當您建立VPC目的地時, AWS IoT 規則引擎會自動建立VPC規則動作。如需VPC規則動作的詳細資訊,請參閱 虛擬私有雲端 (VPC) 目的地

  • 如果您使用客戶受管 AWS KMS key (KMS 金鑰) 加密靜態資料,服務必須具有代表來電者使用 KMS金鑰的許可。如需詳細資訊,請參閱 Amazon Managed Streaming for Apache Kafka 開發人員指南 中的 Amazon MSK加密

參數

使用此動作建立 AWS IoT 規則時,您必須指定下列資訊:

destinationArn

VPC 目的地的 Amazon Resource Name (ARN)。如需建立VPC目的地的相關資訊,請參閱 虛擬私有雲端 (VPC) 目的地

主題

要傳送至 Kafka 代理程式之訊息的 Kafka 主題。

您可使用替代範本來替代此欄位。如需詳細資訊,請參閱替代範本

金鑰 (選用)

Kafka 訊息金鑰。

您可使用替代範本來替代此欄位。如需詳細資訊,請參閱替代範本

標頭 (選用)

您指定的 Kafka 標頭清單。每個標頭都是鍵值對,您可以在建立 Kafka 動作時指定。您可以使用這些標頭將資料從 IoT 用戶端路由到下游 Kafka 叢集,而不需修改訊息承載。

您可使用替代範本來替代此欄位。若要了解如何在 Kafka 動作標頭中傳遞內嵌規則的函數作為替換範本,請參閱範例。如需詳細資訊,請參閱替代範本

注意

不支援二進位格式的標頭。

分割區 (選用)

Kafka 訊息分割區。

您可使用替代範本來替代此欄位。如需詳細資訊,請參閱替代範本

clientProperties

定義 Apache Kafka 生產者用戶端屬性的物件。

acks (選用)

生產者要求伺服器在考慮請求完成之前所收到的確認數目。

如果您指定 0 作為值,生產者將不會等待伺服器的任何確認。若伺服器並未收到訊息,生產者不會重試傳送訊息。

有效值:-101all。預設值為 1

bootstrap.servers

用來建立 Kafka 叢集初始連線的主機和連接埠配對清單 (例如 host1:port1host2:port2)。

compression.type (選用)

生產者所產生所有資料的壓縮類型。

有效值:nonegzipsnappylz4zstd。預設值為 none

security.protocol

用來連接至您 Kafka 代理程式的安全通訊協定。

有效值:SSLSASL_SSL。預設值為 SSL

key.serializer

指定如何將與 ProducerRecord 一起提供的金鑰物件轉換為位元組。

有效值:StringSerializer

value.serializer

指定如何將與 ProducerRecord 一起提供的值物件轉換為位元組。

有效值:ByteBufferSerializer

ssl.truststore

base64 格式的信任庫檔案或在 AWS Secrets Manager 中的信任庫檔案位置。若 Amazon 憑證授權機構 (CA) 信任您的信任存放區,則不需要此值。

此欄位支援替代範本。如果您使用 Secrets Manager 來存放連線到 Kafka 代理程式所需的憑證,您可以使用 get_secretSQL函數來擷取此欄位的值。如需替代範本的詳細資訊,請參閱 替代範本。如需 get_secretSQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。若信任庫為檔案形式,請使用 SecretBinary 參數。若信任庫為字串形式,請使用 SecretString 參數。

此值的最大大小為 65 KB。

ssl.truststore.password

信任庫的密碼。只有在您已建立信任庫的密碼時,才需要此值。

ssl.keystore

金鑰存放區檔案。當您指定 SSLsecurity.protocol 的值時,則需要此值。

此欄位支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用 get_secretSQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需 get_secretSQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 參數。

ssl.keystore.password

金鑰存放區檔案的存放區密碼。若指定 ssl.keystore 的值,則需要此值。

此欄位的值可以是純文字。此欄位也支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用 get_secretSQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需 get_secretSQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用 SecretString 參數。

ssl.key.password

金鑰存放區檔案中私鑰的密碼。

此欄位支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用 get_secretSQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需 get_secretSQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用 SecretString 參數。

sasl.mechanism

用來連接至 Kafka 代理程式的安全機制。指定 security.protocolSASL_SSL 時,則需要此值。

有效值:PLAINSCRAM-SHA-512GSSAPI

注意

SCRAM-SHA-512 是 cn-north-1、cn-northwest-1、 us-gov-east-1 和 us-gov-west-1 區域中唯一支援的安全機制。

sasl.plain.username

用來從 Secrets Manager 擷取秘密字串的使用者名稱。指定 security.protocolSASL_SSLsasl.mechanismPLAIN 時,則需要此值。

sasl.plain.password

用於從 Secrets Manager 擷取秘密字串的密碼。指定 security.protocolSASL_SSLsasl.mechanismPLAIN 時,則需要此值。

sasl.scram.username

用來從 Secrets Manager 擷取秘密字串的使用者名稱。指定 security.protocolSASL_SSLsasl.mechanismSCRAM-SHA-512 時,則需要此值。

sasl.scram.password

用於從 Secrets Manager 擷取秘密字串的密碼。指定 security.protocolSASL_SSLsasl.mechanismSCRAM-SHA-512 時,則需要此值。

sasl.kerberos.keytab

Secrets Manager 中 Kerberos 驗證的 keytab 檔案。指定 security.protocolSASL_SSLsasl.mechanismGSSAPI 時,則需要此值。

此欄位支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用 get_secretSQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需 get_secretSQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 參數。

sasl.kerberos.service.name

在 Apache Kafka 執行之下的 Kerberos 委託人名稱。指定 security.protocolSASL_SSLsasl.mechanismGSSAPI 時,則需要此值。

sasl.kerberos.krb5.kdc

Apache Kafka 生產者用戶端所連線之金鑰分發中心的主機名稱 (KDC)。指定 security.protocolSASL_SSLsasl.mechanismGSSAPI 時,則需要此值。

sasl.kerberos.krb5.realm

您的 Apache Kafka 生產者用戶端連線的領域。指定 security.protocolSASL_SSLsasl.mechanismGSSAPI 時,則需要此值。

sasl.kerberos.principal

Kerberos 可指派票證來存取 Kerberos 感知服務的唯一 Kerberos 身分。指定 security.protocolSASL_SSLsasl.mechanismGSSAPI 時,則需要此值。

範例

下列JSON範例定義 AWS IoT 規則中的 Apache Kafka 動作。下列範例會將 sourceIp() 內嵌函數傳遞為 Kafka 動作標頭中的替代範本

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

關於您 Kerberos 設定的重要注意事項

  • 您的金鑰分發中心 (KDC) 必須透過目標 中的私有網域名稱系統 (DNS) 進行解析VPC。其中一種可能的方法是將KDCDNS項目新增至私有託管區域。如需此方法的詳細資訊,請參閱使用私有託管區域

  • 每個 VPC 必須啟用DNS解析。如需詳細資訊,請參閱DNS搭配 使用 VPC

  • VPC 目的地中的網路介面安全群組和執行個體層級安全群組必須允許來自下列連接埠VPC之 內部的流量。

    • TCP 引導代理程式接聽程式連接埠上的流量 (通常為 9092,但必須在 9000–9100 範圍內)

    • TCP 和連接埠 88 上的UDP流量 KDC

  • SCRAM-SHA-512 是 cn-north-1、cn-northwest-1、 us-gov-east-1 和 us-gov-west-1 區域中唯一支援的安全機制。