在 Lambda 中實作可設定狀態的 Kinesis Data Streams 處理 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Lambda 中實作可設定狀態的 Kinesis Data Streams 處理

Lambda 函數可執行持續串流處理應用程式。串流表示持續在應用程式中流動的無限制資料。若要分析此持續更新輸入中的資訊,您可以使用定義的時段來限制包含的記錄。

輪轉時段是定期開啟和關閉的不同時段。依預設,Lambda 調用是無狀態的,您無法在沒有外部資料庫的情況下,將其用於處理多個持續調用的資料。然而,使用輪轉時段,您可以在不同的調用間維護狀態。此狀態包含之前為目前時段處理之訊息的彙總結果。狀態可以是每個分區最多 1 MB。如果超過該大小,則 Lambda 會提前終止時段。

串流中的每個記錄都屬於一個特定時段。Lambda 至少會處理一次每筆記錄,但不保證每筆記錄只會處理一次。在極少數情況下,例如錯誤處理,某些記錄可能會處理多次。第一次時一律會依序處理記錄。如果多次處理記錄,則可能不會按順序處理。

彙總與處理

調用您的使用者管理函數進行彙總,以及處理該彙總的最終結果。Lambda 會彙總時段中接收的所有記錄。您可以在多個批次中接收這些記錄,各自作為單獨的調用。每次調用會收到一個狀態。因此,當使用輪轉時段時,您的 Lambda 函數回應必須包含 state 屬性。如果回應不包含 state 屬性,Lambda 會將此視為失敗的調用。為了滿足此條件,您的函數可以返回一個 TimeWindowEventResponse 物件,它具有下列 JSON 形狀:

範例 TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注意

對於 Java 函數,我們建議使用 Map<String, String> 來表示狀態。

在時段結束時,標記 isFinalInvokeForWindow 會設定為 true 以指示這是最終狀態,並且可隨時進行處理。處理完成後,時段結束並完成最終調用,然後丟棄該狀態。

在時段結束時,Lambda 會針對彙總結果上的動作使用最終處理。您的最終處理將同步調用。成功調用後,您的函數檢查點序號和串流處理將會繼續。如果調用失敗,則您的 Lambda 函數會暫停進一步處理,直至成功調用。

範例 KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

組態

您可以在建立或更新事件來源對映時設定輪轉時段。若要設定暫停視窗,請以秒為單位指定視窗 (TumblingWindowInSeconds)。下列範例 AWS Command Line Interface (AWS CLI) 命令會建立具有 120 秒暫停視窗的串流事件來源對應。針對彙總與處理定義的 Lambda 函數命名為 tumbling-window-example-function

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda 根據記錄插入串流的時間,確定輪轉時段邊界。所有記錄都有 Lambda 在邊界確定中使用的近似時間戳記。

輪轉時段彙總不支援重新分區。當碎片結束時,Lambda 認為當前窗口被關閉,並且任何子碎片都將以新的狀態啟動自己的窗口。當沒有新記錄新增至目前視窗時,Lambda 會等待最多 2 分鐘,然後再假設視窗結束。這有助於確保函數讀取當前視窗中的所有記錄,即使記錄間歇性地添加也是如此。

輪轉時段完全支援現有的重試政策 maxRetryAttemptsmaxRecordAge

範例 Handler.py - 彙總與處理

下列 Python 函數示範了如何彙總,然後處理您的最終狀態:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}