本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Apache Kafka
Apache Kafka (Kafka) 動作會將訊息直接傳送到您的 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、由 Confluent Cloud
注意
本主題假設您熟悉 Apache Kafka 平台及相關概念。如需有關 Apache Kafka 的詳細資訊,請參閱 Apache Kafka
要求
此規則動作具有下列需求:
-
AWS IoT 可以擔任IAM的角色,以執行
ec2:CreateNetworkInterface
、ec2:DescribeNetworkInterfaces
、ec2:CreateNetworkInterfacePermission
、ec2:DeleteNetworkInterface
、ec2:DescribeSubnets
、ec2:DescribeVpcs
、ec2:DescribeVpcAttribute
和ec2:DescribeSecurityGroups
操作。此角色會建立並管理您 Amazon Virtual Private Cloud 的彈性網路介面,以達到您的 Kafka 代理程式。如需詳細資訊,請參閱授予 AWS IoT 規則所需的存取權。在 AWS IoT 主控台中,您可以選擇或建立允許 AWS IoT Core 執行此規則動作的角色。
如需網路介面的詳細資訊,請參閱 Amazon EC2使用者指南 中的彈性網路介面。
連接至您指定之角色的政策應如下列範例所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
如果您使用 AWS Secrets Manager 來存放連線至 Kafka 代理程式所需的憑證,則必須建立 AWS IoT Core 可擔任IAM的角色,以執行
secretsmanager:GetSecretValue
和secretsmanager:DescribeSecret
操作。連接至您指定之角色的政策應如下列範例所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] }-
您可以在 Amazon Virtual Private Cloud (Amazon ) 內執行 Apache Kafka 叢集VPC。您必須建立 Amazon VPC目的地,並在子網路中使用NAT閘道,將訊息從 轉送 AWS IoT 到公有 Kafka 叢集。 AWS IoT 規則引擎會在VPC目的地中列出的每個子網路中建立網路介面,以將流量直接路由至 VPC。當您建立VPC目的地時, AWS IoT 規則引擎會自動建立VPC規則動作。如需VPC規則動作的詳細資訊,請參閱 虛擬私有雲端 (VPC) 目的地。
-
如果您使用客戶受管 AWS KMS key (KMS 金鑰) 加密靜態資料,服務必須具有代表來電者使用 KMS金鑰的許可。如需詳細資訊,請參閱 Amazon Managed Streaming for Apache Kafka 開發人員指南 中的 Amazon MSK加密。
參數
使用此動作建立 AWS IoT 規則時,您必須指定下列資訊:
- destinationArn
VPC 目的地的 Amazon Resource Name (ARN)。如需建立VPC目的地的相關資訊,請參閱 虛擬私有雲端 (VPC) 目的地。
- 主題
要傳送至 Kafka 代理程式之訊息的 Kafka 主題。
您可使用替代範本來替代此欄位。如需詳細資訊,請參閱替代範本。
- 金鑰 (選用)
Kafka 訊息金鑰。
您可使用替代範本來替代此欄位。如需詳細資訊,請參閱替代範本。
- 標頭 (選用)
-
您指定的 Kafka 標頭清單。每個標頭都是鍵值對,您可以在建立 Kafka 動作時指定。您可以使用這些標頭將資料從 IoT 用戶端路由到下游 Kafka 叢集,而不需修改訊息承載。
您可使用替代範本來替代此欄位。若要了解如何在 Kafka 動作標頭中傳遞內嵌規則的函數作為替換範本,請參閱範例。如需詳細資訊,請參閱替代範本。
注意
不支援二進位格式的標頭。
- 分割區 (選用)
Kafka 訊息分割區。
您可使用替代範本來替代此欄位。如需詳細資訊,請參閱替代範本。
- clientProperties
定義 Apache Kafka 生產者用戶端屬性的物件。
- acks (選用)
生產者要求伺服器在考慮請求完成之前所收到的確認數目。
如果您指定 0 作為值,生產者將不會等待伺服器的任何確認。若伺服器並未收到訊息,生產者不會重試傳送訊息。
有效值:
-1
、0
、1
、all
。預設值為1
。- bootstrap.servers
用來建立 Kafka 叢集初始連線的主機和連接埠配對清單 (例如
host1:port1
、host2:port2
)。- compression.type (選用)
生產者所產生所有資料的壓縮類型。
有效值:
none
、gzip
、snappy
、lz4
、zstd
。預設值為none
。- security.protocol
用來連接至您 Kafka 代理程式的安全通訊協定。
有效值:
SSL
、SASL_SSL
。預設值為SSL
。- key.serializer
指定如何將與
ProducerRecord
一起提供的金鑰物件轉換為位元組。有效值:
StringSerializer
。- value.serializer
指定如何將與
ProducerRecord
一起提供的值物件轉換為位元組。有效值:
ByteBufferSerializer
。- ssl.truststore
base64 格式的信任庫檔案或在 AWS Secrets Manager 中的信任庫檔案位置。若 Amazon 憑證授權機構 (CA) 信任您的信任存放區,則不需要此值。
此欄位支援替代範本。如果您使用 Secrets Manager 來存放連線到 Kafka 代理程式所需的憑證,您可以使用
get_secret
SQL函數來擷取此欄位的值。如需替代範本的詳細資訊,請參閱 替代範本。如需get_secret
SQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。若信任庫為檔案形式,請使用SecretBinary
參數。若信任庫為字串形式,請使用SecretString
參數。此值的最大大小為 65 KB。
- ssl.truststore.password
信任庫的密碼。只有在您已建立信任庫的密碼時,才需要此值。
- ssl.keystore
金鑰存放區檔案。當您指定
SSL
為security.protocol
的值時,則需要此值。此欄位支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用
get_secret
SQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需get_secret
SQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用SecretBinary
參數。- ssl.keystore.password
金鑰存放區檔案的存放區密碼。若指定
ssl.keystore
的值,則需要此值。此欄位的值可以是純文字。此欄位也支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用
get_secret
SQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需get_secret
SQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用SecretString
參數。- ssl.key.password
金鑰存放區檔案中私鑰的密碼。
此欄位支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用
get_secret
SQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需get_secret
SQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用SecretString
參數。- sasl.mechanism
用來連接至 Kafka 代理程式的安全機制。指定
security.protocol
的SASL_SSL
時,則需要此值。有效值:
PLAIN
、SCRAM-SHA-512
、GSSAPI
。注意
SCRAM-SHA-512
是 cn-north-1、cn-northwest-1、 us-gov-east-1 和 us-gov-west-1 區域中唯一支援的安全機制。- sasl.plain.username
用來從 Secrets Manager 擷取秘密字串的使用者名稱。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的PLAIN
時,則需要此值。- sasl.plain.password
用於從 Secrets Manager 擷取秘密字串的密碼。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的PLAIN
時,則需要此值。- sasl.scram.username
用來從 Secrets Manager 擷取秘密字串的使用者名稱。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的SCRAM-SHA-512
時,則需要此值。- sasl.scram.password
用於從 Secrets Manager 擷取秘密字串的密碼。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的SCRAM-SHA-512
時,則需要此值。- sasl.kerberos.keytab
Secrets Manager 中 Kerberos 驗證的 keytab 檔案。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的GSSAPI
時,則需要此值。此欄位支援替代範本。使用 Secrets Manager 來存放連接至 Kafka 代理程式所需的憑證。若要擷取此欄位的值,請使用
get_secret
SQL函數。如需替代範本的詳細資訊,請參閱 替代範本。如需get_secret
SQL函數的詳細資訊,請參閱 get_secret(secretId, secretType, key, roleArn)。使用SecretBinary
參數。- sasl.kerberos.service.name
在 Apache Kafka 執行之下的 Kerberos 委託人名稱。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的GSSAPI
時,則需要此值。- sasl.kerberos.krb5.kdc
Apache Kafka 生產者用戶端所連線之金鑰分發中心的主機名稱 (KDC)。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的GSSAPI
時,則需要此值。- sasl.kerberos.krb5.realm
您的 Apache Kafka 生產者用戶端連線的領域。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的GSSAPI
時,則需要此值。- sasl.kerberos.principal
Kerberos 可指派票證來存取 Kerberos 感知服務的唯一 Kerberos 身分。指定
security.protocol
的SASL_SSL
和sasl.mechanism
的GSSAPI
時,則需要此值。
範例
下列JSON範例定義 AWS IoT 規則中的 Apache Kafka 動作。下列範例會將 sourceIp() 內嵌函數傳遞為 Kafka 動作標頭中的替代範本。
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
關於您 Kerberos 設定的重要注意事項
您的金鑰分發中心 (KDC) 必須透過目標 中的私有網域名稱系統 (DNS) 進行解析VPC。其中一種可能的方法是將KDCDNS項目新增至私有託管區域。如需此方法的詳細資訊,請參閱使用私有託管區域。
每個 VPC 必須啟用DNS解析。如需詳細資訊,請參閱DNS搭配 使用 VPC。
VPC 目的地中的網路介面安全群組和執行個體層級安全群組必須允許來自下列連接埠VPC之 內部的流量。
TCP 引導代理程式接聽程式連接埠上的流量 (通常為 9092,但必須在 9000–9100 範圍內)
TCP 和連接埠 88 上的UDP流量 KDC
SCRAM-SHA-512
是 cn-north-1、cn-northwest-1、 us-gov-east-1 和 us-gov-west-1 區域中唯一支援的安全機制。