Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation des opérations de streaming dans AWS Glue sessions interactives
Changement de type de séance de streaming
Utilisez la commande AWS Glue magie de configuration des sessions interactives%streaming
, pour définir le travail que vous exécutez et initialiser une session interactive de streaming.
Échantillonnage du flux d'entrée pour un développement interactif
Un outil que nous avons développé pour améliorer l'expérience interactive dans AWS Glue les sessions interactives sont l'ajout d'une nouvelle méthode ci-dessous GlueContext
pour obtenir un instantané d'un flux dans une statique DynamicFrame. GlueContext
vous permet d'inspecter, d'interagir et de mettre en œuvre votre flux de travail.
Avec l’instance de classe GlueContext
, vous serez en mesure de localiser la méthode getSampleStreamingDynamicFrame
. Les arguments requis pour cette méthode sont les suivants :
-
dataFrame
: The Spark Streaming DataFrame -
options
: voir les options disponibles ci-dessous
Les options disponibles sont les suivantes :
-
windowSize : ceci est également appelé Durée du micro-lot. Ce paramètre déterminera la durée d'attente d'une requête en streaming après le déclenchement du lot précédent. La valeur de ce paramètre doit être inférieure à
pollingTimeInMs
. -
pollingTimeInMme : La durée totale pendant laquelle la méthode sera exécutée. Il déclenche au moins un micro-lot pour obtenir des échantillons de registre à partir du flux d'entrée.
-
recordPollingLimit: Ce paramètre vous permet de limiter le nombre total d'enregistrements que vous allez interroger à partir du flux.
-
(Facultatif) Vous pouvez également utiliser
writeStreamFunction
pour appliquer cette fonction personnalisée à chaque fonction d'échantillonnage de registre. Voir ci-dessous des exemples dans Scala et Python.
Note
Lorsque le paramètre DynFrame
échantillonné est vide, cela peut être causé par certaines des raisons suivantes :
-
La source Streaming est définie sur « Dernier » et aucune nouvelle donnée n'a été ingérée pendant la période d'échantillonnage.
-
Le temps d'interrogation n'est pas suffisant pour traiter les registres qu'il a ingérés. Les données ne s'afficheront pas à moins que l'ensemble du lot n'ait été traité.
Exécution d'applications de streaming dans des séances interactives
Entrée AWS Glue sessions interactives, vous pouvez exécuter un AWS Glue application de streaming, comme la façon dont vous créeriez une application de streaming dans AWS Glue Console. Étant donné que les séances interactives sont basées sur une séance, la présence d'exceptions dans le moteur d'exécution ne provoque pas l'arrêt de la séance. Nous avons maintenant l'avantage supplémentaire de développer votre fonction par lots de manière itérative. Par exemple :
def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
Dans l'exemple ci-dessus, nous avons inclus une utilisation non valide d'une méthode et, contrairement à la méthode normale AWS Glue les tâches qui quitteront l'ensemble de l'application, le contexte de codage et les définitions de l'utilisateur sont entièrement préservés et la session est toujours opérationnelle. Il n'est pas nécessaire d'amorcer un nouveau cluster et de ré-exécuter toute la transformation précédente. Cela vous autorise à vous concentrer sur l'itération rapide de vos implémentations de fonctions par lots pour obtenir des résultats souhaitables.
Il est important de noter que la séance interactive évalue chaque instruction de manière bloquante afin que la séance n'exécute qu'une seule instruction à la fois. Étant donné que les requêtes de streaming sont continues et ne se terminent jamais, les séances avec des requêtes de streaming actives ne seront pas en mesure de gérer les instructions de suivi à moins qu'elles ne soient interrompues. Vous pouvez émettre la commande d'interruption directement à partir de Bloc-notes Jupyter et notre noyau s'occupera de l'annulation pour vous.
Prenons l'exemple de la séquence suivante d'instructions qui attendent d'être exécutées :
Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely