Mantieni le migliori pratiche per le applicazioni Managed Service for Apache Flink - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Mantieni le migliori pratiche per le applicazioni Managed Service for Apache Flink

Questa sezione contiene informazioni e consigli per lo sviluppo di un servizio gestito stabile e performante per le applicazioni Apache Flink.

Riduci al minimo le dimensioni dell'uber JAR

Java/Scala application must be packaged in an uber (super/fat) JAR e include tutte le dipendenze aggiuntive richieste che non sono già fornite dal runtime. Tuttavia, le dimensioni di uber JAR influiscono sui tempi di avvio e riavvio dell'applicazione e possono far sì che venga JAR superato il limite di 512 MB.

Per ottimizzare i tempi di implementazione, il tuo uber non JAR dovrebbe includere quanto segue:

  • Eventuali dipendenze fornite dal runtime, come illustrato nell'esempio seguente. Dovrebbero avere un provided ambito nel POM file o compileOnly nella configurazione di Gradle.

  • Qualsiasi dipendenza utilizzata solo per i test, ad esempio JUnit Mockito. Dovrebbero avere un test ambito nel POM file o testImplementation nella configurazione di Gradle.

  • Eventuali dipendenze non effettivamente utilizzate dall'applicazione.

  • Qualsiasi dato o metadato statico richiesto dall'applicazione. I dati statici devono essere caricati dall'applicazione in fase di esecuzione, ad esempio da un datastore o da Amazon S3.

  • Consulta questo file di POM esempio per i dettagli sulle impostazioni di configurazione precedenti.

Dipendenze fornite

Il runtime Managed Service for Apache Flink fornisce una serie di dipendenze. Queste dipendenze non devono essere incluse nel file fat JAR e devono avere un provided ambito nel POM file o essere escluse esplicitamente nella configurazione. maven-shade-plugin Tutte queste dipendenze incluse nel file fat JAR vengono ignorate in fase di esecuzione, ma aumentano le dimensioni del sovraccarico aggiuntivo durante la JAR distribuzione.

Dipendenze fornite dal runtime, nelle versioni di runtime 1.18, 1.19 e 1.20:

  • org.apache.flink:flink-core

  • org.apache.flink:flink-java

  • org.apache.flink:flink-streaming-java

  • org.apache.flink:flink-scala_2.12

  • org.apache.flink:flink-table-runtime

  • org.apache.flink:flink-table-planner-loader

  • org.apache.flink:flink-json

  • org.apache.flink:flink-connector-base

  • org.apache.flink:flink-connector-files

  • org.apache.flink:flink-clients

  • org.apache.flink:flink-runtime-web

  • org.apache.flink:flink-metrics-code

  • org.apache.flink:flink-table-api-java

  • org.apache.flink:flink-table-api-bridge-base

  • org.apache.flink:flink-table-api-java-bridge

  • org.apache.logging.log4j:log4j-slf4j-impl

  • org.apache.logging.log4j:log4j-api

  • org.apache.logging.log4j:log4j-core

  • org.apache.logging.log4j:log4j-1.2-api

Inoltrecom.amazonaws:aws-kinesisanalytics-runtime:1.2.0, viene fornita anche la libreria utilizzata per recuperare le proprietà di runtime dell'applicazione in Managed Service for Apache Flink.

Tutte le dipendenze fornite dal runtime devono utilizzare i seguenti consigli per non includerle in uber: JAR

  • In Maven (pom.xml) e SBT (build.sbt), usa scope. provided

  • In Gradle (build.gradle), usa la configurazione. compileOnly

Qualsiasi dipendenza fornita accidentalmente inclusa in uber JAR verrà ignorata in fase di esecuzione a causa del caricamento della prima classe di Apache Flink. Per ulteriori informazioni, consulta la documentazione di Apache Flink. parent-first-patterns

Connectors (Connettori)

La maggior parte dei connettori, ad eccezione del FileSystem connettore, che non sono inclusi nel runtime devono essere inclusi nel POM file con l'ambito predefinito ()compile.

Altri consigli

Di norma, l'uber Apache Flink JAR fornito a Managed Service for Apache Flink deve contenere il codice minimo richiesto per eseguire l'applicazione. Le dipendenze che includono le classi di origine, i set di dati di test o lo stato di avvio non devono essere incluse in questo jar. Se è necessario inserire risorse statiche in fase di esecuzione, separa questo problema in una risorsa come Amazon S3. Ne sono un esempio i bootstrap di stato o un modello di inferenza.

Prenditi del tempo per considerare il tuo albero delle dipendenze approfondito e rimuovere le dipendenze non di runtime.

Sebbene Managed Service for Apache Flink supporti file di dimensioni jar da 512 MB, questa dovrebbe essere vista come un'eccezione alla regola. Apache Flink attualmente supporta file di dimensioni jar di ~104 MB tramite la sua configurazione predefinita, e questa dovrebbe essere la dimensione massima di destinazione di un jar necessaria.

Tolleranza agli errori: checkpoint e savepoint

Utilizza i checkpoint e i savepoint per implementare la tolleranza agli errori nell'applicazione Managed Service for Apache Flink. Quando sviluppi e gestisci un'applicazione, tieni presente quanto indicato di seguito:

  • Ti consigliamo di mantenere abilitato il checkpoint per la tua applicazione. La creazione di checkpoint offre tolleranza agli errori per l'applicazione durante la manutenzione programmata, nonché in caso di guasti imprevisti dovuti a problemi di servizio, errori di dipendenza dall'applicazione e altri problemi. Per ulteriori informazioni sulla manutenzione, consulta Gestisci le attività di manutenzione per Managed Service for Apache Flink.

  • Imposta ApplicationSnapshotConfiguration:: SnapshotsEnabled su false durante lo sviluppo o la risoluzione dei problemi dell'applicazione. A ogni arresto dell'applicazione viene creato uno snapshot, il che può causare problemi se l'applicazione non è integra o non è performante. Imposta SnapshotsEnabled su true una volta che l'applicazione è in produzione ed è stabile.

    Nota

    È consigliabile che l'applicazione crei uno snapshot più volte al giorno per riavviarsi correttamente con i dati di stato corretti. La frequenza corretta per l'acquisizione degli snapshot dipende dalla logica di business dell'applicazione. L'acquisizione di snapshot frequenti consente di ripristinare i dati più recenti, ma aumenta i costi e richiede più risorse di sistema.

    Per ulteriori informazioni sul monitoraggio dei tempi di inattività delle applicazioni, consulta .

Per ulteriori informazioni sull'implementazione della tolleranza di errore, consulta Implementa la tolleranza agli.

Versioni di connettori non supportate

A partire dalla versione 1.15 o successiva di Apache Flink, Managed Service for Apache Flink impedisce automaticamente l'avvio o l'aggiornamento delle applicazioni se utilizzano versioni del connettore Kinesis non supportate incluse nell'applicazione. JARs Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.15 o successiva, assicurati di utilizzare il connettore Kinesis più recente. Si tratta di qualsiasi versione uguale o successiva alla versione 1.15.2. Tutte le altre versioni non sono supportate da Managed Service for Apache Flink perché potrebbero causare problemi di coerenza o guasti con la funzione Stop with Savepoint, che impedisce le operazioni di clean stop/update. Per ulteriori informazioni sulla compatibilità dei connettori nelle versioni di Amazon Managed Service per Apache Flink, consulta Connettori Apache Flink.

Prestazioni e parallelismo

