

# 使用 Lambda 处理来自 Amazon Kinesis Data Streams 的记录
<a name="with-kinesis"></a>

您可以使用 Lambda 函数来处理 [Amazon Kinesis 数据流](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)中的记录。您可以将 Lambda 函数映射到 Kinesis Data Streams 共享吞吐量使用者（标准迭代器）或具有[增强型扇出功能](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html)的专用吞吐量使用者。对于标准迭代器，Lambda 使用 HTTP 协议轮询 Kinesis 流中的每个分片以查找记录。事件源映射与分片的其他使用者共享读取吞吐量。

 有关 Kinesis 数据流的详细信息，请参阅[读取 Amazon Kinesis Data Streams 中的数据](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html)。

**注意**  
Kinesis 按每个分区收费；对于增强型扇出功能，从流中读取数据。有关定价详细信息，请参阅 [Amazon Kinesis 定价](https://aws.amazon.com/kinesis/data-streams/pricing)。

## 轮询和批处理流
<a name="kinesis-polling-and-batching"></a>

Lambda 从数据流中读取记录并[同步](invocation-sync.md)调用您的函数，带有一个包含流记录的事件。Lambda 分批读取记录并调用您的函数来处理批处理中的记录。每个批处理包含来自单个分区/数据流的记录。

您的 Lambda 函数是数据流的用户应用程序。对于每个分片，它一次处理一批记录。您可以将 Lambda 函数映射到共享吞吐量使用者（标准迭代器）或具有增强扇出功能的专用吞吐量使用者。
+ **标准迭代器：**Lambda 将针对记录轮询 Kinesis 流中的每个分片（按照每秒一次的基本频率）。当有更多记录可用时，Lambda 会继续进行批处理，直到函数赶上流的速度。事件源映射与分片的其他使用者共享读取吞吐量。
+ **增强型扇出功能：**为了最大限度地减少延迟并最大限度地提高读取吞吐量，请创建具有[增强型扇出功能](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)的数据流使用者。增强扇出功能使用者将获得与每个分片的专用连接，这不会影响从流中读取信息的其他应用程序。流使用者使用 HTTP/2 通过长期连接将记录推送到 Lambda 并压缩请求头来减少延迟。您可以使用 Kinesis [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) API 创建流使用者。

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

您应看到以下输出：

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

要提高函数处理记录的速度，[请将分片添加到数据流中](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards)。Lambda 按顺序处理各个分区中的记录。如果您的函数返回错误，它会停止处理分片中的其他记录。使用更多分片，可以同时处理更多批次，从而降低错误对并发性的影响。

如果您的函数无法扩展以处理并发批处理的总数，请为您的函数[请求提高配额](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html)或[预留并发](configuration-concurrency.md)。

默认情况下，Lambda 会在记录可用时尽快调用您的函数。如果 Lambda 从事件源中读取的批处理只有一条记录，则 Lambda 将会只向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数，您可以配置 *batching window*（批处理时段），让事件源缓冲最多五分钟的记录。调用函数前，Lambda 会持续从事件源中读取记录，直到收集完整批处理、批处理时段到期或批处理达到 6MB 的有效负载时为止。有关更多信息，请参阅[批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

Lambda 在发送下次批处理之前不会等待任何配置的[扩展](lambda-extensions.md)完成。换句话说，扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何[并发](lambda-concurrency.md)设置或限制，可能会导致节流问题。要检测这是否是潜在问题，请监控函数并检查所显示的[并发指标](monitoring-concurrency.md#general-concurrency-metrics)是否高于事件源映射的预期。由于调用间隔时间较短，Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。

配置 [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 设置以同时使用多个 Lambda 调用处理 Kinesis 数据流的一个分片。您可以指定 Lambda 通过从 1（默认值）到 10 的并行化因子从分区中轮询的并发批次数。例如，假设您将 `ParallelizationFactor` 设置为 2，则最多可以有 200 个并发 Lambda 调用来处理 100 个 Kinesis 数据分片（但您可能实际上会看到不同的 `ConcurrentExecutions` 指标值）。这有助于在数据量不稳定并且 `IteratorAge` 较高时纵向扩展处理吞吐量。增加每个分片的并发批次数后，Lambda 仍然可以确保分区密钥级别的顺序处理。

您还可以将 `ParallelizationFactor` 与 Kinesis 聚合一起使用。事件源映射的行为取决于您是否使用[增强型扇出功能](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)：
+ **如果没有增强型扇出功能**：聚合事件中的所有事件都必须具有相同的分区键。该分区键还必须与聚合事件的分区键相匹配。如果聚合事件中的事件具有不同的分区键，则 Lambda 无法保证按分区键依照顺序处理事件。
+ **借助增强型扇出功能**：首先，Lambda 将聚合的事件解码为其单个事件。聚合事件可以具有与其包含的事件不同的分区键。但是，与分区键不对应的事件会被[丢弃并丢失](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md)。Lambda 不处理这些事件，也不会将它们发送到配置的失败目标。

## 示例事件
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```