Lambda에서 상태 저장 DynamoDB 스트림 처리 구현
Lambda 함수는 연속 스트림 처리 애플리케이션을 실행할 수 있습니다. 스트림은 애플리케이션을 통해 연속적으로 흐르는 무한 데이터를 나타냅니다. 이 지속적으로 업데이트되는 입력의 정보를 분석하기 위해 시간의 항으로 정의된 윈도우를 사용하여 포함된 레코드를 바인딩할 수 있습니다.
텀블링 기간은 일정한 간격으로 시작되고 끝나는 고유한 시간대입니다. 기본적으로 Lambda 호출은 상태 비저장입니다. 즉, 외부 데이터베이스 없이는 여러 연속 호출에서 데이터를 처리하는 데 사용할 수 없습니다. 그러나 텀블링 기간을 활성화하면 호출 간에 상태를 유지할 수 있습니다. 이 상태에는 현재 윈도우에 대해 이전에 처리된 메시지의 집계 결과가 포함됩니다. 상태는 샤드당 최대 1MB가 될 수 있습니다. 이 크기를 초과하면 Lambda가 윈도우를 조기 종료합니다.
스트림의 각 레코드는 특정 윈도우에 속합니다. Lambda는 각 레코드를 한 번 이상 처리하지만 각 레코드가 한 번만 처리된다고 보장하지는 않습니다. 드물게 오류 처리와 같이 일부 레코드가 두 번 이상 처리될 수 있습니다. 레코드는 항상 처음부터 순서대로 처리됩니다. 레코드가 두 번 이상 처리되는 경우 레코드가 비순차적으로 처리될 수 있습니다.
집계 및 처리
집계 및 해당 집계의 최종 결과 처리를 위해 사용자 관리형 함수가 간접 호출됩니다. Lambda는 윈도우 내에서 수신한 모든 레코드를 집계합니다. 이러한 레코드를 각각 별도의 호출인 여러 배치에서 받을 수 있습니다. 각 호출은 상태를 수신합니다. 따라서 텀블링 기간을 사용할 때 Lambda 함수 응답에는 state
속성을 포함해야 합니다. 응답에 state
속성이 포함되지 않은 경우 Lambda는 이 호출이 실패한 것으로 간주합니다. 이 조건을 충족하기 위해 함수는 다음 JSON 모양의 TimeWindowEventResponse
객체를 반환할 수 있습니다.
예 TimeWindowEventResponse
값
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
참고
Java 함수의 경우 Map<String, String>
을 사용하여 상태를 나타내는 것이 좋습니다.
윈도우 끝에서 isFinalInvokeForWindow
플래그는 이것이 최종 상태이며 처리 준비가 되었음을 나타내기 위해 true
로 설정됩니다. 처리 후 윈도우가 완료되고 최종 호출이 완료된 다음 상태가 삭제됩니다.
윈도우 끝에서 Lambda가 집계 결과에 대한 작업을 위해 최종 처리를 사용합니다. 최종 처리는 동기식으로 간접 호출됩니다. 성공적인 호출 후 함수가 시퀀스 번호를 검사하고 스트림 처리가 계속됩니다. 호출이 실패하면 Lambda 함수는 호출이 성공할 때까지 추가 처리를 일시 중단합니다.
예 DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }
구성
이벤트 소스 매핑을 생성하거나 업데이트할 때 텀블링 윈도우를 구성할 수 있습니다. 텀블링 윈도우를 구성하려면 윈도우를 초 단위로 지정합니다(TumblingWindowInSeconds). 다음 예제 AWS Command Line Interface(AWS CLI) 명령은 텀블링 윈도우가 120초인 스트리밍 이벤트 소스 매핑을 생성합니다. 집계 및 처리를 위해 정의된 Lambda 함수는 tumbling-window-example-function
입니다.
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds
120
Lambda는 레코드가 스트림에 삽입된 시간을 기준으로 텀블링 윈도우 경계를 결정합니다. 모든 레코드에는 Lambda가 경계 결정에서 사용하는 대략적인 타임스탬프가 있습니다.
텀블링 윈도우 집계는 리샤딩을 지원하지 않습니다. 샤드가 끝나면 Lambda는 윈도우가 닫힌 것으로 간주하며 자식 샤드는 새 상태로 고유한 윈도우를 시작합니다.
텀블링 윈도우는 기존 재시도 정책 maxRetryAttempts
및 maxRecordAge
를 완벽하게 지원합니다.
예 Handler.py - 집계 및 처리
다음 Python 함수는 최종 상태를 집계한 다음 처리하는 방법을 보여줍니다.
def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}