L'applicazione può essere dimensionata per soddisfare qualsiasi throughput ottimizzando il parallelismo delle applicazioni ed evitando problemi di prestazioni. Quando sviluppi e gestisci un'applicazione, tieni presente quanto indicato di seguito:

  • Verifica che tutte le origini e i sink dell'applicazione siano sufficientemente forniti e che non subiscano limitazioni della larghezza della banda della rete. Se le sorgenti e i sink sono altri AWS servizi, monitora l'utilizzo di tali servizi. CloudWatch

  • Per le applicazioni con un parallelismo molto elevato, controlla se gli alti livelli di parallelismo vengono applicati a tutti gli operatori dell'applicazione. Per impostazione predefinita, Apache Flink applica lo stesso parallelismo dell'applicazione per tutti gli operatori nel grafico dell'applicazione. Ciò può comportare problemi di approvvigionamento su origini o sink o rallentamenti nell'elaborazione dei dati degli operatori. È possibile modificare il parallelismo di ogni operatore in codice con. setParallelism

  • Comprendi il significato delle impostazioni di parallelismo per gli operatori della tua applicazione. Se modifichi il parallelismo di un operatore, potresti non essere in grado di ripristinare l'applicazione da uno snapshot creato quando l'operatore aveva un parallelismo incompatibile con le impostazioni correnti. Per ulteriori informazioni sull'impostazione del parallelismo degli operatori, consulta Impostazione esplicita del parallelismo massimo per gli operatori.

Per ulteriori informazioni sul dimensionamento semplice, consulta Implementa la scalabilità delle applicazioni.

Impostazione del parallelismo per operatore

Per impostazione predefinita, tutti gli operatori hanno il parallelismo impostato a livello di applicazione. È possibile sovrascrivere il parallelismo di un singolo operatore utilizzando. DataStream API .setParallelism(x) È possibile impostare il parallelismo dell'operatore su qualsiasi parallelismo uguale o inferiore al parallelismo dell'applicazione.

Se possibile, definisci il parallelismo dell'operatore in funzione del parallelismo dell'applicazione. In questo modo, il parallelismo dell'operatore varierà con il parallelismo dell'applicazione. Se utilizzi il dimensionamento automatico, ad esempio, tutti gli operatori varieranno il loro parallelismo nella stessa proporzione:

int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);

In alcuni casi, potrebbe essere preferibile impostare il parallelismo degli operatori su una costante. Ad esempio, impostando il parallelismo di un'origine di flusso Kinesis sul numero di partizioni. In questi casi, dovresti considerare di passare il parallelismo dell'operatore come parametro di configurazione dell'applicazione, in modo da modificarlo senza modificare il codice, se hai bisogno, ad esempio, di eseguire il resharding del flusso di origine.

Registrazione

È possibile monitorare le prestazioni e le condizioni di errore dell'applicazione utilizzando Logs. CloudWatch Quando configuri la registrazione per applicazioni specifiche, tieni presente quanto indicato di seguito:

  • Abilita CloudWatch la registrazione per l'applicazione in modo da poter eseguire il debug di eventuali problemi di runtime.

  • Non creare una voce di log per ogni record elaborato nell'applicazione. Ciò causa gravi rallentamenti durante l'elaborazione e potrebbe portare a una contropressione nell'elaborazione dei dati.

  • Crea CloudWatch allarmi per avvisarti quando l'applicazione non funziona correttamente. Per ulteriori informazioni, consulta

Per ulteriori informazioni sull'implementazione della registrazione, consulta .

Codifica

È possibile rendere l'applicazione performante e stabile utilizzando le pratiche di programmazione consigliate. Quando scrivi il codice di applicazione, tieni presente quanto segue:

  • Non utilizzare system.exit() nel codice dell'applicazione, né nel metodo main dell'applicazione né nelle funzioni definite dall'utente. Se desideri chiudere l'applicazione dall'interno del codice, lancia un'eccezione derivata da Exception o RuntimeException contenente un messaggio su cosa è andato storto con l'applicazione.

    Tieni presente quanto segue su come il servizio gestisce questa eccezione:

    • Se l'eccezione viene generata dal metodo main della tua applicazione, il servizio la inserirà in una ProgramInvocationException quando l'applicazione passerà allo stato RUNNING e il job manager non riuscirà a inviare il processo.

    • Se l'eccezione viene generata da una funzione definita dall'utente, il job manager interromperà il processo e lo riavvierà, e i dettagli dell'eccezione verranno scritti nel log delle eccezioni.

  • Prendi in considerazione l'idea di ombreggiare il JAR file dell'applicazione e le relative dipendenze incluse. L'ombreggiatura è consigliata in caso di potenziali conflitti nei nomi dei pacchetti tra l'applicazione e il runtime di Apache Flink. Se si verifica un conflitto, i log dell'applicazione possono contenere un'eccezione di tipo java.util.concurrent.ExecutionException. Per ulteriori informazioni sull'ombreggiatura del JAR file dell'applicazione, consulta Apache Maven Shade Plugin.

