将 Amazon Managed Streaming for Apache Kafka 主题定义为 EventBridge 管道的源 - Amazon EventBridge

将 Amazon Managed Streaming for Apache Kafka 主题定义为 EventBridge 管道的源

您可以使用 EventBridge Pipes 从 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 主题接收记录。您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时,可以选择特定于 Amazon MSK 的设置。将数据发送到目标时,EventBridge Pipes 会保持消息代理中记录的顺序。

Amazon MSK 是一项完全托管的服务,可构建并运行应用程序,使用 Apache Kafka 处理流数据。Amazon MSK 简化了运行 Apache Kafka 的集群的设置、扩展和管理。您可以使用 Amazon MSK 配置您的应用程序,以适用于多个可用区并保证 AWS Identity and Access Management (IAM) 的安全性。Amazon MSK 支持多个开源版本的 Kafka。

Amazon MSK 作为源,运行方式与使用 Amazon Simple Queue Service (Amazon SQS) 或 Amazon Kinesis 相似。EventBridge 在内部轮询来自源的新消息,然后同步调用目标。EventBridge 批量读取消息,并将这些消息作为事件有效负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)

对于基于 Apache Kafka 的源,EventBridge 支持处理控制参数,例如批处理时段和批次大小。

EventBridge 按顺序读取各个分区的消息。EventBridge 处理各个批次后,会提交该批次中消息的偏移量。如果管道的目标为批次中的任何消息返回错误,EventBridge 将重试整批消息,直到处理成功或消息过期为止。

EventBridge 调用目标时会在事件中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Amazon MSK 主题和分区标识符的详细信息,以及时间戳和 base64 编码的消息。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon EventBridge 管道中的事件筛选

[ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

轮询和流的起始位置

请注意,管道创建和更新期间的流源轮询最终将是一致的。

  • 在管道创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在管道更新源轮询配置期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

这意味着,如果您指定 LATEST 作为流的起始位置,在创建或更新管道期间,管道可能会错过发送的事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON

MSK 集群身份验证

EventBridge 需要访问 Amazon MSK 集群、检索记录和执行其他任务的权限。Amazon MSK 支持通过多种选项来控制客户端对 MSK 集群的访问。有关在哪种情况下使用哪种身份验证方法的更多信息,请参阅 EventBridge 如何选择引导代理

未经身份验证的访问

我们建议仅使用未经身份验证的访问权限进行开发。仅当集群禁用基于 IAM 角色的身份验证时,未经身份验证的访问权限才有效。

SASL/SCRAM 身份验证

Amazon MSK 支持使用传输层安全性协议(TLS)加密进行简单身份验证和安全层/加盐质疑应答身份验证机制(SASL/SCRAM)身份验证。为使 EventBridge 连接到集群,可以将身份验证凭证(登录凭证)存储在 AWS Secrets Manager 密钥中。

有关使用 Secrets Manager 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的使用 AWS Secrets Manager 进行用户名和密码身份验证

Amazon MSK 不支持 SASL/PLAIN 身份验证。

基于 IAM 角色的身份验证

您可以使用 IAM 来验证连接到 MSK 集群的客户端的身份。如果 IAM 身份验证在您的 MSK 集群上处于活动状态,并且您没有为身份验证提供密钥,EventBridge 将自动默认使用 IAM 身份验证。要创建和部署基于 IAM 用户或角色的策略,请使用 IAM 控制台或 API。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 IAM 访问控制

要允许 EventBridge 连接到 MSK 集群、读取记录和执行其他必要操作,请将以下权限添加到管道的执行角色。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

您可以将这些权限范围限定为特定集群、主题和组。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Amazon MSK Kafka 操作

双向 TLS 身份验证

双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。

对于 Amazon MSK,EventBridge 充当客户端。您可以配置客户端证书(作为 Secrets Manager 中的密钥),使用 MSK 集群中的代理对 EventBridge 进行身份验证。客户端证书必须由服务器信任存储中的证书颁发机构 (CA) 签名。MSK 集群会向 EventBridge 发送服务器证书,以便使用 EventBridge 对代理进行身份验证。服务器证书必须由 AWS 信任存储中的 CA 签名。

Amazon MSK 不支持自签名服务器证书,因为 Amazon MSK 中的所有代理都使用由 Amazon Trust Services CA 签名的公有证书(EventBridge 默认信任这些证书)。

有关适用于 Amazon MSK 的 mTLS 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的双向 TLS 身份验证

配置 mTLS 密钥

CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。

注意

EventBridge 支持 PBES1(而不是 PBES2)私有密钥加密算法。

证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。

私有密钥必须采用 PKCS #8 格式,并具有以下结构:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

对于加密的私有密钥,请使用以下结构:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下示例显示使用加密私有密钥进行 mTLS 身份验证的密钥内容。对于加密的私有密钥,您可以在密钥中包含私有密钥密码。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

EventBridge 如何选择引导代理

EventBridge 根据集群中可用的身份验证方法,以及您是否提供用于身份验证的密钥来选择引导代理。如果您为 mTLS 或 SASL/SCRAM 提供了密钥,EventBridge 将自动选择该身份验证方法。如果不提供密钥,EventBridge 会选择在集群中处于活动状态的最强身份验证方法。下面是 EventBridge 选择代理的优先级顺序,从最强到最弱的身份验证:

  • mTLS(为 mTLS 提供的密钥)

  • SASL/SCRAM(为 SASL/SCRAM 提供的密钥)

  • SASL IAM(未提供密钥,IAM 身份验证处于活动状态)

  • 未经身份验证的 TLS(未提供密钥,IAM 身份验证未处于活动状态)

  • 纯文本(未提供密钥,IAM 身份验证和未经身份验证的 TLS 均未处于活动状态)

注意

如果 EventBridge 无法连接到最安全的代理类型,则不会尝试连接到其他(较弱)代理类型。如果希望 EventBridge 选择较弱的代理类型,请停用集群中所有更强的身份验证方法。

网络配置

EventBridge 必须具有对与 Amazon MSK 集群关联的 Amazon Virtual Private Cloud (Amazon VPC) 资源的访问权限。

  • 要访问 Amazon MSK 集群的 VPC,EventBridge 可以使用源子网的出站互联网访问权限。对于私有子网,它可以是 NAT 网关,也可以是您自己的 NAT。确保此 NAT 具有公共 IP 地址,可以连接到互联网。对于公有子网,必须使用 VPC 端点(详见下文)。

  • EventBridge 管道还支持通过 AWS PrivateLink 传送事件,这样您无需通过公共互联网就可以将事件从位于 Amazon Virtual Private Cloud(Amazon VPC)的事件源发送到管道目标。您可以使用管道从 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、自托管 Apache Kafka 以及位于私有子网中的 Amazon MQ 源进行轮询,而无需部署互联网网关、配置防火墙规则或设置代理服务器。您还可以使用 VPC 端点来支持从公有子网中的 Kafka 集群传送事件。

    要设置 VPC 端点,请参阅《AWS PrivateLink用户指南》中的创建 VPC 端点。对于服务名称,选择 com.amazonaws.region.pipes-data

使用以下规则配置您的 Amazon VPC 安全组(至少):

  • 入站规则 – 允许 Amazon MSK 代理端口上为源指定的安全组的所有流量。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Amazon MSK 代理端口上为源指定的安全组的所有流量。

    代理端口包括:

    • 9092(适用于明文)

    • 9094(适用于 TLS)

    • 9096(适用于 SASL)

    • 9098(适用于 IAM)

注意

可通过 Amazon MSK API 发现您的 Amazon VPC 配置。在设置过程中,您不需要对其进行配置。

可自定义的使用者组 ID

将 Apache Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望管道加入的 Apache Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Apache Kafka 记录处理设置从其他使用者迁移到 EventBridge。

如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Apache Kafka 会向所有使用者分发消息。换句话说,EventBridge 不会收到 Apache Kafka 主题的所有消息。如果希望 EventBridge 处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。

此外,如果指定了使用者组 ID,而 Apache Kafka 找到了具有相同 ID 的有效现有使用者组,则 EventBridge 会忽略管道的 StartingPosition 参数。相反,EventBridge 开始根据使用者组的已提交偏移量处理记录。如果指定了使用者组 ID,而 Apache Kafka 找不到现有使用者组,则 EventBridge 会使用指定的 StartingPosition 配置源。

在所有 Apache Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建管道后,无法更新此值。

Amazon MSK 源的自动扩缩

当您最初创建 Amazon MSK 源时,EventBridge 会分配一个使用者来处理 Apache Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,EventBridge 会根据工作负载自动增加或缩减使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。

EventBridge 会按一分钟的间隔时间来评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 EventBridge 处理消息的速度更快。如有必要,EventBridge 会在主题中添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。

如果您的目标过载,EventBridge 会减少使用者的数量。此操作通过减少使用者可以检索和发送到管道的消息数,来减少管道的工作负载。