本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
适用于 Apache 的亚马逊托管流媒体 Kafka 主题作为 Pipes 中的来源 EventBridge
你可以使用 Pipes 接收来自亚马逊托 EventBridge 管流媒体 For Apache Kafka(MSK亚马逊)主题的记录。您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时MSK,您可以选择特定于 Amazon 的设置。 EventBridge 在将数据发送到目标时,Pipes 会维护消息代理中记录的顺序。
Amazon MSK 是一项完全托管的服务,您可以使用它来构建和运行使用 Apache Kafka 处理流数据的应用程序。亚马逊MSK简化了运行 Apache Kafka 的集群的设置、扩展和管理。借助 AmazonMSK,您可以为多个可用区配置应用程序,并使用 AWS Identity and Access Management (IAM) 配置应用程序的安全性。亚马逊MSK支持 Kafka 的多个开源版本。
亚马逊MSK作为来源的运作方式与使用亚马逊简单队列服务(亚马逊SQS)或亚马逊 Kinesis 类似。 EventBridge在内部轮询来自源的新消息,然后同步调用目标。 EventBridge 批量读取消息,并将这些消息作为事件负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)
对于基于 Apache Kafka 的源, EventBridge 支持处理控制参数,例如批处理窗口和批处理大小。
EventBridge 按顺序读取每个分区的消息。 EventBridge 处理完每个批次后,它会提交该批次中消息的偏移量。如果管道的目标对批处理中的任何消息返回错误,则 EventBridge 会重试整批消息,直到处理成功或消息过期。
EventBridge 当它调用目标时,会发送事件中的一批消息。事件负载包含一个消息数组。每个数组项目都包含 Amazon MSK 主题和分区标识符的详细信息,以及时间戳和 base64 编码的消息。
示例事件
以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon P EventBridge ipes 中的事件筛选。
[ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": "0", "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]
轮询和流的起始位置
请注意,管道创建和更新期间的流源轮询最终将是一致的。
在管道创建期间,可能需要几分钟才能开始轮询来自流的事件。
在管道更新源轮询配置期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。
这意味着,如果您指定 LATEST
作为流的起始位置,在创建或更新管道期间,管道可能会错过发送的事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON
。
MSK集群认证
EventBridge 需要访问Amazon MSK 集群、检索记录和执行其他任务的权限。Amazon MSK 支持多种控制客户端访问MSK集群的选项。有关在哪种情况下使用哪种身份验证方法的更多信息,请参阅 如何 EventBridge 选择引导代理。
未经身份验证的访问
我们建议仅使用未经身份验证的访问权限进行开发。仅当集群禁用IAM基于角色的身份验证时,未经身份验证的访问才有效。
SASL/SCRAM认证
Amazon MSK 支持使用传输层安全 () 加密的简单身份验证和安全层/加盐质询响应身份验证机制 (SASL/SCRAMTLS) 身份验证。 EventBridge 要连接到集群,您需要将身份验证凭据(登录凭据)存储在 AWS Secrets Manager 密钥中。
有关使用 Secrets Manager 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的使用 AWS Secrets Manager进行用户名和密码身份验证。
Amazon MSK 不支持SASL/PLAIN身份验证。
基于 IAM 角色的身份验证
您可以使用IAM来验证连接到MSK集群的客户端的身份。如果您的MSK集群上的IAM身份验证处于活动状态,并且您没有为身份验证提供密钥,则 EventBridge 会自动默认为使用IAM身份验证。要创建和部署基于IAM用户或角色的策略,请使用IAM控制台或API。有关更多信息,请参阅 Apache Managed Streaming for Apache Kafka 开发者指南中的IAM访问控制。
要允许 EventBridge 连接到MSK集群、读取记录和执行其他必需的操作,请向管道的执行角色添加以下权限。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:
region
:account-id
:cluster/cluster-name
/cluster-uuid
", "arn:aws:kafka:region
:account-id
:topic/cluster-name
/cluster-uuid
/topic-name
", "arn:aws:kafka:region
:account-id
:group/cluster-name
/cluster-uuid
/consumer-group-id
" ] } ] }
您可以将这些权限范围限定为特定集群、主题和组。有关更多信息,请参阅《适用于 A pache MSK Kafka 的亚马逊托管流媒体 Kafka 开发者指南》中的亚马逊 Kafka 操作。
相互TLS认证
双向 TLS (mTLS) 在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。
对于亚马逊MSK,则 EventBridge 充当客户。您可以配置客户端证书(作为 Secrets Manager 中的密钥),以向MSK集群中的代理进行身份验证 EventBridge 。客户端证书必须由服务器信任存储中的证书颁发机构 (CA) 签名。集MSK群向发送服务器证书, EventBridge 以便对代理进行身份验证 EventBridge。服务器证书必须由 AWS 信任存储区中的 CA 签名。
亚马逊MSK不支持自签名服务器证书,因为亚马逊的所有经纪人都MSK使用由亚马逊信任服务签名的公共证书,默认情况下CAs,亚马逊信任服务
有关 m for Amazon TLS 的更多信息MSK,请参阅《适用于 A pache Managed Kafka 的亚马逊托管流媒体开发者指南》中的相互TLS身份验证。
配置 m TLS 密钥
CLIENT_ CERTIFICATE TLS _ AUTH 密钥需要证书字段和私钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私钥必须是PEM格式化的。
注意
EventBridge 支持 PBES1
证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:
-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----
Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。
私钥必须采用 PKCS#8
-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----
对于加密的私有密钥,请使用以下结构:
-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----
以下示例显示了使用加密私钥进行TLS身份验证的密钥的内容。对于加密的私有密钥,您可以在密钥中包含私有密钥密码。
{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }
如何 EventBridge 选择引导代理
EventBridge 根据集群上可用的身份验证方法以及您是否为身份验证提供密钥来选择引导代理。如果您为 m TLS 或SASL/提供了密钥SCRAM,则 EventBridge 会自动选择该身份验证方法。如果您不提供密钥,请 EventBridge选择集群上处于活动状态的最强身份验证方法。以下是 EventBridge 选择代理的优先顺序,从最强到最弱的身份验证:
-
mTLS(为 m 提供的机密TLS)
-
SASL/SCRAM(为 SASL /提供的机密SCRAM)
-
SASLIAM(未提供密钥,且IAM身份验证处于活动状态)
-
未经身份验证TLS(未提供任何密钥,且IAM身份验证未激活)
-
纯文本(未提供任何密钥,身份验证和未经IAM身份验证均未激活TLS)
注意
如果 EventBridge 无法连接到最安全的代理类型,则不会尝试连接到其他(较弱的)代理类型。如果 EventBridge 要选择较弱的代理类型,请停用集群上所有更强的身份验证方法。
网络配置
EventBridge 必须有权访问与您的亚马逊MSK集群关联的 Amazon Virtual Private Cloud (AmazonVPC) 资源。
-
要访问您VPC的 Amazon MSK 集群, EventBridge 可以使用源子网的出站互联网访问权限。对于私有子网,它可以是NAT网关,也可以是您自己的NAT网关。确保NAT具有公有 IP 地址并且可以连接到互联网。对于公有子网,您必须使用VPC终端节点(详见下文)。
-
EventBridge Pipes 还支持通过传送事件 AWS PrivateLink
,允许您在不通过公共互联网的情况下将事件从位于 Amazon Virtual Private Cloud (Amazon VPC) 的事件源发送到 Pipes 目标。您可以使用 Pipes 从 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、自我管理的 Apache Kafka 以及位于私有子网中的 Amazon MQ 源进行轮询,而无需部署互联网网关、配置防火墙规则或设置代理服务器。您还可以使用VPC终端节点来支持从公有子网中的 Kafka 集群进行传输。 要设置VPC终端节点,请参阅AWS PrivateLink 用户指南中的创建VPC终端节点。对于服务名称,请选择
com.amazonaws.
。region
.pipes-data
使用以下规则(至少)配置您的 Amazon VPC 安全组:
-
入站规则-允许为源指定的安全组在 Amazon b MSK roker 端口上的所有流量。
-
出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Amazon b MSK roker 端口上为您的来源指定的安全组的所有流量。
代理端口包括:
9092 表示纯文本
9094 for TLS
9096 for SASL
9098 for IAM
注意
您可以通过亚马逊发现您的亚马逊VPCMSKAPI配置。在设置过程中,您不需要对其进行配置。
可自定义的使用者组 ID
将 Apache Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望管道加入的 Apache Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Apache Kafka 记录处理设置从其他使用者迁移到。 EventBridge
如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Apache Kafka 会向所有使用者分发消息。换句话说, EventBridge 不会收到 Apache Kafka 主题的所有消息。如果 EventBridge 要处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。
此外,如果您指定了使用者组 ID,而 Apache Kafka 找到了具有相同 ID 的有效现有使用者组,则 EventBridge 会忽略管道的StartingPosition
参数。而是 EventBridge 开始根据使用者组的已提交偏移量处理记录。如果您指定了使用者组 ID,而 Apache Kafka 找不到现有的使用者组,则使用指定的来 EventBridge 配置您的来源。StartingPosition
在所有 Apache Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建管道后,无法更新此值。
自动扩展 Amazon MSK 来源
最初创建 Amazon MSK 源时, EventBridge 会分配一个使用者来处理 Apache Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,还可以根据工作负载 EventBridge 自动增加或缩小使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。
每隔一分钟, EventBridge 评估主题中所有分区的消费偏移延迟。如果延迟太高,则表示分区接收消息的速度 EventBridge 快于处理消息的速度。如有必要,在主题中 EventBridge 添加或删除消费者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。
如果您的目标超负荷,则 EventBridge 减少消费者的数量。此操作通过减少使用者可以检索和发送到管道的消息数,来减少管道的工作负载。