在 Lambda 中实现有状态的 Kinesis Data Streams 处理
Lambda 函数可以运行连续流处理应用程序。流表示通过您的应用程序持续流动的无边界数据。要分析这种不断更新的输入中的信息,可以使用按时间定义的窗口来限制包含的记录。
滚动窗口是定期打开和关闭的不同窗口。预设情况下,Lambda 调用是无状态的,在没有外部数据库的情况下,无法使用它们跨多次连续调用处理数据。但是,有了滚动窗口后,您可以在不同调用中保持状态。此状态包含之前为当前窗口处理的消息的汇总结果。您的状态最多可以是每个分片 1MB。如果超过该大小,Lambda 将提前终止窗口。
流中的每条记录都属于特定窗口。Lambda 将至少处理每条记录一次,但不保证每条记录只处理一次。在极少数情况下(例如错误处理),某些记录可能会被多次处理。第一次处理记录时始终按顺序处理。如果多次处理记录,则可能会不按顺序处理。
聚合和处理
系统将调用您的用户托管函数以便聚合和处理该聚合的最终结果。Lambda 汇总在该窗口中接收的所有记录。您可以分多个批次接收这些记录,每个批次都作为单独的调用。每次调用都会收到一个状态。因此,当使用滚动窗口时,Lambda 函数响应必须包含 state
属性。如果响应不包含 state
属性,Lambda 会将其视作失败的调用。为了满足该条件,您的函数可以返回一个具有以下 JSON 形状的 TimeWindowEventResponse
对象:
例 TimeWindowEventResponse
值
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注意
对于 Java 函数,我们建议使用 Map<String, String>
来表示状态。
在窗口末尾,标志 isFinalInvokeForWindow
被设置 true
,以表示这是最终状态,并且已准备好进行处理。处理完成后,窗口完成,最终调用完成,然后状态将被删除。
在窗口结束时,Lambda 会对针对聚合结果的操作应用最终处理。您的最终处理将同步调用。成功调用后,函数会检查序列号并继续进行流处理。如果调用失败,则您的 Lambda 函数将暂停进一步处理,直到成功调用为止。
例 KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }
配置
您可以在创建或更新事件源映射时配置滚动窗口。要配置翻转窗口,请以秒为单位进行指定(TumblingWindowInSeconds)。以下示例 AWS Command Line Interface (AWS CLI) 命令会创建一个滚动窗口为 120 秒的流式事件源映射。为聚合和处理定义的 Lambda 函数被命名为 tumbling-window-example-function
。
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds
120
Lambda 根据记录插入到流的时间来确定滚动窗口的边界。所有记录都有一个大致的时间戳,供 Lambda 在确定边界时使用。
滚动窗口聚合不支持重新分片。当分片结束时,Lambda 认为当前窗口已关闭,并且任何子分片都将以全新状态启动自己的窗口。如果没有向当前窗口添加任何新记录,则 Lambda 会等待最多 2 分钟,然后假定该窗口已结束。这有助于确保函数读取当前窗口中的所有记录,即使这些记录是间歇性添加的。
滚动窗口完全支持现有的重试策略 maxRetryAttempts
和 maxRecordAge
。
例 Handler.py – 聚合和处理
以下 Python 函数演示了如何聚合然后处理您的最终状态:
def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}