使用 Lambda 处理自托管式 Apache Kafka 消息 - AWS Lambda

使用 Lambda 处理自托管式 Apache Kafka 消息

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

将 Kafka 集群添加为事件源

创建事件源映射,使用 Lambda 控制台、AWS开发工具包,或 AWS Command Line Interface (AWS CLI) 将您的 Kafka 集群添加为 Lambda 函数触发器

本节介绍了如何使用 Lambda 控制台和 AWS CLI 创建事件源映射。

先决条件

  • 自行管理的 Apache Kafka 集群。Lambda 支持 Apache Kafka 版本 0.10.1.0 及更高版本。

  • 一个有权访问您自行管理的 Kafka 集群所用 AWS 资源的执行角色

可自定义的使用者组 ID

将 Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望 Lambda 函数加入的 Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Kafka 记录处理设置从其他使用者无缝迁移到 Lambda。

如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Kafka 会向所有使用者分发消息。换句话说,Lambda 不会收到 Kafka 主题的所有消息。如果希望 Lambda 处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。

此外,如果指定了使用者组 ID,而 Kafka 找到了具有相同 ID 的有效现有使用者组,则 Lambda 会忽略事件源映射的 StartingPosition 参数。相反,Lambda 开始根据使用者组的已提交偏移量处理记录。如果指定了使用者组 ID,而 Kafka 找不到现有使用者组,则 Lambda 会使用指定的 StartingPosition 配置事件源。

在所有 Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建 Kafka 事件源映射后,无法更新此值。

添加自行管理的 Kafka 集群(控制台)

按照以下步骤将自行管理的 Apache Kafka 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Apache Kafka 触发器添加到 Lambda 函数(控制台)
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. Trigger configuration(触发配置)下,执行以下操作:

    1. 选择 Apache Kafka 触发类型。

    2. 对于 Bootstrap servers(Bootstrap 引导服务器),输入集群中 Kafka 代理的主机和端口对地址,然后选择 Add(添加)。对集群中的每个 Kafka 代理重复此操作。

    3. 对于 Topic name(主题名称),输入用于在集群中存储记录的 Kafka 主题的名称。

    4. (可选)对于 Batch size(批处理大小),输入要在单个批次中接收的最大记录数。

    5. 对于 Batch window(批处理时段),输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

    6. (可选)对于 Consumer group ID(使用者组 ID),输入要加入的 Kafka 使用者组的 ID。

    7. (可选)对于起始位置,选择最新即可从最新记录开始读取流,选择最早即可从最早的可用记录开始读取流,选择在时间戳处即可从指定的时间戳开始读取流。

    8. (可选)对于 VPC,请为您的 Kafka 集群选择 Amazon VPC。然后,选择 VPC subnets(VPC 子网)和 VPC security groups(VPC 安全组)。

      如果仅 VPC 内部的用户访问代理,则需要此设置。

    9. (可选)对于 Authentication(身份验证),请选择 Add(添加),然后执行以下操作:

      1. 选择集群中 Kafka 代理的访问权限或身份验证协议。

        • 如果 Kafka 代理使用 SASL/PLAIN 身份验证,请选择 BASIC_AUTH

        • 如果代理使用 SASL/SCRAM 身份验证,请选择其中一个 SASL_SCRAM 协议。

        • 如果要配置 mTLS 身份验证,请选择 CLIENT_CERTIFICATE_TLS_AUTH 协议。

      2. 对于 SASL/SCRAM 或 mTLS 身份验证,请选择包含 Kafka 集群凭据的 Secrets Manager 私有密钥。

    10. (可选)对于 Encryption(加密),如果您的 Kafka 代理使用由私有 CA 签名的证书,请选择包含根 CA 证书的 Secrets Manager 密钥,Kafka 代理使用该证进行 TLS 加密。

      此设置适用于 SASL/SCRAM 或 SASL/PLAIN 的 TLS 加密,以及 mTLS 身份验证。

    11. 要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。

  5. 要创建触发器,请选择 Add(添加)。

添加自行管理的 Kafka 集群 (AWS CLI)

使用以下示例 AWS CLI 命令为 Lambda 函数创建和查看自行管理的 Apache Kafka 触发器。

使用 SASL/SCRAM