Credenziali root.

Non dovresti inserire credenziali a lungo termine nelle applicazioni di produzione (o in qualsiasi altra). Le credenziali a lungo termine vengono probabilmente archiviate in un sistema di controllo delle versioni e possono facilmente perdersi. È invece possibile associare un ruolo all'applicazione del servizio gestito per Apache Flink e concedere i privilegi a tale ruolo. L'applicazione Flink in esecuzione può quindi raccogliere credenziali temporanee con i rispettivi privilegi dall'ambiente. Nel caso in cui sia necessaria l'autenticazione per un servizio che non è integrato in modo nativoIAM, ad esempio, con un database che richiede un nome utente e una password per l'autenticazione, è consigliabile archiviare i AWS segreti in Secrets Manager.

Molti servizi AWS nativi supportano l'autenticazione:

Lettura da fonti con pochi shard/partizioni

Durante la lettura da Apache Kafka o da un flusso di dati Kinesis, potrebbe esserci una discrepanza tra il parallelismo del flusso (ad esempio, il numero di partizioni per Kafka e il numero di partizioni per Kinesis) e il parallelismo dell'applicazione. Con un design ingenuo, il parallelismo di un'applicazione non può superare il parallelismo di un flusso: ogni sottoattività di un operatore di origine può leggere solo da 1 o più shard/partizioni. Ciò significa che per un flusso con sole 2 partizioni e un'applicazione con un parallelismo di 8, solo due sottoattività consumano effettivamente il flusso e 6 sottoattività rimangono inattive. Ciò può limitare notevolmente il throughput dell'applicazione, in particolare se la deserializzazione è costosa e viene eseguita dall'origine (impostazione predefinita).

Per mitigare questo effetto, puoi dimensionare il flusso. Tuttavia, ciò potrebbe non essere sempre auspicabile o possibile. In alternativa, è possibile ristrutturare il codice sorgente in modo che non esegua alcuna serializzazione e trasmetta semplicemente il byte[]. È quindi possibile ribilanciare i dati per distribuirli uniformemente tra tutte le attività e quindi deserializzare i dati in quell'area. In questo modo, è possibile sfruttare tutte le sottoattività per la deserializzazione e questa operazione potenzialmente costosa non è più vincolata dal numero di shard/partizioni del flusso.

Intervallo di aggiornamento del notebook Studio

Se modifichi l'intervallo di aggiornamento dei risultati del paragrafo, impostalo su un valore pari almeno a 1000 millisecondi.

Prestazioni ottimali del notebook Studio

Abbiamo eseguito il test utilizzando la seguente dichiarazione e abbiamo ottenuto le migliori prestazioni con events-per-second moltiplicati per number-of-keys davano un risultato inferiore a 25.000.000. Questo era per events-per-second inferiore a 150.000.

SELECT key, sum(value) FROM key-values GROUP BY key

Come le strategie di watermark e le partizioni inattive influiscono sulle finestre temporali

Durante la lettura di eventi da Apache Kafka e dal flusso di dati Kinesis, l'origine può impostare l'ora dell'evento in base agli attributi del flusso. Nel caso di Kinesis, l'ora dell'evento è uguale all'ora approssimativa di arrivo degli eventi. Tuttavia, impostare l'ora dell'evento all'origine degli eventi non è sufficiente per consentire a un'applicazione Flink di utilizzare tale orario. L'origine deve inoltre generare watermark che diffondano le informazioni sull'ora dell'evento dall'origine a tutti gli altri operatori. La documentazione di Flink offre una buona panoramica di come funziona questo processo.

Per impostazione predefinita, il timestamp di un evento letto da Kinesis è impostato sull'ora di arrivo approssimativa determinata da Kinesis. Un ulteriore prerequisito affinché l'ora dell'evento funzioni nell'applicazione è una strategia di watermark.

WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));

La strategia di watermark viene quindi applicata a un DataStream con il metodo assignTimestampsAndWatermarks. Esistono alcune utili strategie integrate:

  • forMonotonousTimestamps() utilizzerà semplicemente l'ora dell'evento (ora di arrivo approssimativa) e genererà periodicamente il valore massimo come watermark (per ogni sottoattività specifica)

  • forBoundedOutOfOrderness(Duration.ofSeconds(...)) simile alla strategia precedente, ma utilizzerà l'ora e la durata dell'evento per la generazione del watermark.

Funziona, ma ci sono un paio di avvertenze da tenere a mente. I watermark vengono generati a livello di sottoattività e scorrono nel grafico dell'operatore.

Dalla documentazione di Flink:

Ogni sottoattività parallela di una funzione di origine di solito genera le proprie filigrane in modo indipendente. Questi watermark definiscono l'ora dell'evento in quella particolare origine parallela.

Man mano che i watermark scorrono attraverso il programma di streaming, anticipano l'ora dell'evento presso gli operatori a cui arrivano. Ogni volta che un operatore anticipa l'orario dell'evento, genera un nuovo watermark a valle per gli operatori successivi.

Alcuni operatori utilizzano più flussi di input, ad esempio un'unione o operatori che seguono una funzione keyBy (...) o di partizione (...). L'ora corrente degli eventi di un operatore di questo tipo è la durata minima degli eventi dei suoi flussi di input. Man mano che i flussi di input aggiornano gli orari degli eventi, così fa anche l'operatore.

Ciò significa che se un'attività secondaria di origine utilizza una partizione inattiva, gli operatori a valle non ricevono nuovi watermark da quella sottoattività e quindi l'elaborazione si blocca per tutti gli operatori a valle che utilizzano finestre temporali. Per evitare ciò, i clienti possono aggiungere l'opzione withIdleness alla strategia watermark. Con questa opzione, un operatore esclude i watermark dalle sottoattività inattive durante il calcolo dell'orario dell'evento da parte dell'operatore. Le sottoattività inattive quindi non bloccano più l'avanzamento dell'orario degli eventi negli operatori a valle.

Tuttavia, l'opzione di inattività con le strategie di watermark integrate non farà avanzare la durata dell'evento se nessuna sottoattività legge alcun evento, ovvero se non ci sono eventi nel flusso. Ciò diventa particolarmente visibile nei casi di test in cui un insieme finito di eventi viene letto dal flusso. Poiché l'ora dell'evento non avanza dopo la lettura dell'ultimo evento, l'ultima finestra (contenente l'ultimo evento) non si chiuderà mai.

Riepilogo

  • l'impostazione withIdleness non genererà nuovi watermark nel caso in cui una partizione sia inattiva, ma escluderà solo l'ultimo watermark inviato dalle sottoattività inattive dal calcolo del watermark minimo negli operatori a valle

  • con le strategie integrate di watermark, l'ultima finestra aperta non si chiuderà mai (a meno che non vengano inviati nuovi eventi che fanno avanzare il watermark, ma ciò crea una nuova finestra che poi rimane aperta)

  • anche quando l'ora è impostata dal flusso Kinesis, possono comunque verificarsi eventi in ritardo se una partizione viene consumata più velocemente di altre (ad esempio, durante l'inizializzazione dell'app o quando si utilizza TRIM_HORIZON in cui tutte le partizioni esistenti vengono consumate in parallelo, ignorando la relazione padre/figlio)

  • le impostazioni withIdleness della strategia watermark sembrano deprecare le impostazioni specifiche dell'origine Kinesis per le partizioni inattive (ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS

Esempio

La seguente applicazione legge da un flusso e crea finestre di sessione in base all'ora dell'evento.

Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });

Nell'esempio seguente, 8 eventi vengono scritti in un flusso di 16 partizioni (i primi 2 e l'ultimo evento finiscono nella stessa partizione).

$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022

Questo input dovrebbe generare 5 finestre di sessione: evento 1,2,3; evento 4,5; evento 6; evento 7; evento 8. Tuttavia, il programma produce solo le prime 4 finestre.

11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7

L'output mostra solo 4 finestre (manca l'ultima finestra contenente l'evento 8). Ciò è dovuto all'ora dell'evento e alla strategia di watermark. L'ultima finestra non può chiudersi perché, con le strategie di watermark integrate, il tempo non avanza mai oltre l'ora dell'ultimo evento letto dal flusso. Ma perché la finestra si chiuda, il tempo deve avanzare di oltre 10 secondi dopo l'ultimo evento. In questo caso l'ultimo watermark è 2022-03-23T10:21:27.170Z ma per chiudere la finestra della sessione è necessario un watermark di 10 secondi e 1 millisecondo dopo.

Se l'opzione withIdleness viene rimossa dalla strategia di watermark, nessuna finestra di sessione si chiuderà mai, perché il “watermark globale” dell'operatore finestra non può avanzare.

Tieni presente che all'avvio dell'applicazione Flink (o in caso di distorsione dei dati), alcune partizioni potrebbero essere consumate più velocemente di altre. Ciò può far sì che alcuni watermark vengano generati troppo presto da una sottoattività (la sottoattività può generare il watermark in base al contenuto di una partizione senza aver consumato le altre partizioni a cui è abbonata). I modi per mitigare la situazione sono le diverse strategie di watermarking, che aggiungono un buffer di sicurezza (forBoundedOutOfOrderness(Duration.ofSeconds(30)) o consentono esplicitamente l'arrivo di eventi tardivi (allowedLateness(Time.minutes(5)).

Imposta a UUID per tutti gli operatori

Quando Managed Service for Apache Flink avvia un processo Flink per un'applicazione con uno snapshot, il processo Flink può non avviarsi a causa di determinati problemi. Uno di questi è la mancata corrispondenza degli ID dell'operatore. Flink si aspetta un operatore esplicito e coerente IDs per gli operatori del Job Graph di Flink. Se non è impostato in modo esplicito, Flink genera automaticamente un ID per gli operatori. Questo perché Flink utilizza questi operatori per identificare in modo univoco IDs gli operatori in un grafico del lavoro e li utilizza per memorizzare lo stato di ciascun operatore in un punto di salvataggio.

Il problema della mancata corrispondenza dell'ID dell'operatore si verifica quando Flink non trova una mappatura 1:1 tra l'operatore di un grafico IDs del lavoro e l'operatore definito in un savepoint. IDs Ciò accade quando non IDs sono impostati operatori coerenti espliciti e Flink genera automaticamente un operatore IDs che potrebbe non essere coerente con ogni creazione di grafico di lavoro. La probabilità che le applicazioni riscontrino questo problema è elevata durante gli interventi di manutenzione. Per evitare ciò, consigliamo ai clienti di impostare il codice UUID flink per tutti gli operatori. Per ulteriori informazioni, consulta l'argomento Set a UUID per tutti gli operatori in Production Readiness.

Aggiungi ServiceResourceTransformer al plugin Maven shade

Flink utilizza Java Service Provider Interfaces (SPI) per caricare componenti come connettori e formati. L'utilizzo di più dipendenze Flink SPI può causare conflitti nell'uber-jar e comportamenti imprevisti delle applicazioni. Si consiglia di aggiungere il plugin Maven shade, definito nel ServiceResourceTransformerfile pom.xml

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>