Apache Kafka
Apache Kafka(Kafka)操作将消息直接发送到您的 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Confluent Cloud
注意
本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka
要求
此规则操作具有以下要求:
-
AWS IoT 可以承担的 IAM 角色,以执行
ec2:CreateNetworkInterface
、ec2:DescribeNetworkInterfaces
、ec2:CreateNetworkInterfacePermission
、ec2:DeleteNetworkInterface
、ec2:DescribeSubnets
、ec2:DescribeVpcs
、ec2:DescribeVpcAttribute
和ec2:DescribeSecurityGroups
操作。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅 向 AWS IoT 规则授予所需的访问权限。在 AWS IoT 控制台中,您可以选择或创建一个角色以允许 AWS IoT Core 执行此规则操作。
有关网络接口的更多信息,请参阅《Amazon EC2 用户指南》中的弹性网络接口。
附加到您指定的角色的策略应如以下示例所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
如果您使用 AWS Secrets Manager 来存储所需的凭证以连接到 Kafka 代理,则必须创建一个 IAM 角色,以便 AWS IoT Core 可以承担该角色来执行
secretsmanager:GetSecretValue
和secretsmanager:DescribeSecret
操作。附加到您指定的角色的策略应如以下示例所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] }-
您可以在 Amazon Virtual Private Cloud (Amazon VPC) 中运行您的 Apache Kafka 集群。您必须创建 Amazon VPC 目标,然后在子网中使用 NAT 网关将来自 AWS IoT 的消息转发到公有 Kafka 集群。AWS IoT 规则引擎会在 VPC 目标中列出的每个子网中创建一个网络接口,以将流量直接路由到 VPC。当您创建 VPC 目标时,AWS IoT 规则引擎会自动创建 VPC 规则操作。有关 VPC 规则操作的更多信息,请参阅 Virtual Private Cloud (VPC) 目标。
-
如果您使用客户自主管理型 AWS KMS key(KMS 密钥)对加密静态数据,服务必须具有代表调用方使用 KMS 的权限。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 加密。
参数
使用此操作创建 AWS IoT 规则时,您必须指定以下信息:
- destinationArn
VPC 目标的 Amazon Resource Name (ARN)。有关如何创建 VPC 目标的更多信息,请参阅 Virtual Private Cloud (VPC) 目标。
- topic
要发送到 Kafka 代理的消息的 Kafka 主题。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- 键(可选)
Kafka 消息键。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- 标头(可选)
-
您指定的 Kafka 标头的列表。每个标头都是一个键/值对,您可以在创建 Kafka 操作时指定该键值对。您可以使用这些标头将数据从 IoT 客户端路由到下游 Kafka 集群,而无需修改消息有效负载。
您可以使用替代模板替换此字段。要了解如何在 Kafka 操作的标头中将内联规则的函数作为替换模板传递,请参阅示例。有关更多信息,请参阅 替换模板。
注意
不支持二进制格式的标头。
- 分区(可选)
Kafka 消息分区。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- clientProperties
定义 Apache Kafka 生成器客户端属性的对象。
- acks(可选)
生成器在考虑请求完成之前要求服务器收到的确认数。
如果指定 0 作为值,则生产者将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。
有效值:
-1
、0
、1
、all
。默认值为1
。- bootstrap.servers
主机和端口对列表(例如
host1:port1
、host2:port2
)用于建立到 Kafka 集群的初始连接。- compression.type(可选)
生成器生成的所有数据的压缩类型。
有效值:
none
、gzip
、snappy
、lz4
、zstd
。默认值为none
。- security.protocol
用于连接到您的 Kafka 代理的安全协议。
有效值:
SSL
、SASL_SSL
。默认值为SSL
。- key.serializer
指定如何将您使用
ProducerRecord
提供的键对象转换为字节。有效值:
StringSerializer
。- value.serializer
指定如何将您使用
ProducerRecord
提供的值对象转换为字节。有效值:
ByteBufferSerializer
。- ssl.truststore
base64 格式的信任库文件或位于 AWS Secrets Manager 的信任库文件位置。如果您的信任存储受到 Amazon 证书授权机构(CA)的认证,则不需要此值。
此字段支持替换模板。如果使用 Secrets Manager 存储连接到 Kafka 代理所需的凭证,则可以使用
get_secret
SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关get_secret
SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。如果信任库采用文件的形式,请使用SecretBinary
参数。如果信任库采用字符串的形式,请使用SecretString
参数。此值最大为 65 KB。
- ssl.truststore.password
信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。
- ssl.keystore
密钥库文件。当您指定
SSL
作为security.protocol
的值时,才需要此值。此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secret
SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretBinary
参数。- ssl.keystore.password
密钥库文件存储的密码。如果为
ssl.keystore
指定了值,则需要此值。此字段的值可以是纯文本。此字段还支持替代模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secret
SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretString
参数。- ssl.key.password
密钥库文件中私有密钥的密码。
此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secret
SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretString
参数。- sasl.mechanism
用于连接到您的 Kafka 代理的安全机制。当您为
security.protocol
指定SASL_SSL
时,则需要此值。有效值:
PLAIN
、SCRAM-SHA-512
、GSSAPI
。注意
SCRAM-SHA-512
是 cn-north-1、cn-north-1、cn-northwest-1、us-gov-east-1 和 us-gov-west-1 区域中唯一支持的安全机制。- sasl.plain.username
用于从 Secrets Manager 中检索密钥字符串的用户名。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定PLAIN
时,则需要此值。- sasl.plain.password
用于从 Secrets Manager 中检索密钥字符串的密码。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定PLAIN
时,则需要此值。- sasl.scram.username
用于从 Secrets Manager 中检索密钥字符串的用户名。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定SCRAM-SHA-512
时,则需要此值。- sasl.scram.password
用于从 Secrets Manager 中检索密钥字符串的密码。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定SCRAM-SHA-512
时,则需要此值。- sasl.kerberos.keytab
Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secret
SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretBinary
参数。- sasl.kerberos.service.name
Apache Kafka 运行的 Kerberos 主要名称。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。- sasl.kerberos.krb5.kdc
您的 Apache Kafka 生成器客户端连接到的密钥分配中心 (KDC) 的主机名。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。- sasl.kerberos.krb5.realm
您的 Apache Kafka 生成器客户端连接到的领域。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。- sasl.kerberos.principal
Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。
示例
下面的 JSON 示例介绍了如何在 AWS IoT 规则中定义 Apache Kafka 操作:以下示例将 sourceIp() 内联函数作为替换模板传递到 Kafka 操作标头中。
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
有关 Kerberos 设置的重要注意事项
您的密钥分配中心 (KDC) 必须通过目标 VPC 内的私有域名系统 (DNS) 进行解析。一种可能的方法是将 KDC DNS 条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域。
每个 VPC 都必须启用 DNS 解析。有关更多信息,请参阅将 DNS 与您的 VPC 一起使用。
VPC 目标中的网络接口安全组和实例级安全组必须允许来自 VPC 内部以下端口的流量。
引导代理侦听器端口上的 TCP 流量(通常为 9092,但必须在 9000–9100 范围内)
KDC 端口 88 上的 TCP 和 UDP 流量
SCRAM-SHA-512
是 cn-north-1、cn-north-1、cn-northwest-1、us-gov-east-1 和 us-gov-west-1 区域中唯一支持的安全机制。