如果 Kafka 用户通过互联网访问您的 Kafka 代理,则需要指定为 SASL/SCRAM 身份验证创建的 Secrets Manager。以下示例使用 create-event-source-mapping AWS CLI 命令将名为 my-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

使用 VPC

如果仅 VPC 中的 Kafka 用户访问 Kafka 代理,则必须指定 VPC、子网和 VPC 安全组。以下示例使用 create-event-source-mapping AWS CLI 命令将名为 my-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

使用 AWS CLI 查看状态

以下示例使用 get-event-source-mapping AWS CLI 命令来描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

自行管理的 Apache Kafka 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有一些参数适用于 Apache Kafka。

参数 必需 默认值 备注

BatchSize

100

最大值:10000

DestinationConfig

不适用

捕获自托管式 Apache Kafka 事件源的丢弃批次

启用

True

FilterCriteria

不适用

控制 Lambda 向您的函数发送的事件

FunctionName

不适用

KMSKeyArn

不适用

筛选条件的加密

MaximumBatchingWindowInSeconds

500 毫秒

批处理行为

ProvisionedPollersConfig

MinimumPollers:如果未指定,则默认值为 1

MaximumPollers:如果未指定,则默认值为 200

配置预调配模式

SelfManagedEventSource

Y

不适用

Kafka 代理列表。只能在 Create(创建)设置

SelfManagedKafkaEventSourceConfig

包含 ConsumerGroupId 字段,该字段默认为唯一值。

只能在 Create(创建)设置

SourceAccessConfigurations

无凭证

集群的 VPC 信息或身份验证凭据

对于 SASL_PLAIN,设置为 BASIC_AUTH

StartingPosition

Y

不适用

AT_TIMESTAMP、TRIM_HORIZON 或 LATEST

只能在 Create(创建)设置

StartingPositionTimestamp

不适用

当 StartingPosition 设置为 AT_TIMESTAMP 时,为必需项

标签

不适用

在事件源映射上使用标签

主题

Y

不适用

主题名称

只能在 Create(创建)设置

将 Kafka 集群用作事件源

当您添加 Apache Kafka 或 Amazon MSK 集群作为 Lambda 函数的触发器时,该集群将用作事件源

Lambda 根据您指定的 StartingPosition,从您在 CreateEventSourceMapping 请求中指定为 Topics 的 Kafka 主题读取事件数据。成功进行处理后,会将 Kafka 主题提交给 Kafka 集群。

如果您指定 StartingPosition 作为 LATEST,则 Lambda 开始读取主题下每个分区中的最新消息。由于在触发器配置后 Lambda 开始读取消息之前可能会有一些延迟,因此 Lambda 不会读取在此窗口中生成的任何消息。

Lambda 处理来自一个或多个指定 Kafka 主题分区的记录,并将 JSON 有效负载发送到您的函数。单个 Lambda 负载可以包含来自多个分区的消息。当有更多记录可用时,Lambda 根据您在 CreateEventSourceMapping 中指定的 BatchSize 值,继续对记录进行批处理,直到函数赶上主题的速度。

如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。您可以将所有重试都失败的记录发送到失败时的目标,以供日后处理。

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

自行管理的 Apache Kafka 事件源映射的消息吞吐量扩展行为

您可以在 Amazon MSK 事件源映射的两种消息吞吐量扩展行为模式之间进行选择:

默认(按需)模式

当您最初创建自行管理的 Apache Kafka 事件源时,Lambda 会分配默认数量的事件轮询器来处理 Kafka 主题中的所有分区。Lambda 根据消息负载自动扩展或缩减事件轮询器的数量。

Lambda 会按一分钟的间隔时间来评估主题中所有分区的使用者偏移滞后。如果偏移延迟太高,则分区接收消息的速度比 Lambda 处理消息的速度更快。如有必要,Lambda 会在主题中添加或删除事件轮询器。添加或删除事件轮询器的自动扩缩过程在评估后的三分钟内发生。

如果目标 Lambda 函数受到限制,Lambda 会减少事件轮询器的数量。此操作通过减少事件轮询器可以检索和发送到函数的消息数来减少函数的工作负载。

要监控 Kafka 主题的吞吐量,您可以查看 Apache Kafka 使用者指标,例如 consumer_lagconsumer_offset

配置预调配模式

