

# Lambda 如何处理来自基于流和队列的事件源的记录
<a name="invocation-eventsourcemapping"></a>

*事件源映射*是一种 Lambda 资源，它从流或基于队列的服务中读取项目并调用包含批次记录的函数。在事件源映射内，称为*事件轮询器*的资源会主动轮询新消息并调用函数。默认情况下，Lambda 会自动扩展事件轮询器，但对于某些事件源类型，您可以使用[预调配模式](#invocation-eventsourcemapping-provisioned-mode)来控制专用于事件源映射的事件轮询器的最小和最大数量。

以下服务使用事件源映射调用 Lambda 函数：
+ [Amazon DocumentDB（兼容 MongoDB）（Amazon DocumentDB）](with-documentdb.md)
+ [Amazon DynamoDB](with-ddb.md)
+ [Amazon Kinesis](with-kinesis.md)
+ [Amazon MQ](with-mq.md)
+ [Amazon Managed Streaming for Apache Kafka (Amazon MSK)](with-msk.md)
+ [自行管理的 Apache Kafka](with-kafka.md)
+ [Amazon Simple Queue Service (Amazon SQS)](with-sqs.md)

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

## 事件源映射与直接触发器的区别
<a name="eventsourcemapping-trigger-difference"></a>

某些 AWS 服务可以使用*触发器*直接调用 Lambda 函数。这些服务将事件推送到 Lambda，并在指定事件发生时立即调用该函数。触发器适用于离散事件和实时处理。当您[使用 Lambda 控制台创建触发器](lambda-services.md#lambda-invocation-trigger)时，控制台会与相应的 AWS 服务交互以配置该服务的事件通知。触发器实际上由生成事件的服务而不是 Lambda 存储和管理。以下是一些使用触发器调用 Lambda 函数的服务示例：
+ **Amazon Simple Storage Service（Amazon S3）：**当在存储桶中创建、删除或修改对象时调用函数。有关更多信息，请参阅 [教程：使用 Amazon S3 触发器调用 Lambda 函数](with-s3-example.md)。
+ **Amazon Simple Notiﬁcation Service（Amazon SNS）：**将消息发布到 SNS 主题时调用函数。有关更多信息，请参阅 [教程：将 AWS Lambda 与 Amazon Simple Notification Service 结合使用](with-sns-example.md)。
+ **Amazon API Gateway：**在向特定端点发出 API 请求时调用函数。有关更多信息，请参阅 [使用 Amazon API Gateway 端点调用 Lambda 函数](services-apigateway.md)。

事件源映射是在 Lambda 服务中创建和管理的 Lambda 资源。事件源映射旨在处理来自队列的大量流数据或消息。分批处理来自流或队列的记录比单独处理记录更高效。

## 批处理行为
<a name="invocation-eventsourcemapping-batching"></a>

预设情况下，事件源映射会将记录合并为单个有效载荷进行批处理，并由 Lambda 将其发送到您的函数。要微调批处理行为，您可以配置批处理时段（[MaximumBatchingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumBatchingWindowInSeconds)）和批处理大小（[BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BatchSize)）。批处理时段是将记录收集到单个有效载荷中的最长时间。批处理大小是单个批处理中的最大记录数。满足以下三个条件中的任意一个时，Lambda 会调用您的函数：
+ **批处理时段达到其最大值。**默认的批处理时段行为因特定的事件源而异。
  + **对于 Kinesis、DynamoDB 和 Amazon SQS 事件源：**原定设置的批处理时段是 0 秒。这意味着，一旦记录可用，Lambda 就会调用您的函数。要设置批处理时段，请配置 `MaximumBatchingWindowInSeconds`。您可以将此参数设置为介于 0 秒到 300 秒之间的任意值，以 1 秒为增量。如果您配置了批处理时段，则下一个时段将在上一个函数调用完成后立即开始计算。
  + **对于 Amazon MSK、自托管式 Apache Kafka、Amazon MQ 和 Amazon DocumentDB 事件源：**默认批处理时段为 500 毫秒。您可以将 `MaximumBatchingWindowInSeconds` 配置为介于 0 秒到 300 秒之间的任意值，以秒的整数倍调整。在 Kafka 事件源映射的预置模式中，如果您配置了批处理时段，则下一个时段将在上一批处理完成后立即开始计算。在非预置 Kafka 事件源映射中，如果您配置了批处理时段，则下一个时段将在上一个函数调用完成后立即开始计算。要最大限度地减小在预置模式下使用 Kafka 事件源映射的延迟，请将 `MaximumBatchingWindowInSeconds` 设置为 0。此设置可确保 Lambda 在完成当前函数调用后立即开始处理下一个批次。有关低延迟处理的更多信息，请参阅[低延迟 Apache Kafka](with-kafka-low-latency.md)。
  + **对于 Amazon MQ 和 Amazon DocumentDB 事件源：**默认批处理时段为 500 毫秒。您可以将 `MaximumBatchingWindowInSeconds` 配置为介于 0 秒到 300 秒之间的任意值，以秒的整数倍调整。第一条记录到达后，批处理时段将立即开始计算。
**注意**  
由于您只能以秒的整数倍调整 `MaximumBatchingWindowInSeconds`，因此无法在更改该值后恢复到 500 毫秒的默认批处理时段。要恢复原定设置的批处理时段，必须创建新的事件源映射。
+ **达到批处理大小。**最小批处理大小为 1。原定设置和最大批处理大小取决于事件源。有关这些值的详细信息，请参阅 `CreateEventSourceMapping` API 操作的 [BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BatchSize) 规范。
+ **有效载荷大小达到 [6MB](https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html)。**您不能修改此限制。

下图演示了这三个条件。假设批处理时段从 `t = 7` 秒开始。在第一种场景中，批处理时段累积 5 条记录后在 `t = 47` 秒达到 40 秒的最大值。在第二种场景中，批处理大小在批处理时段到期之前达到 10，因此批处理时段会提前结束。在第三种场景中，在批处理时段到期之前达到最大有效载荷大小，因此批处理时段会提前结束。

![\[\]](http://docs.aws.amazon.com/zh_cn/lambda/latest/dg/images/batching-window.png)


建议您测试不同批处理和记录的大小，这样每个事件源的轮询频率都会根据函数完成任务的速度进行调整。[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) `BatchSize` 参数控制每次调用可向函数发送记录的最大数量。批处理大小如果较大，通常可以更有效地吸收大量记录的调用开销，从而增加吞吐量。

Lambda 在发送下次批处理之前不会等待任何配置的[扩展](lambda-extensions.md)完成。换句话说，扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何[并发](lambda-concurrency.md)设置或限制，可能会导致节流问题。要检测这是否是潜在问题，请监控函数并检查所显示的[并发指标](monitoring-concurrency.md#general-concurrency-metrics)是否高于事件源映射的预期。由于调用间隔时间较短，Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。

预设情况下，如果函数返回错误，事件源映射会重新处理整个批处理，直到函数成功，或直到批处理中的项目到期。为确保按顺序处理，在错误得到解决之前，事件源映射会暂停处理受影响的分片。对于流源（DynamoDB 和 Kinesis），可以配置 Lambda 在函数返回错误时重试的最大次数。批次未到达您的函数时出现的服务错误或节流，不计入重试次数。您还可以配置事件源映射，以便在丢弃事件批处理时将调用记录发送到[目标](invocation-async-retain-records.md#invocation-async-destinations)。

## 预调配模式
<a name="invocation-eventsourcemapping-provisioned-mode"></a>

Lambda 事件源映射使用事件轮询器对您的事件源进行轮询，以获取新消息。默认情况下，Lambda 根据消息量管理这些轮询器的自动扩缩。当消息流量增加时，Lambda 会自动增加用于处理负载的事件轮询器数量，并在流量减少时减少事件轮询器数量。

在预置模式下，您可以通过定义专用轮询资源的最低和最高限制来微调事件源映射的吞吐量，这些资源始终处于准备状态以应对预期的流量模式。这些资源能够自动扩展 3 倍的速度，以应对事件流量的突然激增，并提供高出 16 倍的容量来处理数百万个事件。这样有助于您构建响应速度快且具有严格性能要求的事件驱动型工作负载。

在 Lambda 中，事件轮询器是一种计算单元，其吞吐能力因事件源类型的不同而有所差异。对于 Amazon MSK 和自行管理的 Apache Kafka，每个事件轮询器最多可以处理 5 MB/秒的吞吐量或最多 5 次并发调用。例如，如果您的事件源平均生成 1MB 的有效载荷，而您的函数平均运行时间为 1 秒，则单一的 Kafka 事件轮询器能够支持每秒 5MB 的吞吐量以及 5 次并发的 Lambda 调用（假设没有有效载荷的转换操作）。对于 Amazon SQS，每个事件轮询器最多可以处理 1 MB/秒的吞吐量或最多 10 次并发调用。使用预置模式会产生额外成本，具体取决于您的事件轮询器使用情况。有关定价的详细信息，请参阅 [AWS Lambda 定价](https://aws.amazon.com/lambda/pricing/)。

预置模式适用于 Amazon MSK、自行管理的 Apache Kafka 和 Amazon SQS 事件源。虽然并发设置允许您控制函数的扩展，但预调配模式允许您控制事件源映射的吞吐量。为确保获得最佳性能，您可能需要分别调整这两个设置。

预置模式非常适合需要稳定的事件处理延迟的实时应用程序，例如金融服务公司处理市场数据源、电子商务平台提供实时个性化推荐以及游戏公司管理实时玩家互动等。

每个事件轮询器均支持不同的吞吐能力：
+ 对于 Amazon MSK 和自行管理的 Apache Kafka：最高可以处理 5 MB/秒的吞吐量或最多 5 次并发调用
+ 对于 Amazon SQS：最多可以处理 1 MB/秒 的吞吐量或最多 10 次并发调用，或者每秒最多进行 10 次 SQS 轮询 API 调用。

对于 Amazon SQS 事件源映射，您可以将轮询器的最小数量设置在 2 到 200 之间，默认值为 2；同时，将最大数量设置在 2 到 2000 之间，默认值为 200。Lambda 会根据您所配置的最小值和最大值来扩展事件轮询器的数量，能够迅速将并发次数提升至最多每分钟 1000 次，从而实现对事件的一致性低延迟处理。

对于 Kafka 事件源映射，您可以将轮询器的最小数量设置在 1 到 200 之间（默认值为 1），同时将最大数量设置在 1 到 2000 之间（默认值为 200）。Lambda 会根据您主题中的事件积压情况，在您配置的最小值和最大值之间扩缩事件轮询器的数量，从而实现对事件的低延迟处理。

请注意，对于 Amazon SQS 事件源，最大并发设置不能与预置模式一起使用。使用预置模式时，您可以通过最大的事件轮询器设置来控制并发性。

有关配置预调配模式的详细信息，请参阅以下各部分：
+ [为 Amazon MSK 事件源映射配置预调配模式](kafka-scaling-modes.md)
+ [为自行管理 Apache Kafka 事件源映射配置预调配模式](kafka-scaling-modes.md#kafka-provisioned-mode)
+ [对 Amazon SQS 事件源映射使用预置模式](with-sqs.md#sqs-provisioned-mode)

要最大限度地降低预置模式下的延迟，请将 `MaximumBatchingWindowInSeconds` 设置为 0。此设置可确保 Lambda 在完成当前函数调用后立即开始处理下一个批次。有关低延迟处理的更多信息，请参阅[低延迟 Apache Kafka](with-kafka-low-latency.md)。

配置预调配模式后，您可以通过监控 `ProvisionedPollers` 指标来观测事件轮询器对您的工作负载的使用情况。有关更多信息，请参阅 [事件源映射指标](monitoring-metrics-types.md#event-source-mapping-metrics)。

## 事件源映射 API
<a name="event-source-mapping-api"></a>

要使用 [AWS Command Line Interface（AWS CLI）](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) 或 [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/) 来管理事件源，可以使用以下 API 操作：
+ [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)
+ [ListEventSourceMappings](https://docs.aws.amazon.com/lambda/latest/api/API_ListEventSourceMappings.html)
+ [GetEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_GetEventSourceMapping.html)
+ [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)
+ [DeleteEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_DeleteEventSourceMapping.html)

# 在事件源映射上使用标签
<a name="tags-esm"></a>

对事件源映射进行标记可以组织和管理资源。标签是与资源关联的自由格式键值对，在 AWS 服务中受支持。有关标签用例的更多信息，请参阅《Tagging AWS Resources and Tag Editor Guide》**中的 [Common tagging strategies](https://docs.aws.amazon.com//tag-editor/latest/userguide/best-practices-and-strats.html#tag-strategies)。

事件源映射与函数相关联，而函数也可以有自己的标签。事件源映射并不会自动继承函数的标签。可以使用 AWS Lambda API 来查看和更新标签。在 Lambda 控制台中管理特定事件源映射时，还可以查看和更新标签，包括将预置模式用于 Amazon SQS 的那些标签。

## 使用标签所需的权限
<a name="esm-tags-required-permissions"></a>

要允许 AWS Identity and Access Management（IAM）身份（用户、组或角色）读取资源或为其设置标签，请授予该身份相应的权限：
+ **lambda:ListTags** – 当资源有标签时，将此权限授予需要在其上调用 `ListTags` 的任何人。对于带标签的函数，`GetFunction` 也需要此权限。
+ **lambda:TagResource** – 将此权限授予需要调用 `TagResource` 或执行在创建时授予标记的操作的任何人。

或者，也可以考虑授予 **lambda:UntagResource** 权限以允许 `UntagResource` 调用该资源。

有关更多信息，请参阅 [Lambda 的基于身份的 IAM policy](access-control-identity-based.md)。

## 通过 Lambda 控制台使用标签
<a name="tags-esm-console"></a>

您可以使用 Lambda 控制台创建带标签的事件源映射、向现有事件源映射添加标签以及按标签筛选事件源映射，包括在预置模式中为 Amazon SQS 配置的那些映射。

使用 Lambda 控制台为支持的流服务和基于队列的服务添加触发器时，Lambda 会自动创建事件源映射。有关这些事件源的更多信息，请参阅 [Lambda 如何处理来自基于流和队列的事件源的记录](invocation-eventsourcemapping.md)。若要在控制台中创建事件源映射，必须满足以下先决条件：
+ 函数
+ 来自受影响服务的事件源

可在用于创建或更新触发器的同一用户界面中添加标签。

**在创建事件源映射时添加标签**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择函数的名称。

1. 在 **Function overview**（函数概览）下，选择 **Add trigger**（添加触发器）。

1. 在**触发器配置**下的下拉列表中，选择事件源所属服务的名称。

1. 为事件源提供核心配置。有关配置事件源的更多信息，请参阅 [使用来自其 AWS 他服务的事件调用 Lambda](lambda-services.md)中相关服务的部分。

1. 在**事件源映射配置**下选择**其他设置**。

1. 在**标签**下选择**添加新标签**。

1. 在**键**字段中输入标签键。有关标记限制的信息，请参阅《Tagging AWS Resources and Tag Editor Guide》中的 [Tag naming limits and requirements](https://docs.aws.amazon.com//tag-editor/latest/userguide/best-practices-and-strats.html#id_tags_naming_best_practices)。**

1. 选择**添加**。

**向现有事件源映射添加标签**

1. 在 Lambda 控制台中打开[事件源映射](https://console.aws.amazon.com/lambda/home#/event-source-mappings)。

1. 在资源列表中，选择与**函数**和**事件源 ARN** 对应的事件源映射的 **UUID**。

1. 从**常规配置**窗格下方的选项卡列表中选择**标签**。

1. 选择**管理标签**。

1. 选择 **Add new tag（添加新标签）**。

1. 在**键**字段中输入标签键。有关标记限制的信息，请参阅《Tagging AWS Resources and Tag Editor Guide》中的 [Tag naming limits and requirements](https://docs.aws.amazon.com//tag-editor/latest/userguide/best-practices-and-strats.html#id_tags_naming_best_practices)。**

1. 选择**保存**。

**按标签筛选事件源映射**

1. 在 Lambda 控制台中打开[事件源映射](https://console.aws.amazon.com/lambda/home#/event-source-mappings)。

1. 选择搜索框。

1. 在下拉列表中，从**标签**副标题下方选择标签键。

1. 选择**使用：“tag-name”**查看所有使用此键标记的事件源映射，或者选择一个**运算符**进一步按值筛选。

1. 选择标签值以按标签键和值的组合进行筛选。

搜索框还支持搜索标签键。输入键名称，即可在列表中查找该键。

## 通过 AWS CLI 使用标签
<a name="tags-esm-cli"></a>

可以使用 Lambda API 在现有 Lambda 资源（包括事件源映射）上添加和删除标签。还可以在创建事件源映射时添加标签，这样就可以在资源的整个生命周期中对其进行标记。

### 使用 Lambda 标签 API 更新标签
<a name="tags-esm-api-config"></a>

可以通过 [TagResource](https://docs.aws.amazon.com/lambda/latest/api/API_TagResource.html) 和 [UntagResource](https://docs.aws.amazon.com/lambda/latest/api/API_UntagResource.html) API 操作，添加和删除受支持 Lambda 资源的标签。

可以使用 AWS CLI 调用这些操作。要向现有资源添加标签，请使用 `tag-resource` 命令。此示例添加了两个标签，一个带有键 *Department*，另一个带有键 *CostCenter*。

```
aws lambda tag-resource \
--resource arn:aws:lambda:us-east-2:123456789012:resource-type:my-resource \
--tags Department=Marketing,CostCenter=1234ABCD
```

要删除标签，请使用 `untag-resource` 命令。此示例删除了键为 *Department* 的标签。

```
aws lambda untag-resource --resource arn:aws:lambda:us-east-1:123456789012:resource-type:resource-identifier \
--tag-keys Department
```

### 在创建事件源映射时添加标签
<a name="tags-esm-on-create"></a>

若要使用标签创建 Lambda 事件源映射，请使用 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) API 操作。指定 `Tags` 参数。可以使用 `create-event-source-mapping` AWS CLI 命令和 `--tags` 选项调用此操作。有关使用此 CLI 命令的更多信息，请参阅《AWS CLI Command Reference》中的 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)。**

在将 `Tags` 参数与 `CreateEventSourceMapping` 一起使用之前，请确保角色拥有标记资源的权限以及此操作所需的常规权限。有关标记权限的更多信息，请参阅 [使用标签所需的权限](#esm-tags-required-permissions)。

### 使用 Lambda 标签 API 查看标签
<a name="tags-esm-api-view"></a>

要查看应用于特定 Lambda 资源的标签，请使用 `ListTags` API 操作。有关更多信息，请参阅 [ListTags](https://docs.aws.amazon.com/lambda/latest/api/API_ListTags.html)。

可以提供 ARN（Amazon 资源名称），以使用 `list-tags` AWS CLI 命令调用此操作。

```
aws lambda list-tags --resource arn:aws:lambda:us-east-1:123456789012:resource-type:resource-identifier
```

### 按标签筛选资源
<a name="tags-esm-filtering"></a>

您可以使用 AWS Resource Groups Tagging API [GetResources](https://docs.aws.amazon.com/resourcegroupstagging/latest/APIReference/API_GetResources.html) API 操作按标签筛选资源。`GetResources` 操作最多可接收 10 个筛选条件，每个筛选条件包含一个标签键和最多 10 个标签值。提供具有 `ResourceType` 的 `GetResources`，可按特定资源类型进行筛选。

可以使用 `get-resources` AWS CLI 命令调用此操作。有关使用 `get-resources` 的示例，请参阅《AWS CLI Command Reference》**中的 [get-resources](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/resourcegroupstaggingapi/get-resources.html#examples)。