在 Lambda 中实现有状态的 DynamoDB 流处理 - AWS Lambda

在 Lambda 中实现有状态的 DynamoDB 流处理

Lambda 函数可以运行连续流处理应用程序。流表示通过您的应用程序持续流动的无边界数据。要分析这种不断更新的输入中的信息,可以使用按时间定义的窗口来限制包含的记录。

滚动窗口是定期打开和关闭的不同窗口。预设情况下,Lambda 调用是无状态的,在没有外部数据库的情况下,无法使用它们跨多次连续调用处理数据。但是,有了滚动窗口后,您可以在不同调用中保持状态。此状态包含之前为当前窗口处理的消息的汇总结果。您的状态最多可以是每个分片 1MB。如果超过该大小,Lambda 将提前终止窗口。

流中的每条记录都属于特定窗口。Lambda 将至少处理每条记录一次,但不保证每条记录只处理一次。在极少数情况下(例如错误处理),某些记录可能会被多次处理。第一次处理记录时始终按顺序处理。如果多次处理记录,则可能会不按顺序处理。

聚合和处理

系统将调用您的用户托管函数以便聚合和处理该聚合的最终结果。Lambda 汇总在该窗口中接收的所有记录。您可以分多个批次接收这些记录,每个批次都作为单独的调用。每次调用都会收到一个状态。因此,当使用滚动窗口时,Lambda 函数响应必须包含 state 属性。如果响应不包含 state 属性,Lambda 会将其视作失败的调用。为了满足该条件,您的函数可以返回一个具有以下 JSON 形状的 TimeWindowEventResponse 对象:

TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注意

对于 Java 函数,我们建议使用 Map<String, String> 来表示状态。

在窗口末尾,标志 isFinalInvokeForWindow 被设置 true,以表示这是最终状态,并且已准备好进行处理。处理完成后,窗口完成,最终调用完成,然后状态将被删除。

在窗口结束时,Lambda 会对针对聚合结果的操作应用最终处理。您的最终处理将同步调用。成功调用后,函数会检查序列号并继续进行流处理。如果调用失败,则您的 Lambda 函数将暂停进一步处理,直到成功调用为止。

例 DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

配置

您可以在创建或更新事件源映射时配置滚动窗口。要配置翻转窗口,请以秒为单位进行指定(TumblingWindowInSeconds)。以下示例 AWS Command Line Interface (AWS CLI) 命令会创建一个滚动窗口为 120 秒的流式事件源映射。为聚合和处理定义的 Lambda 函数被命名为 tumbling-window-example-function

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda 根据记录插入到流的时间来确定滚动窗口的边界。所有记录都有一个大致的时间戳,供 Lambda 在确定边界时使用。

滚动窗口聚合不支持重新分片。当分区结束时,Lambda 会认为窗口已关闭,子分区将以全新的状态启动自己的窗口。

滚动窗口完全支持现有的重试策略 maxRetryAttemptsmaxRecordAge

例 Handler.py – 聚合和处理

以下 Python 函数演示了如何聚合然后处理您的最终状态:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}