使用 Lambda 处理 Amazon MSK 消息 - AWS Lambda

使用 Lambda 处理 Amazon MSK 消息

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

将 Amazon MSK 添加为事件源

创建事件源映射,使用 Lambda 控制台、AWS开发工具包,或 AWS Command Line Interface (AWS CLI) 将您的 Amazon MSK 添加为 Lambda 函数触发器。请注意,当您将 Amazon MSK 添加为触发器时,Lambda 将假定 Amazon MSK 集群的 VPC 设置,而不是 Lambda 函数的 VPC 设置。

本节介绍了如何使用 Lambda 控制台和 AWS CLI 创建事件源映射。

先决条件

  • 一个 Amazon MSK 集群和一个 Kafka 主题。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的开始使用 Amazon MSK

  • 一个有权访问 MSK 集群所用 AWS 资源的执行角色

可自定义的使用者组 ID

将 Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望 Lambda 函数加入的 Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Kafka 记录处理设置从其他使用者无缝迁移到 Lambda。

如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Kafka 会向所有使用者分发消息。换句话说,Lambda 不会收到 Kafka 主题的所有消息。如果希望 Lambda 处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。

此外,如果指定了使用者组 ID,而 Kafka 找到了具有相同 ID 的有效现有使用者组,则 Lambda 会忽略事件源映射的 StartingPosition 参数。相反,Lambda 开始根据使用者组的已提交偏移量处理记录。如果指定了使用者组 ID,而 Kafka 找不到现有使用者组,则 Lambda 会使用指定的 StartingPosition 配置事件源。

在所有 Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建 Kafka 事件源映射后,无法更新此值。

添加 Amazon MSK 触发器(控制台)

按照以下步骤将 Amazon MSK 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Amazon MSK 触发器添加到 Lambda 函数(控制台)
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. Trigger configuration(触发配置)下,执行以下操作:

    1. 选择 MSK 触发器类型。

    2. 对于 MSK cluster(MSK 集群),选择您的集群。

    3. 对于 Batch size(批处理大小),输入要在单个批次中接收的最大消息数。

    4. 对于 Batch window(批处理时段),输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

    5. 对于 Topic name(主题名称),输入 Kafka 主题名称。

    6. (可选)对于 Consumer group ID(使用者组 ID),输入要加入的 Kafka 使用者组的 ID。

    7. (可选)对于起始位置,选择最新即可从最新记录开始读取流,选择最早即可从最早的可用记录开始读取流,选择在时间戳处即可从指定的时间戳开始读取流。

    8. (可选)对于 Authentication(身份验证),选择用于通过 MSK 集群中的代理进行身份验证的密钥。

    9. 要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。

  5. 要创建触发器,请选择 Add(添加)。

添加 Amazon MSK 触发器(AWS CLI)

使用以下示例 AWS CLI 命令为 Lambda 函数创建和查看 Amazon MSK 触发器。

使用 AWS CLI 创建触发器

例 — 为使用 IAM 身份验证的集群创建事件源映射

以下示例使用 create-event-source-mapping AWS CLI 命令将名为 my-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。将主题的起始位置设置为 LATEST。当集群使用基于 IAM 角色的身份验证时,您不需要 SourceAccessConfiguration 对象。例如:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — 为使用 SASL/SCRAM 身份验证的集群创建事件源映射

如果集群使用 SASL/SCRAM 身份验证,则必须包含指定 SASL_SCRAM_512_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
例 — 为使用 mTLS 身份验证的集群创建事件源映射

如果集群使用 mTLS 身份验证,则必须包含指定 CLIENT_CERTIFICATE_TLS_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

有关更多信息,请参阅 CreateEventSourceMapping API 参考文档。

使用 AWS CLI 查看状态

以下示例使用 get-event-source-mapping AWS CLI 命令来描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于Amazon MSK。

参数 必需 默认值 注意

AmazonManagedKafkaEventSourceConfig

包含 ConsumerGroupId 字段,该字段默认为唯一值。

只能在 Create(创建)设置

BatchSize

100

最大值:10000

DestinationConfig

不适用

捕获 Amazon MSK 事件源丢弃的批处理

启用

True

EventSourceArn

Y

不适用

只能在 Create(创建)设置

FilterCriteria

不适用

控制 Lambda 向您的函数发送的事件

FunctionName

不适用

KMSKeyArn

不适用

