Lambda 如何处理来自 Amazon Kinesis Data Streams 的记录
您可以使用 Lambda 函数来处理 Amazon Kinesis 数据流中的记录。您可以将 Lambda 函数映射到 Kinesis Data Streams 共享吞吐量使用者(标准迭代器)或具有增强型扇出功能的专用吞吐量使用者。对于标准迭代器,Lambda 使用 HTTP 协议轮询 Kinesis 流中的每个分片以查找记录。事件源映射与分片的其他使用者共享读取吞吐量。
有关 Kinesis 数据流的详细信息,请参阅读取 Amazon Kinesis Data Streams 中的数据。
注意
Kinesis 按每个分区收费;对于增强型扇出功能,从流中读取数据。有关定价详细信息,请参阅 Amazon Kinesis 定价
主题
轮询和批处理流
Lambda 从数据流中读取记录并同步调用您的函数,带有一个包含流记录的事件。Lambda 分批读取记录并调用您的函数来处理批处理中的记录。每个批处理包含来自单个分区/数据流的记录。
对于标准 Kinesis 数据流,Lambda 将轮询流中的每个分片来获取记录(按照每个分片每秒一次的频率)。对于 Kinesis 增强型扇出功能,Lambda 使用 HTTP/2 连接来监听从 Kinesis 推送的记录。如果记录可用,Lambda 会调用函数并等待结果。
默认情况下,Lambda 会在记录可用时尽快调用您的函数。如果 Lambda 从事件源中读取的批处理只有一条记录,则 Lambda 将会只向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数,您可以配置 batching window(批处理时段),让事件源缓冲最多五分钟的记录。调用函数前,Lambda 会持续从事件源中读取记录,直到收集完整批处理、批处理时段到期或批处理达到 6MB 的有效负载时为止。有关更多信息,请参阅 批处理行为。
警告
Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 AWS 知识中心的如何使我的 Lambda 函数具有幂等性
Lambda 在发送下次批处理之前不会等待任何配置的扩展完成。换句话说,扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何并发设置或限制,可能会导致节流问题。要检测这是否是潜在问题,请监控函数并检查所显示的并发指标是否高于事件源映射的预期。由于调用间隔时间较短,Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。
配置 ParallelizationFactor 设置以同时使用多个 Lambda 调用处理 Kinesis 数据流的一个分片。您可以指定 Lambda 通过从 1(默认值)到 10 的并行化因子从分区中轮询的并发批次数。例如,假设您将 ParallelizationFactor
设置为 2,则最多可以有 200 个并发 Lambda 调用来处理 100 个 Kinesis 数据分片(但您可能实际上会看到不同的 ConcurrentExecutions
指标值)。这有助于在数据量不稳定并且 IteratorAge
较高时纵向扩展处理吞吐量。增加每个分片的并发批次数后,Lambda 仍然可以确保分区密钥级别的顺序处理。
您还可以将 ParallelizationFactor
与 Kinesis 聚合一起使用。事件源映射的行为取决于您是否使用增强型扇出功能:
-
如果没有增强型扇出功能:聚合事件中的所有事件都必须具有相同的分区键。该分区键还必须与聚合事件的分区键相匹配。如果聚合事件中的事件具有不同的分区键,则 Lambda 无法保证按分区键依照顺序处理事件。
-
借助增强型扇出功能:首先,Lambda 将聚合的事件解码为其单个事件。聚合事件可以具有与其包含的事件不同的分区键。但是,与分区键不对应的事件会被丢弃并丢失
。Lambda 不处理这些事件,也不会将它们发送到配置的失败目标。
示例事件
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }