Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Commutazione del tipo di sessione streaming
Utilizzo di AWS Glue configurazione magic sessioni interattive%streaming
, per definire il processo che si sta eseguendo e inizializzare una sessione interattiva in streaming.
Flusso di input di campionamento per lo sviluppo interattivo
Uno strumento che abbiamo creato per contribuire a migliorare l'esperienza interattiva in AWS Glue le sessioni interattive sono l'aggiunta di un nuovo metodo GlueContext
per ottenere un'istantanea di uno stream in modo statico DynamicFrame. GlueContext
consente di ispezionare, interagire e implementare il flusso di lavoro.
Con l'istanza di classe GlueContext
, sarai in grado di localizzare il metodo getSampleStreamingDynamicFrame
. Gli argomenti richiesti per questo metodo sono:
-
dataFrame
: Lo Spark Streaming DataFrame -
options
: vedi le opzioni disponibili di seguito
Le opzioni disponibili includono:
-
windowSize: viene chiamato anche durata del microbatch. Questo parametro determinerà la durata di attesa di una query di streaming dopo l'attivazione del batch precedente. Il valore del parametro deve essere inferiore a
pollingTimeInMs
. -
pollingTimeInMs: tempo totale di esecuzione del metodo. Esso spedirà almeno un micro batch per ottenere registri campione dal flusso di input.
-
recordPollingLimit: questo parametro ti aiuta a limitare il numero totale di registri per cui esegui il polling dallo stream.
-
(Facoltativo) È possibile utilizzare anche
writeStreamFunction
per applicare questa funzione personalizzata a ogni funzione di campionamento del registro. Vedi di seguito alcuni esempi in Scala e Python.
-
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Nota
Se il DynFrame
d'esempio è vuoto, le ragioni possono essere varie:
-
La fonte di streaming è impostata su "Più recente" e non sono stati inseriti nuovi dati durante il periodo di campionamento.
-
Il tempo del polling non è sufficiente per elaborare i registri importati. I dati non verranno visualizzati a meno che l'intero batch non sia stato elaborato.
Esecuzione di applicazioni di streaming in sessioni interattive
In AWS Glue sessioni interattive, è possibile eseguire un AWS Glue applicazione di streaming simile a come si creerebbe un'applicazione di streaming in AWS Glue console. Poiché le sessioni interattive sono basate su sessione, l'individuazione di eccezioni nel runtime non provoca l'interruzione della sessione. Ora abbiamo l'ulteriore vantaggio di sviluppare iterativamente la funzione batch. Ad esempio:
def batch_function(data_frame, batch_id):
log.info(data_frame.count())
invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
Nell'esempio precedente, abbiamo incluso un utilizzo non valido di un metodo e a differenza del normale AWS Glue i processi che usciranno dall'intera applicazione, il contesto di codifica e le definizioni dell'utente sono completamente conservati e la sessione è ancora operativa. Non è necessario avviare un nuovo cluster e rieseguire tutte le trasformazioni precedenti. Ciò consente di concentrarsi sull'iterazione rapida delle implementazioni delle funzioni batch per ottenere risultati desiderati.
È importante notare che le sessioni interattive valutano ogni istruzione in modo bloccante per far sì che la sessione esegua una sola istruzione alla volta. Poiché le query di streaming sono continue e senza fine, le sessioni con query di streaming attive non saranno in grado di gestire alcuna istruzione di follow-up a meno che non vengano interrotte. Puoi emettere il comando di interruzione direttamente da Jupyter Notebook e il nostro kernel gestirà la cancellazione per te.
Prendi come esempio la seguente sequenza di istruzioni in attesa dell'esecuzione:
Statement 1:
val number = df.count()
#Spark Action with deterministic result
Result: 5
Statement 2:
streamingQuery.start().awaitTermination()
#Spark Streaming Query that will be executing continously
Result: Constantly updated with each microbatch
Statement 3:
val number2 = df.count()
#This will not be executed as previous statement will be running indefinitely