筛选条件的加密

MaximumBatchingWindowInSeconds

500 毫秒

批处理行为

ProvisionedPollersConfig

MinimumPollers:如果未指定,则默认值为 1

MaximumPollers:如果未指定,则默认值为 200

配置预调配模式

SourceAccessConfigurations

无凭证

事件源的 SASL/SCRAM 或 CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) 身份验证凭证

StartingPosition

Y

不适用

AT_TIMESTAMP、TRIM_HORIZON 或 LATEST

只能在 Create(创建)设置

StartingPositionTimestamp

不适用

当 StartingPosition 设置为 AT_TIMESTAMP 时,为必需项

标签

不适用

在事件源映射上使用标签

主题

Y

不适用

Kafka 主题名称

只能在 Create(创建)设置

创建跨账户事件源映射

您可以使用多 VPC 私有连接将 Lambda 函数连接到不同 AWS 账户 中的预置 MSK 集群。多 VPC 连接使用 AWS PrivateLink,可将所有流量保持在 AWS 网络内。

注意

您无法为无服务器 MSK 集群创建跨账户事件源映射。

要创建跨账户事件源映射,必须先为 MSK 集群配置多 VPC 连接。创建事件源映射时,请使用托管 VPC 连接 ARN 而非集群 ARN,如以下示例所示。CreateEventSourceMapping 操作也因 MSK 集群使用的身份验证类型而异。

例 — 为使用 IAM 身份验证的集群创建跨账户事件源映射

当集群使用基于 IAM 角色的身份验证时,您不需要 SourceAccessConfiguration 对象。例如:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
例 — 为使用 SASL/SCRAM 身份验证的集群创建跨账户事件源映射

如果集群使用 SASL/SCRAM 身份验证,则必须包含指定 SASL_SCRAM_512_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。

有两种方法可以通过 SASL/SCRAM 身份验证将密钥用于跨账户 Amazon MSK 事件源映射:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
例 — 为使用 mTLS 身份验证的集群创建跨账户事件源映射

如果集群使用 mTLS 身份验证,则必须包含指定 CLIENT_CERTIFICATE_TLS_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。密钥可以存储在集群账户或 Lambda 函数账户中。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

将 Amazon MSK 集群用作事件源

当您添加 Apache Kafka 或 Amazon MSK 集群作为 Lambda 函数的触发器时,该集群将用作事件源

Lambda 根据您指定的 StartingPosition,从您在 CreateEventSourceMapping 请求中指定为 Topics 的 Kafka 主题读取事件数据。成功进行处理后,会将 Kafka 主题提交给 Kafka 集群。

如果您指定 StartingPosition 作为 LATEST,则 Lambda 开始读取主题下每个分区中的最新消息。由于在触发器配置后 Lambda 开始读取消息之前可能会有一些延迟,因此 Lambda 不会读取在此窗口中生成的任何消息。

Lambda 按顺序读取每个 Kafka 主题分区的消息。单个 Lambda 负载可以包含来自多个分区的消息。当有更多记录可用时,Lambda 根据您在 CreateEventSourceMapping 中指定的 BatchSize 值,继续对记录进行批处理,直到函数赶上主题的速度。

Lambda 处理各个批次后,会提交该批次中消息的偏移量。如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。您可以将所有重试都失败的记录发送到失败时的目标,以供日后处理。

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

Amazon CloudWatch 指标

Lambda 会在您的函数处理记录时发出 OffsetLag 指标。此指标的值是写入 Kafka 事件源主题的最后一条记录与函数的使用者组处理的最后一条记录之间的偏移量差值。您可以使用 OffsetLag 来估计添加记录和使用者组处理记录之间的延迟。

如果 OffsetLag 呈上升趋势,则可能表明函数的使用者组中的轮询器存在问题。有关更多信息,请参阅 将 CloudWatch 指标与 Lambda 结合使用

Amazon MSK 事件源映射的消息吞吐量扩展行为

您可以在 Amazon MSK 事件源映射的两种消息吞吐量扩展行为模式之间进行选择:

默认(按需)模式

当您最初创建 Amazon MSK 事件源时,Lambda 会分配默认数量的事件轮询器来处理 Kafka 主题中的所有分区。Lambda 根据消息负载自动扩展或缩减事件轮询器的数量。

