Apache Kafka 流作为 EventBridge 管道中的源 - Amazon EventBridge

Apache Kafka 流作为 EventBridge 管道中的源

Apache Kafka 是开源事件流平台,支持数据管道和流分析等工作负载。您可以使用 Amazon Managed Streaming for Apache Kafka(Amazon MSK)或自托管 Apache Kafka 集群。用 AWS 术语来讲,自托管集群是指未由 AWS 托管的任何 Apache Kafka 集群。这包括您自己管理的集群,以及由第三方提供商(例如 Confluent CloudCloudKarafkaRedpanda)托管的集群。

有关集群中 AWS 托管的其他选项的更多信息,请参阅 AWS Big Data 博客中的关于在 AWS 上运行 Apache Kafka 的最佳实践

Apache Kafka 作为源,运行方式与使用 Amazon Simple Queue Service(Amazon SQS)或 Amazon Kinesis 相似。EventBridge 在内部轮询来自源的新消息,然后同步调用目标。EventBridge 批量读取消息,并将这些消息作为事件有效负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)

对于基于 Apache Kafka 的源,EventBridge 支持处理控制参数,例如批处理时段和批次大小。

EventBridge 调用管道时会在事件参数中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Apache Kafka 主题和 Apache Kafka 分区标识符的详细信息,以及时间戳和 base64 编码的消息。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon EventBridge 管道中的事件筛选

[ { "eventSource": "SelfManagedKafka", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

Apache Kafka 集群身份验证

EventBridge Pipes 支持多种方法来使用自托管 Apache Kafka 集群进行身份验证。请确保将 Apache Kafka 集群配置为使用支持的下列身份验证方法之一。有关 Apache Kafka 安全的更多信息,请参阅 Apache Kafka 文档的安全部分。

VPC 访问

如果使用的是自托管 Apache Kafka 且其中只有 VPC 中的 Apache Kafka 用户可以访问 Apache Kafka 代理,则必须在 Apache Kafka 源中配置 Amazon Virtual Private Cloud(Amazon VPC)。

SASL/SCRAM 身份验证

EventBridge Pipes 支持使用传输层安全性协议 (TLS) 加密进行简单身份验证和安全层/加盐质疑应答身份验证机制 (SASL/SCRAM) 身份验证。EventBridge Pipes 发送已加密凭证以使用集群进行身份验证。有关 SASL/SCRAM 身份验证的更多信息,请参阅 RFC 5802

EventBridge Pipes 支持使用 TLS 加密进行 SASL/PLAIN 身份验证。EventBridge Pipes 使用 SASL/PLAIN 身份验证,将凭证以明文(未加密)形式发送到服务器。

为了 SASL 身份验证,需要将登录凭证作为密钥存储在 AWS Secrets Manager 中。

双向 TLS 身份验证

双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。

在自托管 Apache Kafka 中,EventBridge Pipes 充当客户端。您可以配置客户端证书(作为 Secrets Manager 中的密钥),以使用 Apache Kafka 代理对 EventBridge Pipes 进行身份验证。客户端证书必须由服务器信任存储中的证书颁发机构 (CA) 签名。

Apache Kafka 集群向 EventBridge Pipes 发送服务器证书,以便使用 EventBridge Pipes 对 Apache Kafka 代理进行身份验证。服务器证书可以是公有 CA 证书。也可以是私有 CA/自签名证书。公有 CA 证书必须由 EventBridge Pipes 信任存储中的 CA 签名。对于私有 CA /自签名证书,您可以配置服务器根 CA 证书(作为 Secrets Manager 中的密钥)。EventBridge Pipes 使用根证书来验证 Apache Kafka 代理。

有关 mTLS 的更多信息,请参阅为作为源的 Amazon MSK 引入双向 TLS 身份验证

配置客户端证书密钥

CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。

注意

EventBridge Pipes 支持 PBES1(而不是 PBES2)私有密钥加密算法。

证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。

私有密钥必须采用 PKCS #8 格式,并具有以下结构:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

对于加密的私有密钥,请使用以下结构:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下示例显示使用加密私有密钥进行 mTLS 身份验证的密钥内容。对于加密的私有密钥,可以在密钥中包含私有密钥密码。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

配置服务器根 CA 证书密钥

如果您的 Apache Kafka 代理使用 TLS 加密(具有由私有 CA 签名的证书),则创建此密钥。您可以将 TLS 加密用于 VPC、SASL/SCRAM、SASL/PLAIN 或 mTLS 身份验证。

服务器根 CA 证书密钥需要一个字段,其中包含 PEM 格式的 Apache Kafka 代理的根 CA 证书。以下示例显示密钥的结构。

{ "certificate": "-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----"

网络配置

如果使用的自托管 Apache Kafka 环境使用私有 VPC 连接,EventBridge 必须具有对与 Apache Kafka 代理关联的 Amazon Virtual Private Cloud(Amazon VPC)资源的访问权限。

  • 要访问 Apache Kafka 集群的 VPC,EventBridge 可以使用源子网的出站互联网访问权限。对于私有子网,它可以是 NAT 网关,也可以是您自己的 NAT。确保此 NAT 具有公共 IP 地址,可以连接到互联网。对于公有子网,必须使用 VPC 端点(详见下文)。

  • EventBridge 管道还支持通过 AWS PrivateLink 传送事件,这样您无需通过公共互联网就可以将事件从位于 Amazon Virtual Private Cloud(Amazon VPC)的事件源发送到管道目标。您可以使用管道从 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、自托管 Apache Kafka 以及位于私有子网中的 Amazon MQ 源进行轮询,而无需部署互联网网关、配置防火墙规则或设置代理服务器。您还可以使用 VPC 端点来支持从公有子网中的 Kafka 集群传送事件。

    要设置 VPC 端点,请参阅《AWS PrivateLink 用户指南》中的创建 VPC 端点。对于服务名称,选择 com.amazonaws.region.pipes-data

使用以下规则配置您的 Amazon VPC 安全组(至少):

  • 入站规则 – 允许 Apache Kafka 代理端口上为源指定的安全组的所有流量。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Apache Kafka 代理端口上为源指定的安全组的所有流量。

    代理端口包括:

    • 9092(适用于明文)

    • 9094(适用于 TLS)

    • 9096(适用于 SASL)

    • 9098(适用于 IAM)

使用 Apache Kafka 源进行使用者自动扩缩

当您最初创建 Apache Kafka 源时,EventBridge 会分配一个使用者来处理 Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,EventBridge 会根据工作负载自动增加或缩减使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。

EventBridge 会按一分钟的间隔时间来评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 EventBridge 处理消息的速度更快。如有必要,EventBridge 会在主题中添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。

如果您的目标过载,EventBridge 会减少使用者的数量。此操作通过减少使用者可以检索和发送到函数的消息数来减少函数的工作负载。