对于需要微调事件源映射吞吐量的工作负载,您可以使用预调配模式。在预调配模式下,您可以为预调配事件轮询器数量定义最小和最大限制。这些预调配事件轮询器专用于事件源映射,可以在出现意外消息激增时立即对其处理。对于具有严格性能要求的 Kafka 工作负载,我们建议您使用预调配模式。

在 Lambda 中,事件轮询器是一种能够处理高达 5 Mbps 吞吐量的计算单位。作为参考,假设您的事件源产生的平均有效载荷为 1 MB,并且平均函数持续时间为 1 秒。如果有效载荷未进行任何转换(例如筛选),则单个轮询器可以支持 5 Mbps 的吞吐量和 5 次并发 Lambda 调用。使用预调配模式会产生额外成本。有关定价估算,请参阅 AWS Lambda 定价

在预调配模式下,最小事件轮询器数量 (MinimumPollers) 的可接受值范围介于 1 到 200 之间(含首尾)。事件轮询器的最大数量 (MaximumPollers) 的可接受值范围介于 1 到 2000 之间(含首尾)。MaximumPollers 必须大于或等于 MinimumPollers。此外,为了保持分区内的有序处理,Lambda 會将 MaximumPollers 限制为主题中的分区数量。

有关选择适当的最小和最大事件轮询器值的更多详细信息,请参阅使用预调配模式时的最佳实践和注意事项

您可以使用控制台或 Lambda API 为自行管理的 Apache Kafka 事件源映射配置预调配模式。

为现有的自行管理的 Apache Kafka 事件源映射配置预调配模式(控制台)
  1. 打开 Lamba 控制台的函数页面

  2. 选择具有要为其配置预调配模式的自行管理 Apache Kafka 事件源映射的函数。

  3. 选择配置,然后选择触发器

  4. 选择要为其配置预调配模式的自行管理 Apache Kafka 事件源映射,然后選擇编辑

  5. 事件源映射配置下,选择配置预调配模式

    • 对于最少事件轮询器,输入介于 1 到 200 之间的值。如果未指定值,则 Lambda 将选择默认值 1。

    • 对于最大事件轮询器,输入介于 1 到 2000 之间的值。此值必须大于或等于最少事件轮询器的值。如果未指定值,则 Lambda 将选择默认值 200。

  6. 选择保存

您可以使用 EventSourceMappingConfiguration 中的 ProvisionedPollerConfig 对象,以编程方式配置预调配模式。例如,以下 UpdateEventSourceMapping CLI 命令将 MinimumPollers 值配置为 5,将 MaximumPollers 值配置为 100。

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{"MinimumPollers": 5, "MaximumPollers": 100}'

配置预调配模式后,您可以通过监控 ProvisionedPollers 指标来观测事件轮询器对您的工作负载的使用情况。有关更多信息,请参阅 事件源映射指标

要禁用预调配模式并返回默认(按需)模式,您可以使用以下 UpdateEventSourceMapping CLI 命令:

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --provisioned-poller-config '{}'

使用预调配模式时的最佳实践和注意事项

事件源映射的最小和最大事件轮询器的最佳配置取决于应用程序的性能需求。建议您从默认最小事件轮询器开始,以设定性能配置文件的基准。根据观测到的消息处理模式和所需的性能配置文件调整配置。

对于流量激增且性能需求严格的工作负载,请增加最少的事件轮询器数以处理消息突然激增。要确定所需的最少事件轮询器数,请考虑工作负载的每秒消息数和平均有效载荷大小,并使用单个事件轮询器的吞吐能力(最高 5 Mbps)作为参考。

为了保持分区内的有序处理,Lambda 会将最大事件轮询器数限制为主题中的分区数量。此外,您的事件源映射可以扩展到的最大事件轮询器数取决于函数的并发设置。

激活预调配模式时,更新您的网络设置以删除 AWS PrivateLink VPC 端点和关联的权限。

Amazon CloudWatch 指标

Lambda 会在您的函数处理记录时发出 OffsetLag 指标。此指标的值是写入 Kafka 事件源主题的最后一条记录与函数的使用者组处理的最后一条记录之间的偏移量差值。您可以使用 OffsetLag 来估计添加记录和使用者组处理记录之间的延迟。

如果 OffsetLag 呈上升趋势,则可能表明函数的使用者组中的轮询器存在问题。有关更多信息,请参阅 将 CloudWatch 指标与 Lambda 结合使用