Implémentation du traitement dynamique de Kinesis Data Streams dans Lambda - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Implémentation du traitement dynamique de Kinesis Data Streams dans Lambda

Les fonctions Lambda peuvent exécuter des applications de traitement de flux continu. Un flux représente des données illimitées qui circulent en continu dans votre application. Pour analyser les informations provenant de cette entrée de mise à jour continue, vous pouvez lier les enregistrements inclus à l’aide d’une fenêtre de temps définie.

Les fenêtres bascules sont des fenêtres temporelles distinctes qui s’ouvrent et se ferment à intervalles réguliers. Par défaut, les invocations Lambda sont sans état : vous ne pouvez pas les utiliser pour traiter des données sur plusieurs invocations continues sans base de données externe. Cependant, avec les fenêtres bascules, vous pouvez maintenir votre état au long des invocations. Cet état contient le résultat global des messages précédemment traités pour la fenêtre actuelle. Votre état peut être d’un maximum de 1 Mo par partition. S’il dépasse cette taille, Lambda met fin précocement à la fenêtre de traitement.

Chaque enregistrement d’un flux appartient à une fenêtre spécifique. La fonction Lambda traitera chaque enregistrement au moins une fois. Toutefois, elle ne garantit pas un seul traitement pour chaque enregistrement. Dans de rares cas, tels que pour la gestion des erreurs, certains enregistrements peuvent être sujet à de multiples traitements. Les dossiers sont toujours traités dans l’ordre dès la première fois. Si les enregistrements sont traités plusieurs fois, ils peuvent être traités dans le désordre.

Regroupement et traitement

Votre fonction gérée par l’utilisateur est invoquée tant pour l’agrégation que pour le traitement des résultats finaux de celle-ci. Lambda regroupe tous les enregistrements reçus dans la fenêtre. Vous pouvez recevoir ces enregistrements en plusieurs lots, chacun sous forme d’invocation séparée. Chaque invocation reçoit un état. Ainsi, lorsque vous utilisez des fenêtres bascules, votre réponse de fonction Lambda doit contenir une propriété state. Si la réponse ne contient pas de propriété state, Lambda considère qu’il s’agit d’une invocation ayant échoué. Pour satisfaire à cette condition, votre fonction peut renvoyer un objet TimeWindowEventResponse ayant la forme JSON suivante :

Exemple TimeWindowEventResponse values
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Note

Pour les fonctions Java, nous vous recommandons d’utiliser Map<String, String> pour représenter l’état.

À la fin de la fenêtre, l’indicateur isFinalInvokeForWindow est défini sur true pour indiquer qu’il s’agit de l’état final et qu’il est prêt pour le traitement. Après le traitement, la fenêtre et votre invocation final se terminent, puis l’état est supprimé.

À la fin de votre fenêtre, Lambda applique un traitement final pour les actions sur les résultats de l’agrégation. Votre traitement final est invoqué de manière synchrone. Une fois l’invocation réussie, votre fonction contrôle le numéro de séquence et le traitement du flux continue. Si l’invocation échoue, votre fonction Lambda suspend le traitement ultérieur jusqu’à ce que l’invocation soit réussie.

Exemple KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuration

Vous pouvez configurer des fenêtres bascule lorsque vous créez ou mettez à jour un mappage de source d’événement. Pour configurer une fenêtre variable, spécifiez la fenêtre en secondes (TumblingWindowInSeconds). L'exemple de commande suivant AWS Command Line Interface (AWS CLI) crée un mappage des sources d'événements de streaming dont la fenêtre de basculement est de 120 secondes. La fonction Lambda définie pour l’agrégation et le traitement est nommée tumbling-window-example-function.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda détermine les limites des fenêtres bascule en fonction de l’heure à laquelle les enregistrements ont été insérés dans le flux. Tous les enregistrements ont un horodatage approximatif disponible que Lambda utilise pour déterminer des limites.

Les agrégations de fenêtres bascule ne prennent pas en charge le repartitionnement. Lorsqu'une partition se termine, Lambda considère que la fenêtre actuelle est fermée, et toute partition enfant ouvre sa propre fenêtre dans un nouvel état. Lorsqu'aucun nouvel enregistrement n'est ajouté à la fenêtre actuelle, Lambda attend jusqu'à 2 minutes avant de supposer que la fenêtre est terminée. Cela permet de garantir que la fonction lit tous les enregistrements de la fenêtre actuelle, même si les enregistrements sont ajoutés par intermittence.

Les fenêtres bascule prennent complètement en charge les stratégies de nouvelle tentative existantes maxRetryAttempts et maxRecordAge.

Exemple Handler.py – Agrégation et traitement

La fonction Python suivante montre comment regrouper et traiter votre état final :

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}