Lambda 会按一分钟的间隔时间来评估主题中所有分区的偏移滞后。如果偏移延迟太高,则分区接收消息的速度比 Lambda 处理消息的速度更快。如有必要,Lambda 会在主题中添加或删除事件轮询器。添加或删除事件轮询器的自动扩缩过程在评估后的三分钟内发生。

如果目标 Lambda 函数受到限制,Lambda 会减少事件轮询器的数量。此操作通过减少事件轮询器可以检索和发送到函数的消息数来减少函数的工作负载。

配置预调配模式

对于需要微调事件源映射吞吐量的工作负载,您可以使用预调配模式。在预调配模式下,您可以为预调配事件轮询器数量定义最小和最大限制。这些预调配事件轮询器专用于事件源映射,并且可以通过响应式自动扩缩处理意外的消息激增。对于具有严格性能要求的 Kafka 工作负载,我们建议您使用预调配模式。

在 Lambda 中,事件轮询器是一种能够处理高达 5 Mbps 吞吐量的计算单位。作为参考,假设您的事件源产生的平均有效载荷为 1 MB,并且平均函数持续时间为 1 秒。如果有效载荷未进行任何转换(例如筛选),则单个轮询器可以支持 5 Mbps 的吞吐量和 5 次并发 Lambda 调用。使用预调配模式会产生额外成本。有关定价估算,请参阅 AWS Lambda 定价

注意

使用预调配模式时,您无需创建 AWS PrivateLink VPC 端点或在网络配置过程中授予关联权限。

在预调配模式下,最小事件轮询器数量 (MinimumPollers) 的可接受值范围介于 1 到 200 之间(含首尾)。事件轮询器的最大数量 (MaximumPollers) 的可接受值范围介于 1 到 2000 之间(含首尾)。MaximumPollers 必须大于或等于 MinimumPollers。此外,为了保持分区内的有序处理,Lambda 會将 MaximumPollers 限制为主题中的分区数量。

有关选择适当的最小和最大事件轮询器值的更多详细信息,请参阅使用预调配模式时的最佳实践和注意事项

您可以使用控制台或 Lambda API 为 Amazon MSK 事件源映射配置预调配模式。

为现有的 Amazon MSK 事件源映射配置预调配模式(控制台)
  1. 打开 Lamba 控制台的函数页面

  2. 选择具有要为其配置预调配模式的 Amazon MSK 事件源映射的函数。

  3. 选择配置,然后选择触发器

  4. 选择要为其配置预调配模式的 Amazon MSK 事件源映射,然后選擇编辑

  5. 事件源映射配置下,选择配置预调配模式

    • 对于最少事件轮询器,输入介于 1 到 200 之间的值。如果未指定值,则 Lambda 将选择默认值 1。

    • 对于最大事件轮询器,输入介于 1 到 2000 之间的值。此值必须大于或等于最少事件轮询器的值。如果未指定值,则 Lambda 将选择默认值 200。

  6. 选择保存

您可以使用 EventSourceMappingConfiguration 中的 ProvisionedPollerConfig 对象,以编程方式配置预调配模式。例如,以下 UpdateEventSourceMapping CLI 命令将 MinimumPollers 值配置为 5,将 MaximumPollers 值配置为 100。

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'

配置预调配模式后,您可以通过监控 ProvisionedPollers 指标来观测事件轮询器对您的工作负载的使用情况。有关更多信息,请参阅 事件源映射指标

要禁用预调配模式并返回默认(按需)模式,您可以使用以下 UpdateEventSourceMapping CLI 命令:

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{}'

使用预调配模式时的最佳实践和注意事项

事件源映射的最小和最大事件轮询器的最佳配置取决于应用程序的性能需求。建议您从默认最小事件轮询器开始,以设定性能配置文件的基准。根据观测到的消息处理模式和所需的性能配置文件调整配置。

对于流量激增且性能需求严格的工作负载,请增加最少的事件轮询器数以处理消息突然激增。要确定所需的最少事件轮询器数,请考虑工作负载的每秒消息数和平均有效载荷大小,并使用单个事件轮询器的吞吐能力(最高 5 Mbps)作为参考。

为了保持分区内的有序处理,Lambda 会将最大事件轮询器数限制为主题中的分区数量。此外,您的事件源映射可以扩展到的最大事件轮询器数取决于函数的并发设置。

激活预调配模式时,更新您的网络设置以删除 AWS PrivateLink VPC 端点和关联的权限。