Apache Kafka - AWS IoT Core

Apache Kafka

Apache Kafka(Kafka)操作将消息直接发送到您的 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Confluent Cloud 等第三方提供商托管的 Apache Kafka 集群,或自行管理的 Apache Kafka 集群。通过 Kafka 规则操作,您可以将您的 IoT 数据路由到 Kafka 集群。这使您能够出于各种目的构建高性能数据管道,例如流分析、数据集成、可视化和任务关键型业务应用程序等。

注意

本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka。不支持 MSK Serverless。MSK Serverless 集群只能通过 IAM 身份验证完成,而 Apache Kafka 规则操作目前不支持该身份验证。有关如何使用 Confluent 配置 AWS IoT Core 的更多信息,请参阅 Leveraging Confluent and AWS to Solve IoT Device and Data Management Challenges

要求

此规则操作具有以下要求:

  • AWS IoT 可以承担的 IAM 角色,以执行 ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfacesec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterfaceec2:DescribeSubnetsec2:DescribeVpcsec2:DescribeVpcAttributeec2:DescribeSecurityGroups 操作。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅 向 AWS IoT 规则授予所需的访问权限

    在 AWS IoT 控制台中,您可以选择或创建一个角色以允许 AWS IoT Core 执行此规则操作。

    有关网络接口的更多信息,请参阅《Amazon EC2 用户指南》中的弹性网络接口

    附加到您指定的角色的策略应如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 如果您使用 AWS Secrets Manager 来存储所需的凭证以连接到 Kafka 代理,则必须创建一个 IAM 角色,以便 AWS IoT Core 可以承担该角色来执行 secretsmanager:GetSecretValuesecretsmanager:DescribeSecret 操作。

    附加到您指定的角色的策略应如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • 您可以在 Amazon Virtual Private Cloud (Amazon VPC) 中运行您的 Apache Kafka 集群。您必须创建 Amazon VPC 目标,然后在子网中使用 NAT 网关将来自 AWS IoT 的消息转发到公有 Kafka 集群。AWS IoT 规则引擎会在 VPC 目标中列出的每个子网中创建一个网络接口,以将流量直接路由到 VPC。当您创建 VPC 目标时,AWS IoT 规则引擎会自动创建 VPC 规则操作。有关 VPC 规则操作的更多信息,请参阅 Virtual Private Cloud (VPC) 目标

  • 如果您使用客户自主管理型 AWS KMS key(KMS 密钥)对加密静态数据,服务必须具有代表调用方使用 KMS 的权限。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 加密

参数

使用此操作创建 AWS IoT 规则时,您必须指定以下信息:

destinationArn

VPC 目标的 Amazon Resource Name (ARN)。有关如何创建 VPC 目标的更多信息,请参阅 Virtual Private Cloud (VPC) 目标

topic

要发送到 Kafka 代理的消息的 Kafka 主题。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

键(可选)

Kafka 消息键。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

标头(可选)

您指定的 Kafka 标头的列表。每个标头都是一个键/值对,您可以在创建 Kafka 操作时指定该键值对。您可以使用这些标头将数据从 IoT 客户端路由到下游 Kafka 集群,而无需修改消息有效负载。

您可以使用替代模板替换此字段。要了解如何在 Kafka 操作的标头中将内联规则的函数作为替换模板传递,请参阅示例。有关更多信息,请参阅 替换模板

注意

不支持二进制格式的标头。

分区(可选)

Kafka 消息分区。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

clientProperties

定义 Apache Kafka 生成器客户端属性的对象。

acks(可选)

生成器在考虑请求完成之前要求服务器收到的确认数。

如果指定 0 作为值,则生产者将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。

有效值:-101all。默认值为 1

bootstrap.servers

主机和端口对列表(例如 host1:port1host2:port2)用于建立到 Kafka 集群的初始连接。

compression.type(可选)

生成器生成的所有数据的压缩类型。

有效值:nonegzipsnappylz4zstd。默认值为 none

security.protocol

用于连接到您的 Kafka 代理的安全协议。

有效值:SSLSASL_SSL。默认值为 SSL

key.serializer

指定如何将您使用 ProducerRecord 提供的键对象转换为字节。

有效值:StringSerializer

value.serializer

指定如何将您使用 ProducerRecord 提供的值对象转换为字节。

有效值:ByteBufferSerializer

ssl.truststore

base64 格式的信任库文件或位于 AWS Secrets Manager 的信任库文件位置。如果您的信任存储受到 Amazon 证书授权机构(CA)的认证,则不需要此值。

此字段支持替换模板。如果使用 Secrets Manager 存储连接到 Kafka 代理所需的凭证,则可以使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。如果信任库采用文件的形式,请使用 SecretBinary 参数。如果信任库采用字符串的形式,请使用 SecretString 参数。

此值最大为 65 KB。

ssl.truststore.password

信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。

ssl.keystore

密钥库文件。当您指定 SSL 作为 security.protocol 的值时,才需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

ssl.keystore.password

密钥库文件存储的密码。如果为 ssl.keystore 指定了值,则需要此值。

此字段的值可以是纯文本。此字段还支持替代模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

ssl.key.password

密钥库文件中私有密钥的密码。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

sasl.mechanism

用于连接到您的 Kafka 代理的安全机制。当您为 security.protocol 指定 SASL_SSL 时,则需要此值。

有效值:PLAINSCRAM-SHA-512GSSAPI

注意

SCRAM-SHA-512 是 cn-north-1、cn-north-1、cn-northwest-1、us-gov-east-1 和 us-gov-west-1 区域中唯一支持的安全机制。

sasl.plain.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.plain.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.scram.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.scram.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.kerberos.keytab

Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

sasl.kerberos.service.name

Apache Kafka 运行的 Kerberos 主要名称。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.kdc

您的 Apache Kafka 生成器客户端连接到的密钥分配中心 (KDC) 的主机名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.realm

您的 Apache Kafka 生成器客户端连接到的领域。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.principal

Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

示例

下面的 JSON 示例介绍了如何在 AWS IoT 规则中定义 Apache Kafka 操作:以下示例将 sourceIp() 内联函数作为替换模板传递到 Kafka 操作标头中。

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

有关 Kerberos 设置的重要注意事项

  • 您的密钥分配中心 (KDC) 必须通过目标 VPC 内的私有域名系统 (DNS) 进行解析。一种可能的方法是将 KDC DNS 条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域

  • 每个 VPC 都必须启用 DNS 解析。有关更多信息,请参阅将 DNS 与您的 VPC 一起使用

  • VPC 目标中的网络接口安全组和实例级安全组必须允许来自 VPC 内部以下端口的流量。

    • 引导代理侦听器端口上的 TCP 流量(通常为 9092,但必须在 9000–9100 范围内)

    • KDC 端口 88 上的 TCP 和 UDP 流量

  • SCRAM-SHA-512 是 cn-north-1、cn-north-1、cn-northwest-1、us-gov-east-1 和 us-gov-west-1 区域中唯一支持的安全机制。