Implementación del procesamiento con estado de Kinesis Data Streams en Lambda
Las funciones de Lambda pueden ejecutar aplicaciones de procesamiento de flujo continuo. Una secuencia representa datos ilimitados que fluyen de forma continua a través de su aplicación. Para analizar la información de esta entrada de actualización continua, puede enlazar los registros incluidos mediante una ventana definida en términos de tiempo.
Las ventanas de salto constante son ventanas de tiempo distintas que se abren y cierran a intervalos regulares. De forma predeterminada, las invocaciones de Lambda no tienen estado: no se pueden utilizar para procesar datos en múltiples invocaciones continuas sin una base de datos externa. Sin embargo, con las ventanas de salto constante, puede mantener su estado en todas las invocaciones. Este estado contiene el resultado agregado de los mensajes procesados previamente para la ventana actual. Su estado puede ser un máximo de 1 MB por partición. Si supera ese tamaño, Lambda finaliza la ventana antes de tiempo.
Cada registro de una secuencia pertenece a un periodo específico. Lambda procesará cada registro al menos una vez, pero no garantiza que cada registro se procese solo una vez. En casos excepcionales, como el manejo de errores, es posible que algunos registros se procesen más de una vez. Los registros siempre se procesan en orden la primera vez. Si los registros se procesan más de una vez, es posible que lo hagan de forma desordenada.
Agregación y procesamiento
Su función administrada por el usuario se invoca tanto para la agregación como para procesar los resultados finales de esa agregación. Lambda agrega todos los registros recibidos en la ventana. Puede recibir estos registros en varios lotes, cada uno como una invocación independiente. Cada invocación recibe un estado. Por lo tanto, al usar las ventanas de salto constante, su respuesta de la función de Lambda debe contener una propiedad de state
. Si la respuesta no contiene una propiedad de state
, Lambda considera que esto es una invocación fallida. Para satisfacer esta condición, la función puede devolver un objeto de TimeWindowEventResponse
, que tiene la siguiente forma JSON:
ejemplo Valores TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
nota
Para las funciones Java, se recomienda utilizar un Map<String, String>
para representar el estado.
Al final de la ventana, el indicador isFinalInvokeForWindow
está configurado en true
para indicar que este es el estado final y que está listo para su procesamiento. Después del procesamiento, la ventana se completa y su invocación final se completa, y luego se elimina el estado.
Al final de la ventana, Lambda utiliza el procesamiento final para las acciones en los resultados de agregación. Su procesamiento final se invoca sincrónicamente. Después de la invocación exitosa, los puntos de control de la función, el número de secuencia y el procesamiento de flujo continúa. Si la invocación no tiene éxito, su función de Lambda suspende el procesamiento posterior hasta una invocación exitosa.
ejemplo KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }
Configuración
Puede configurar ventanas de salto constante al crear o actualizar una asignación de orígenes de eventos. Para configurar una ventana de saltos de tamaño constante, especifique la ventana en segundos (TumblingWindowInSeconds). El siguiente comando de ejemplo AWS Command Line Interface (AWS CLI) crea una asignación de origen de eventos de streaming que tiene una ventana de salto constante de 120 segundos. Se nombra la función de Lambda definida para la agregación y el procesamiento se llama tumbling-window-example-function
.
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds
120
Lambda determina los límites de la ventana de salto constante en función de la hora en que se insertaron los registros en la secuencia. Todos los registros tienen una marca de hora aproximada disponible que Lambda utiliza en las determinaciones de límites.
Las agregaciones de ventanas de saltos constantes no admiten el reendurecimiento. Cuando una partición termina, Lambda considera la ventana actual como cerrada y las particiones secundarias comienzan su propia ventana en un estado renovado. Cuando no se agrega ningún registro nuevo a la ventana actual, Lambda espera hasta 2 minutos antes de suponer que la ventana ha terminado. Esto ayuda a garantizar que la función lea todos los registros de la ventana actual, incluso si los registros se agregan de forma intermitente.
Ventanas de saltos constantes son totalmente compatibles con las directivas de reintento existentes maxRetryAttempts
y maxRecordAge
.
ejemplo Handler.py: agregación y procesamiento
La siguiente función de Python muestra cómo agregar y luego procesar su estado final:
def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}