Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Behalten Sie die bewährten Methoden für Managed Service für Apache Flink-Anwendungen bei
Dieser Abschnitt enthält Informationen und Empfehlungen für die Entwicklung eines stabilen, performanten Managed Service für Apache Flink-Anwendungen.
Themen
- Minimiere die Größe des Ubers JAR
- Fehlertoleranz: Prüfpunkte und Savepoints
- Nicht unterstützte Konnektor-Versionen
- Leistung und Parallelität
- Parallelität pro Operator festlegen
- Protokollierung
- Codierung
- Verwalten von Anmeldeinformationen.
- Lesen aus Quellen mit wenigen Shards/Partitionen
- Aktualisierungsintervall für Studio-Notebooks
- Optimale Leistung des Studio-Notebooks
- Wie sich Strategien mit Wasserzeichen und ungenutzte Shards auf Zeitfenster auswirken
- Stellen Sie a UUID für alle Operatoren ein
- ServiceResourceTransformer Zum Maven Shade-Plugin hinzufügen
Minimiere die Größe des Ubers JAR
Java/Scala application must be packaged in an uber (super/fat) JAR und schließen Sie alle zusätzlichen erforderlichen Abhängigkeiten ein, die nicht bereits von der Laufzeit bereitgestellt werden. Die Größe des Ubers JAR wirkt sich jedoch auf die Start- und Neustartzeiten der Anwendung aus und kann dazu führenJAR, dass das Limit von 512 MB überschritten wird.
Um die Bereitstellungszeit zu optimieren, JAR sollte dein Uber Folgendes nicht enthalten:
-
Alle Abhängigkeiten, die von der Laufzeit bereitgestellt werden, wie im folgenden Beispiel dargestellt. Sie sollten einen
provided
Geltungsbereich in der POM Datei odercompileOnly
in Ihrer Gradle-Konfiguration haben. -
Alle Abhängigkeiten, die nur zum Testen verwendet werden, zum Beispiel JUnit oder Mockito. Sie sollten einen
test
Geltungsbereich in der POM Datei odertestImplementation
in Ihrer Gradle-Konfiguration haben. -
Alle Abhängigkeiten, die von Ihrer Anwendung nicht tatsächlich verwendet werden.
-
Alle statischen Daten oder Metadaten, die für Ihre Anwendung erforderlich sind. Statische Daten sollten von der Anwendung zur Laufzeit geladen werden, beispielsweise aus einem Datenspeicher oder aus Amazon S3.
-
In dieser POMBeispieldatei
finden Sie Einzelheiten zu den vorherigen Konfigurationseinstellungen.
Bereitgestellte Abhängigkeiten
Die Laufzeit von Managed Service for Apache Flink bietet eine Reihe von Abhängigkeiten. Diese Abhängigkeiten sollten nicht im FAT enthalten sein JAR und müssen einen provided
Gültigkeitsbereich in der POM Datei haben oder in der maven-shade-plugin
Konfiguration explizit ausgeschlossen werden. Jede dieser im FAT enthaltenen Abhängigkeiten JAR wird zur Laufzeit ignoriert, erhöht jedoch den JAR zusätzlichen Overhead während der Bereitstellung.
Von der Runtime bereitgestellte Abhängigkeiten in den Runtime-Versionen 1.18, 1.19 und 1.20:
-
org.apache.flink:flink-core
-
org.apache.flink:flink-java
-
org.apache.flink:flink-streaming-java
-
org.apache.flink:flink-scala_2.12
-
org.apache.flink:flink-table-runtime
-
org.apache.flink:flink-table-planner-loader
-
org.apache.flink:flink-json
-
org.apache.flink:flink-connector-base
-
org.apache.flink:flink-connector-files
-
org.apache.flink:flink-clients
-
org.apache.flink:flink-runtime-web
-
org.apache.flink:flink-metrics-code
-
org.apache.flink:flink-table-api-java
-
org.apache.flink:flink-table-api-bridge-base
-
org.apache.flink:flink-table-api-java-bridge
-
org.apache.logging.log4j:log4j-slf4j-impl
-
org.apache.logging.log4j:log4j-api
-
org.apache.logging.log4j:log4j-core
-
org.apache.logging.log4j:log4j-1.2-api
Darüber hinaus wird die Bibliothek bereitgestelltcom.amazonaws:aws-kinesisanalytics-runtime:1.2.0
, die zum Abrufen der Laufzeiteigenschaften von Anwendungen in Managed Service for Apache Flink verwendet wird.
Alle von der Runtime bereitgestellten Abhängigkeiten müssen die folgenden Empfehlungen befolgen, um sie nicht in den Uber aufzunehmen: JAR
-
Verwenden Sie in Maven (
pom.xml
) und SBT (build.sbt
)provided
scope. -
Verwenden
compileOnly
Sie in Gradle (build.gradle
) die Konfiguration.
Jede angegebene Abhängigkeit, die versehentlich im Uber enthalten ist, JAR wird zur Laufzeit ignoriert, da Apache Flink das Parent-First-Classloading verwendet. Weitere Informationen finden Sie parent-first-patterns
Konnektoren
Die meisten Konnektoren, mit Ausnahme des FileSystem Konnektors, die nicht in der Runtime enthalten sind, müssen in der POM Datei mit dem Standardbereich (compile
) enthalten sein.
Andere Empfehlungen
In der Regel sollte Ihr Apache Flink-Uber, der Managed Service für Apache Flink zur JAR Verfügung gestellt wird, den Mindestcode enthalten, der für die Ausführung der Anwendung erforderlich ist. Einschließlich Abhängigkeiten, die die Quellklassen, Testdatensätze oder den Bootstrapping-Status beinhalten, sollten nicht in dieser JAR-Datei enthalten sein. Wenn statische Ressourcen zur Laufzeit abgerufen werden müssen, trennen Sie dieses Problem in eine Ressource wie Amazon S3. Beispiele hierfür sind State Bootstraps oder ein Inferenzmodell.
Nehmen Sie sich etwas Zeit, um Ihren umfassenden Abhängigkeitsbaum zu betrachten und Abhängigkeiten zu entfernen, die nicht zur Laufzeit gehören.
Managed Service for Apache Flink unterstützt zwar Jar-Größen von 512 MB, dies sollte jedoch als Ausnahme von der Regel angesehen werden. Apache Flink unterstützt derzeit in seiner Standardkonfiguration Jar-Größen von ~104 MB, und das sollte die maximale Zielgröße für ein Jar sein, das benötigt wird.
Fehlertoleranz: Prüfpunkte und Savepoints
Verwenden Sie Checkpoints und Savepoints, um Fehlertoleranz in Ihrer Managed Service for Apache Flink-Anwendung zu implementieren. Berücksichtigen Sie bei Entwicklung und Wartung Ihrer Anwendung Folgendes:
Wir empfehlen, dass Sie Checkpointing für Ihre Anwendung aktiviert lassen. Prüfpunktprüfung bietet Fehlertoleranz für Ihre Anwendung bei geplanten Wartungsarbeiten sowie bei unerwarteten Ausfällen aufgrund von Serviceproblemen, Fehlern bei der Anwendungsabhängigkeit und anderen Problemen. Weitere Informationen zur geplanten Wartung finden Sie unter Wartungsaufgaben für Managed Service für Apache Flink verwalten.
Stellen Sie ApplicationSnapshotConfiguration:: SnapshotsEnabled
false
während der Anwendungsentwicklung oder Fehlerbehebung auf: ein. Bei jedem Anwendungsstopp wird ein Snapshot erstellt. Dies kann zu Problemen führen, wenn sich die Anwendung in einem fehlerhaften Zustand befindet oder nicht leistungsfähig ist. Setzen SieSnapshotsEnabled
auftrue
, wenn die Anwendung in Produktion und stabil ist.Anmerkung
Wir empfehlen, dass Ihre Anwendung mehrmals täglich einen Snapshot erstellt, um einen ordnungsgemäßen Neustart mit den korrekten Statusdaten zu gewährleisten. Die richtige Häufigkeit für Ihre Snapshots hängt von der Geschäftslogik Ihrer Anwendung ab. Durch häufiges Erstellen von Snapshots können Sie neuere Daten wiederherstellen. Dies erhöht jedoch die Kosten und erfordert mehr Systemressourcen.
Informationen zur Überwachung von Anwendungsausfällen finden Sie unter .
Weitere Informationen zur Implementierung der Fehlertoleranz finden Sie unter Implementieren Sie Fehlertoleranz.
Nicht unterstützte Konnektor-Versionen
Ab Apache Flink Version 1.15 oder höher verhindert Managed Service for Apache Flink automatisch, dass Anwendungen gestartet oder aktualisiert werden, wenn sie nicht unterstützte Kinesis Connector-Versionen verwenden, die in der Anwendung gebündelt sind. JARs Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.15 oder höher sicher, dass Sie den neuesten Kinesis-Connector verwenden. Dies ist jede Version, die Version 1.15.2 entspricht oder neuer ist. Alle anderen Versionen werden von Managed Service für Apache Flink nicht unterstützt, da sie zu Konsistenzproblemen oder Ausfällen bei der Funktion „Mit Savepoint beenden“ führen können, wodurch saubere Stop-/Aktualisierungsvorgänge verhindert werden. Weitere Informationen zur Connector-Kompatibilität in Amazon Managed Service für Apache Flink-Versionen finden Sie unter Apache Flink-Konnektoren.
Leistung und Parallelität
Ihre Anwendung kann so skaliert werden, dass sie jedes Durchsatzniveau erreicht, indem Sie die Parallelität Ihrer Anwendung optimieren und Leistungsprobleme vermeiden. Berücksichtigen Sie bei Entwicklung und Wartung Ihrer Anwendung Folgendes:
Stellen Sie sicher, dass alle Ihre Anwendungsquellen und -senken ausreichend bereitgestellt sind und nicht gedrosselt werden. Wenn es sich bei den Quellen und Senken um andere AWS Dienste handelt, überwachen Sie diese Dienste mithilfe von. CloudWatch
Prüfen Sie bei Anwendungen mit sehr hoher Parallelität, ob die hohen Parallelitätsebenen auf alle Operatoren in der Anwendung angewendet werden. Standardmäßig wendet Apache Flink dieselbe Anwendungsparallelität für alle Operatoren im Anwendungsdiagramm an. Dies kann entweder zu Problemen bei der Bereitstellung auf Quellen oder Senken oder zu Engpässen bei der Datenverarbeitung durch die Operatoren führen. Sie können die Parallelität der einzelnen Operatoren im Code mit ändern. setParallelism
Machen Sie sich mit der Bedeutung der Parallelitätseinstellungen für die Operatoren in Ihrer Anwendung vertraut. Wenn Sie die Parallelität für einen Operator ändern, können Sie die Anwendung möglicherweise nicht aus einem Snapshot wiederherstellen, der erstellt wurde, als der Operator eine Parallelität hatte, die mit den aktuellen Einstellungen nicht kompatibel ist. Weitere Informationen zum Einstellen der Operatorparallelität finden Sie unter Explizite Festlegung der maximalen Parallelität für Operatoren
.
Weitere Informationen über die Implementierung von Skalierung finden Sie unter Implementieren Sie Anwendungsskalierung.
Parallelität pro Operator festlegen
Standardmäßig ist die Parallelität für alle Operatoren auf Anwendungsebene festgelegt. Sie können die Parallelität eines einzelnen Operators überschreiben, indem Sie DataStream API .setParallelism(x)
Sie können eine Operatorparallelität auf eine beliebige Parallelität festlegen, die gleich oder niedriger als die Anwendungsparallelität ist.
Wenn möglich, definieren Sie die Operatorparallelität als Funktion der Anwendungsparallelität. Auf diese Weise variiert die Operatorparallelität mit der Anwendungsparallelität. Wenn Sie beispielsweise automatische Skalierung verwenden, variieren alle Operatoren ihre Parallelität im gleichen Verhältnis:
int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);
In einigen Fällen möchten Sie möglicherweise die Operatorparallelität auf eine Konstante setzen. Stellen Sie beispielsweise die Parallelität einer Kinesis Stream-Quelle auf die Anzahl der Shards ein. In diesen Fällen sollten Sie erwägen, die Operator-Parallelität als Anwendungskonfigurationsparameter zu übergeben, um sie zu ändern, ohne den Code zu ändern, wenn Sie beispielsweise den Quellstream resharden müssen.
Protokollierung
Sie können die Leistung und die Fehlerbedingungen Ihrer Anwendung mithilfe CloudWatch von Protokollen überwachen. Berücksichtigen Sie bei der Konfiguration der Protokollierung für Ihre Anwendung Folgendes:
Aktivieren CloudWatch Sie die Protokollierung für die Anwendung, damit alle Laufzeitprobleme behoben werden können.
Erstellen Sie nicht für jeden Datensatz, der in der Anwendung verarbeitet wird, einen Protokolleintrag. Dies führt zu schwerwiegenden Engpässen bei der Verarbeitung und kann zu Gegendruck bei der Datenverarbeitung führen.
Erstellen Sie CloudWatch Alarme, um Sie zu benachrichtigen, wenn Ihre Anwendung nicht ordnungsgemäß ausgeführt wird. Weitere Informationen finden Sie unter
Weitere Informationen über die Implementierung von Protokollierung finden Sie unter .
Codierung
Sie können Ihre Anwendung leistungsfähig und stabil machen, indem Sie empfohlene Programmierpraktiken anwenden. Berücksichtigen Sie beim Schreiben von Anwendungscode Folgendes:
Verwenden Sie
system.exit()
in Ihrem Anwendungscode nicht, weder in dermain
-Methode Ihrer Anwendung noch in benutzerdefinierten Funktionen. Wenn Sie Ihre Anwendung aus dem Code heraus beenden möchten, lösen Sie eine vonException
oderRuntimeException
abgeleitete Ausnahme aus, die eine Meldung darüber enthält, was bei der Anwendung schiefgelaufen ist.Beachten Sie Folgendes darüber, wie der Service mit dieser Ausnahme umgeht:
Wenn die Ausnahme von der
main
-Methode Ihrer Anwendung ausgelöst wird, wird sie vom Service beim Übergang der Anwendung in denRUNNING
-Status in eineProgramInvocationException
eingeschlossen, und der Job-Manager kann den Job nicht weiterleiten.Wenn die Ausnahme von einer benutzerdefinierten Funktion ausgelöst wird, schlägt der Job-Manager den Job fehl und startet ihn neu. Die Details der Ausnahme werden in das Ausnahmeprotokoll geschrieben.
Erwägen Sie, Ihre JAR Anwendungsdatei und die darin enthaltenen Abhängigkeiten zu schattieren. Das Schattieren wird empfohlen, wenn es potenzielle Konflikte bei den Paketnamen zwischen Ihrer Anwendung und der Apache Flink-Laufzeit gibt. Wenn ein Konflikt auftritt, können Ihre Anwendungsprotokolle eine Ausnahme des Typs
java.util.concurrent.ExecutionException
enthalten. Weitere Informationen zum Schattieren Ihrer JAR Anwendungsdatei finden Sie unter Apache MavenShade Plugin.
Verwalten von Anmeldeinformationen.
Sie sollten keine langfristigen Anmeldeinformationen in Produktionsanwendungen (oder andere Anwendungen) integrieren. Langfristige Anmeldeinformationen werden wahrscheinlich in ein Versionskontrollsystem eingecheckt und können leicht verloren gehen. Stattdessen können Sie der Anwendung Managed Service für Apache Flink eine Rolle zuordnen und dieser Rolle Berechtigungen gewähren. Die laufende Flink-Anwendung kann dann temporäre Anmeldeinformationen mit den entsprechenden Rechten aus der Umgebung abrufen. Falls eine Authentifizierung für einen Dienst erforderlich ist, der nicht nativ in eine Datenbank integriert istIAM, z. B. für eine Datenbank, die einen Benutzernamen und ein Passwort für die Authentifizierung benötigt, sollten Sie erwägen, Geheimnisse in AWS Secrets Manager
Viele AWS native Dienste unterstützen die Authentifizierung:
Amazon MSK — https://github.com/aws/aws-msk-iam-auth/# using-the-amazon-msk
- library-for-iam-authentication Amazon Elasticsearch Service — .java AmazonElasticsearchSink
Amazon S3 – funktioniert direkt mit Managed Service für Apache Flink
Lesen aus Quellen mit wenigen Shards/Partitionen
Beim Lesen aus Apache Kafka oder einem Kinesis Data Stream kann es zu einer Diskrepanz zwischen der Parallelität des Streams (d. h. der Anzahl der Partitionen für Kafka und der Anzahl der Shards für Kinesis) und der Parallelität der Anwendung kommen. Bei einem naiven Design kann die Parallelität einer Anwendung nicht über die Parallelität eines Streams skalieren: Jede Unteraufgabe eines Quelloperators kann nur aus 1 oder mehreren Shards/Partitionen lesen. Das bedeutet für einen Stream mit nur 2 Shards und eine Anwendung mit einer Parallelität von 8, dass nur zwei Unteraufgaben den Stream tatsächlich verbrauchen und 6 Unteraufgaben inaktiv bleiben. Dies kann den Durchsatz der Anwendung erheblich einschränken, insbesondere wenn die Deserialisierung teuer ist und von der Quelle durchgeführt wird (was die Standardeinstellung ist).
Um diesen Effekt zu mildern, können Sie den Stream entweder skalieren. Dies ist jedoch möglicherweise nicht immer wünschenswert oder möglich. Alternativ können Sie die Quelle so umstrukturieren, dass sie keine Serialisierung durchführt und nur die byte[]
weitergibt. Anschließend können Sie die Daten umverteilen
Aktualisierungsintervall für Studio-Notebooks
Wenn Sie das Aktualisierungsintervall für die Absatzergebnisse ändern, setzen Sie es auf einen Wert, der mindestens 1000 Millisekunden beträgt.
Optimale Leistung des Studio-Notebooks
Wir haben mit der folgenden Aussage getestet und die beste Leistung erhalten, wenn events-per-second
mit number-of-keys
multipliziert unter 25.000.000 ergab. Das war bei events-per-second
weniger als 150.000.
SELECT key, sum(value) FROM key-values GROUP BY key
Wie sich Strategien mit Wasserzeichen und ungenutzte Shards auf Zeitfenster auswirken
Beim Lesen von Ereignissen aus Apache Kafka und Kinesis Data Streams kann die Quelle die Ereigniszeit basierend auf den Attributen des Streams festlegen. Im Fall von Kinesis entspricht die Ereigniszeit der ungefähren Ankunftszeit von Ereignissen. Die Festlegung der Ereigniszeit an der Quelle für Ereignisse reicht jedoch nicht aus, damit eine Flink-Anwendung die Ereigniszeit verwenden kann. Die Quelle muss außerdem Wasserzeichen erzeugen, die Informationen über die Ereigniszeit von der Quelle an alle anderen Operatoren weitergeben. Die Flink-Dokumentation
Standardmäßig ist der Zeitstempel eines aus Kinesis gelesenen Ereignisses auf die ungefähre Ankunftszeit gesetzt, die von Kinesis bestimmt wird. Eine zusätzliche Voraussetzung dafür, dass die Ereigniszeit in der Anwendung funktioniert, ist eine Wasserzeichen-Strategie.
WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));
Die Wasserzeichenstrategie wird dann auf einen DataStream
mit der Methode assignTimestampsAndWatermarks
angewendet. Es gibt einige nützliche integrierte Strategien:
forMonotonousTimestamps()
verwendet einfach die Uhrzeit des Ereignisses (ungefähre Ankunftszeit) und gibt in regelmäßigen Abständen den Maximalwert als Wasserzeichen aus (für jede spezifische Unteraufgabe)forBoundedOutOfOrderness(Duration.ofSeconds(...))
ähnelt der vorherigen Strategie, verwendet jedoch die Ereigniszeit – Dauer für die Generierung von Wasserzeichen.
Das funktioniert, aber es gibt ein paar Vorbehalte, die Sie beachten sollten. Wasserzeichen werden auf Unteraufgabenebene generiert und fließen durch das Operatordiagramm.
Aus der Flink-Dokumentation
Jede parallel Unteraufgabe einer Quellfunktion generiert ihre Wasserzeichen normalerweise unabhängig. Diese Wasserzeichen definieren die Ereigniszeit an dieser bestimmten parallelen Quelle.
Während die Wasserzeichen durch das Streaming-Programm fließen, verschieben sie die Ereigniszeit bei den Operatoren, wo sie ankommen. Immer wenn ein Betreiber seine Ereigniszeit vorverlegt, generiert er nachgelagert ein neues Wasserzeichen für seine nachfolgenden Operatoren.
Manche Operatoren verwenden mehrere Eingabestreams, zum Beispiel eine Union oder Operatoren, die einer keyBy (...) - oder Partitionsfunktion (...) folgen. Die aktuelle Ereigniszeit eines solchen Operators ist das Minimum der Ereigniszeiten seiner Eingabestreams. Wenn seine Eingabestreams ihre Ereigniszeiten aktualisieren, tut dies auch der Operator.
Das heißt, wenn eine Quell-Unteraufgabe von einem inaktiven Shard aus konsumiert, erhalten nachgeschaltete Operatoren keine neuen Wasserzeichen von dieser Unteraufgabe, sodass die Verarbeitung für alle nachgeschalteten Operatoren, die Zeitfenster verwenden, zum Stillstand kommt. Um dies zu vermeiden, können Kunden die withIdleness
-Option zur Wasserzeichenstrategie hinzufügen. Bei dieser Option schließt ein Operator bei der Berechnung der Ereigniszeit des Operators die Wasserzeichen aus ungenutzten Upsteam-Unteraufgaben aus. Unteraufgaben im Leerlauf blockieren somit nicht mehr die Weiterleitung der Ereigniszeit bei nachgeschalteten Operatoren.
Die Idleness-Option mit den integrierten Wasserzeichen-Strategien verschiebt die Ereigniszeit jedoch nicht nach vorn, wenn keine Unteraufgabe ein Ereignis liest, d. h. es keine Ereignisse im Stream gibt. Dies wird besonders bei Testfällen sichtbar, in denen eine begrenzte Menge von Ereignissen aus dem Stream gelesen wird. Da die Ereigniszeit nach dem Lesen des letzten Ereignisses nicht vorverlegt wird, wird das letzte Fenster (das das letzte Ereignis enthält) niemals geschlossen.
Übersicht
Mit
withIdleness
-Einstellung werden keine neuen Wasserzeichen generiert, falls ein Shard inaktiv ist. Sie schließt lediglich das letzte Wasserzeichen aus, das von Unteraufgaben im Leerlauf gesendet wurde, bei der Berechnung des Mindestwasserzeichens in nachgeschalteten OperatorenBei den integrierten Wasserzeichen-Strategien wird das zuletzt geöffnete Fenster nie geschlossen (es sei denn, es werden neue Ereignisse gesendet, die das Wasserzeichen weiterleiten, aber dadurch wird ein neues Fenster erstellt, das dann geöffnet bleibt)
Selbst wenn die Zeit durch den Kinesis-Stream festgelegt wird, können spät eintreffende Ereignisse immer noch auftreten, wenn ein Shard schneller verbraucht wird als andere (z. B. während der App-Initialisierung oder bei der Verwendung von
TRIM_HORIZON
, wo alle vorhandenen Shards parallel konsumiert werden, wobei ihre Eltern/Kind-Beziehung ignoriert wird)Die
withIdleness
-Einstellungen der Wasserzeichenstrategie scheinen die quellspezifischen Kinesis-Einstellungen für inaktive Shards als veraltet zu markieren(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS
Beispiel
Die folgende Anwendung liest aus einem Stream und erstellt Sitzungsfenster auf der Grundlage der Ereigniszeit.
Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });
Im folgenden Beispiel werden 8 Ereignisse in einen Stream mit 16 Shards geschrieben (die ersten 2 und das letzte Ereignis landen zufällig im selben Shard).
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022
Diese Eingabe sollte zu 5 Sitzungsfenstern führen: Ereignis 1,2,3; Ereignis 4,5; Ereignis 6; Ereignis 7; Ereignis 8. Das Programm liefert jedoch nur die ersten 4 Fenster.
11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7
Die Ausgabe zeigt nur 4 Fenster (es fehlt das letzte Fenster, das Ereignis 8 enthält). Das liegt an der Ereigniszeit und der Wasserzeichen-Strategie. Das letzte Fenster kann nicht geschlossen werden, da bei den vorgefertigten Wasserzeichen-Strategie die Zeit nie über den Zeitpunkt des letzten Ereignisses hinausgeht, das aus dem Stream gelesen wurde. Damit das Fenster geschlossen werden kann, muss die Zeit jedoch um mehr als 10 Sekunden über das letzte Ereignis hinausgehen. In diesem Fall ist das letzte Wasserzeichen 2022-03-23T10:21:27.170Z, aber damit das Sitzungsfenster geschlossen werden kann, ist ein Wasserzeichen 10s und 1ms später erforderlich.
Wenn die withIdleness
-Option aus der Wasserzeichenstrategie entfernt wird, wird kein Sitzungsfenster jemals geschlossen, da das „globale Wasserzeichen“ des Fensteroperators nicht vorrücken kann.
Beachten Sie, dass beim Start der Flink-Anwendung (oder bei Datenverzerrungen) einige Shards möglicherweise schneller verbraucht werden als andere. Dies kann dazu führen, dass einige Wasserzeichen zu früh von einer Unteraufgabe ausgegeben werden (die Unteraufgabe kann das Wasserzeichen basierend auf dem Inhalt eines Shards ausgeben, ohne die anderen Shards, die sie abonniert hat, verbraucht zu haben). Abhilfe schaffen verschiedene Strategien mit Wasserzeichen, die einen Sicherheitspuffer hinzufügen (forBoundedOutOfOrderness(Duration.ofSeconds(30))
oder verspätet eintreffende Ereignisse ausdrücklich zulassen (allowedLateness(Time.minutes(5))
.
Stellen Sie a UUID für alle Operatoren ein
Wenn Managed Service für Apache Flink einen Flink-Auftrag für eine Anwendung mit einem Snapshot startet, kann der Flink-Auftrag aufgrund bestimmter Probleme nicht gestartet werden. Eines davon ist die Nichtübereinstimmung der Operator-ID. Flink erwartet explizite, konsistente Operatoren IDs für Flink-Jobgraph-Operatoren. Wenn nicht explizit gesetzt, generiert Flink automatisch eine ID für die Operatoren. Dies liegt daran, dass Flink diese Operatoren verwendetIDs, um die Operatoren in einem Job-Graph eindeutig zu identifizieren, und sie verwendet, um den Status jedes Operators in einem Savepoint zu speichern.
Das Problem, dass die Operator-ID nicht übereinstimmt, tritt auf, wenn Flink keine 1:1 -Zuordnung zwischen dem Operator IDs eines Job-Graphen und dem in einem Savepoint definierten Operator IDs findet. Dies passiert, wenn keine expliziten konsistenten Operatoren gesetzt IDs sind und Flink automatisch Operatoren generiertIDs, die möglicherweise nicht bei jeder Jobgraph-Erstellung konsistent sind. Die Wahrscheinlichkeit, dass Anwendungen bei Wartungsarbeiten auf dieses Problem stoßen, ist hoch. Um dies zu vermeiden, empfehlen wir Kunden, UUID für alle Operatoren einen Flink-Code festzulegen. Weitere Informationen finden Sie im Thema Set a UUID für alle Operatoren unter Produktionsbereitschaft.
ServiceResourceTransformer Zum Maven Shade-Plugin hinzufügen
Flink verwendet die Service Provider Interfaces (SPI)
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>