您可以使用 Lambda 函数来处理 Amazon Kinesis 数据流中的记录。您可以将 Lambda 函数映射到 Kinesis Data Streams 共享吞吐量使用者(标准迭代器)或具有增强型扇出功能的专用吞吐量使用者。对于标准迭代器,Lambda 使用 HTTP 协议轮询 Kinesis 流中的每个分片以查找记录。事件源映射与分片的其他使用者共享读取吞吐量。
有关 Kinesis 数据流的详细信息,请参阅读取 Amazon Kinesis Data Streams 中的数据。
注意
Kinesis 按每个分区收费;对于增强型扇出功能,从流中读取数据。有关定价详细信息,请参阅 Amazon Kinesis 定价
主题
轮询和批处理流
Lambda 从数据流中读取记录并同步调用您的函数,带有一个包含流记录的事件。Lambda 分批读取记录并调用您的函数来处理批处理中的记录。每个批处理包含来自单个分区/数据流的记录。
对于标准 Kinesis 数据流,Lambda 将轮询流中的每个分片来获取记录(按照每个分片每秒一次的频率)。对于 Kinesis 增强型扇出功能,Lambda 使用 HTTP/2 连接来监听从 Kinesis 推送的记录。如果记录可用,Lambda 会调用函数并等待结果。
默认情况下,Lambda 会在记录可用时尽快调用您的函数。如果 Lambda 从事件源中读取的批处理只有一条记录,则 Lambda 将会只向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数,您可以配置 batching window(批处理时段),让事件源缓冲最多五分钟的记录。调用函数前,Lambda 会持续从事件源中读取记录,直到收集完整批处理、批处理时段到期或批处理达到 6MB 的有效负载时为止。有关更多信息,请参阅 批处理行为。
警告
Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 AWS 知识中心的如何使我的 Lambda 函数具有幂等性
Lambda 在发送下次批处理之前不会等待任何配置的扩展完成。换句话说,扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何并发设置或限制,可能会导致节流问题。要检测这是否是潜在问题,请监控函数并检查所显示的并发指标是否高于事件源映射的预期。由于调用间隔时间较短,Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。
配置 ParallelizationFactor 设置以同时使用多个 Lambda 调用处理 Kinesis 数据流的一个分片。您可以指定 Lambda 通过从 1(默认值)到 10 的并行化因子从分区中轮询的并发批次数。例如,假设您将 ParallelizationFactor
设置为 2,则最多可以有 200 个并发 Lambda 调用来处理 100 个 Kinesis 数据分片(但您可能实际上会看到不同的 ConcurrentExecutions
指标值)。这有助于在数据量不稳定并且 IteratorAge
较高时纵向扩展处理吞吐量。增加每个分片的并发批次数后,Lambda 仍然可以确保分区密钥级别的顺序处理。
您还可以将 ParallelizationFactor
与 Kinesis 聚合一起使用。事件源映射的行为取决于您是否使用增强型扇出功能:
-
如果没有增强型扇出功能:聚合事件中的所有事件都必须具有相同的分区键。该分区键还必须与聚合事件的分区键相匹配。如果聚合事件中的事件具有不同的分区键,则 Lambda 无法保证按分区键依照顺序处理事件。
-
借助增强型扇出功能:首先,Lambda 将聚合的事件解码为其单个事件。聚合事件可以具有与其包含的事件不同的分区键。但是,与分区键不对应的事件会被丢弃并丢失
。Lambda 不处理这些事件,也不会将它们发送到配置的失败目标。
示例事件
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
"approximateArrivalTimestamp": 1545084711.166
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
]
}