Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Erstellen Sie eine Managed Service for Apache Flink-Anwendung und führen Sie sie aus
In diesem Schritt erstellen Sie eine Managed Service für Apache Flink-Anwendung mit Kinesis-Datenströmen als Quelle und Senke.
Dieser Abschnitt enthält die folgenden Schritte:
- Erstellen Sie abhängige Ressourcen
- Einrichten der lokalen Entwicklungsumgebung
- Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn
- Schreiben Sie Beispieldatensätze in den Eingabestream
- Führen Sie Ihre Anwendung lokal aus
- Beobachten Sie Eingabe- und Ausgabedaten in Kinesis-Streams
- Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird
- Kompilieren und verpacken Sie Ihren Anwendungscode
- Laden Sie die JAR Anwendungscodedatei hoch
- Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink
- Nächster Schritt
Erstellen Sie abhängige Ressourcen
Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen:
-
Zwei Kinesis-Datenströme für Eingabe und Ausgabe
-
Ein Amazon S3 S3-Bucket zum Speichern des Anwendungscodes
Anmerkung
In diesem Tutorial wird davon ausgegangen, dass Sie Ihre Anwendung in der Region us-east-1 US East (Nord-Virginia) bereitstellen. Wenn Sie eine andere Region verwenden, passen Sie alle Schritte entsprechend an.
Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams
Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie zwei Kinesis Data Streams (ExampleInputStream
und ExampleOutputStream
). Ihre Anwendung verwendet diese Streams für die Quell- und Ziel-Streams der Anwendung.
Sie können diese Streams entweder mit der Amazon Kinesis Kinesis-Konsole oder mit dem folgenden AWS CLI Befehl erstellen. Anweisungen für die Konsole finden Sie unter Erstellen und Aktualisieren von Datenströmen im Amazon Kinesis Data Streams Entwicklerhandbuch. Um die Streams mit dem zu erstellen AWS CLI, verwenden Sie die folgenden Befehle und passen Sie sie an die Region an, die Sie für Ihre Anwendung verwenden.
So erstellen Sie die Daten-Streams (AWS CLI)
-
Verwenden Sie den folgenden Amazon Kinesis
create-stream
AWS CLI Kinesis-Befehl, um den ersten Stream (ExampleInputStream
) zu erstellen:$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
-
Um den zweiten Stream zu erstellen, den die Anwendung zum Schreiben der Ausgabe verwendet, führen Sie denselben Befehl aus und ändern Sie den Stream-Namen in
ExampleOutputStream
:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
Erstellen Sie einen Amazon S3 S3-Bucket für den Anwendungscode
Sie können ein Amazon-S3-Bucket mithilfe der Konsole erstellen. Informationen zum Erstellen eines Amazon S3 S3-Buckets mithilfe der Konsole finden Sie unter Erstellen eines Buckets im Amazon S3 S3-Benutzerhandbuch. Benennen Sie den Amazon S3 S3-Bucket mit einem weltweit eindeutigen Namen, indem Sie beispielsweise Ihren Anmeldenamen anhängen.
Anmerkung
Stellen Sie sicher, dass Sie den Bucket in der Region erstellen, die Sie für dieses Tutorial verwenden (us-east-1).
Sonstige Ressourcen
Wenn Sie Ihre Anwendung erstellen, erstellt Managed Service for Apache Flink automatisch die folgenden CloudWatch Amazon-Ressourcen, sofern sie noch nicht vorhanden sind:
-
Eine Protokollgruppe mit dem Namen
/AWS/KinesisAnalytics-java/<my-application>
-
Einen Protokollstream mit dem Namen
kinesis-analytics-log-stream
Einrichten der lokalen Entwicklungsumgebung
Für die Entwicklung und das Debuggen können Sie die Apache Flink-Anwendung auf Ihrem Computer direkt von einem beliebigen Computer aus ausführen. IDE Alle Apache Flink-Abhängigkeiten werden wie normale Java-Abhängigkeiten mit Apache Maven behandelt.
Anmerkung
Auf Ihrem Entwicklungscomputer müssen Sie Java JDK 11, Maven und Git installiert haben. Wir empfehlen Ihnen, eine Entwicklungsumgebung wie Eclipse, Java Neon
Authentifizieren Sie Ihre Sitzung AWS
Die Anwendung verwendet Kinesis-Datenströme, um Daten zu veröffentlichen. Bei der lokalen Ausführung benötigen Sie eine gültige AWS authentifizierte Sitzung mit Schreibberechtigungen in den Kinesis-Datenstrom. Verwenden Sie die folgenden Schritte, um Ihre Sitzung zu authentifizieren:
-
Wenn Sie das Profil AWS CLI und ein benanntes Profil mit gültigen Anmeldeinformationen nicht konfiguriert haben, finden Sie weitere Informationen unter. Richten Sie das AWS Command Line Interface (AWS CLI) ein
-
Vergewissern Sie sich, dass Ihre korrekt konfiguriert AWS CLI ist und dass Ihre Benutzer über Schreibberechtigungen in den Kinesis-Datenstrom verfügen, indem Sie den folgenden Testdatensatz veröffentlichen:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Wenn Sie IDE über ein Plug-in zur Integration verfügen AWS, können Sie es verwenden, um die Anmeldeinformationen an die Anwendung zu übergeben, die in der IDE ausgeführt wird. Weitere Informationen finden Sie unter AWS Toolkit für IntelliJ IDEA und AWS Toolkit for Eclipse
.
Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn
Der Java-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:
-
Klonen Sie das Remote-Repository, indem Sie den folgenden Befehl verwenden:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Navigieren Sie zum
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted
Verzeichnis .
Überprüfen Sie die Anwendungskomponenten
Die Anwendung ist vollständig in der com.amazonaws.services.msf.BasicStreamingJob
Klasse implementiert. Die main()
Methode definiert den Datenfluss, um die Streaming-Daten zu verarbeiten und auszuführen.
Anmerkung
Für ein optimiertes Entwicklererlebnis ist die Anwendung so konzipiert, dass sie ohne Codeänderungen sowohl auf Amazon Managed Service für Apache Flink als auch lokal für die Entwicklung in Ihrem IDE ausgeführt werden kann.
-
Um die Laufzeitkonfiguration zu lesen, damit sie funktioniert, wenn sie in Amazon Managed Service for Apache Flink und in Ihrem läuftIDE, erkennt die Anwendung automatisch, ob sie eigenständig lokal in der IDE läuft. In diesem Fall lädt die Anwendung die Laufzeitkonfiguration anders:
-
Wenn die Anwendung feststellt, dass sie in Ihrem eigenständigen Modus ausgeführt wirdIDE, erstellen Sie die
application_properties.json
Datei, die im Ressourcenordner des Projekts enthalten ist. Der Inhalt der Datei folgt. -
Wenn die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird, lädt das Standardverhalten die Anwendungskonfiguration aus den Laufzeiteigenschaften, die Sie in der Amazon Managed Service for Apache Flink-Anwendung definieren. Siehe Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
Die
main()
Methode definiert den Anwendungsdatenfluss und führt ihn aus.-
Initialisiert die Standard-Streaming-Umgebungen. In diesem Beispiel zeigen wir, wie Sie sowohl die Tabelle erstellen, die mit der verwendet werden
StreamExecutionEnvironment
soll, als auch die TabelleStreamTableEnvironment
, mit SQL der Sie verwendet werden können. DataSteam API API Die beiden Umgebungsobjekte sind zwei separate Verweise auf dieselbe Laufzeitumgebung, die unterschiedlich verwendet werden sollAPIs.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
Laden Sie die Konfigurationsparameter der Anwendung. Dadurch werden sie automatisch von der richtigen Stelle geladen, je nachdem, wo die Anwendung ausgeführt wird:
Map<String, Properties> applicationParameters = loadApplicationProperties(env);
-
Die Anwendung definiert mithilfe des Kinesis Consumer-Connectors
eine Quelle, um Daten aus dem Eingabestream zu lesen. Die Konfiguration des Eingabestreams ist im PropertyGroupId
=InputStream0
definiert. Der Name und die Region des Streams sind in den jeweiligen Eigenschaften benanntstream.name
undaws.region
angegeben. Der Einfachheit halber liest diese Quelle die Datensätze als Zeichenfolge.private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
-
Die Anwendung definiert dann mithilfe des Kinesis Streams Sink
Connectors eine Senke, um Daten an den Ausgabestrom zu senden. Name und Region des Ausgabestreams sind im PropertyGroupId
= definiertOutputStream0
, ähnlich wie beim Eingabestream. Die Senke ist direkt mit der internen Senke verbundenDataStream
, die Daten von der Quelle bezieht. In einer echten Anwendung gibt es eine gewisse Transformation zwischen Quelle und Senke.private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
-
Schließlich führen Sie den Datenfluss aus, den Sie gerade definiert haben. Dies muss die letzte Anweisung der
main()
Methode sein, nachdem Sie alle Operatoren definiert haben, die für den Datenfluss erforderlich sind:env.execute("Flink streaming Java API skeleton");
-
Verwenden Sie die Datei pom.xml
Die Datei pom.xml definiert alle Abhängigkeiten, die von der Anwendung benötigt werden, und richtet das Maven Shade-Plugin ein, um das Fat-Jar zu erstellen, das alle von Flink benötigten Abhängigkeiten enthält.
-
Einige Abhängigkeiten haben einen Gültigkeitsbereich.
provided
Diese Abhängigkeiten sind automatisch verfügbar, wenn die Anwendung in Amazon Managed Service for Apache Flink ausgeführt wird. Sie sind erforderlich, um die Anwendung zu kompilieren oder um die Anwendung lokal in Ihrem IDE auszuführen. Weitere Informationen finden Sie unter Führen Sie Ihre Anwendung lokal aus. Stellen Sie sicher, dass Sie dieselbe Flink-Version wie die Runtime verwenden, die Sie in Amazon Managed Service for Apache Flink verwenden werden.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Sie müssen dem POM zusätzliche Apache Flink-Abhängigkeiten mit dem Standardbereich hinzufügen, z. B. den von dieser Anwendung verwendeten Kinesis-Connector
. Weitere Informationen finden Sie unter Verwenden Sie Apache Flink-Konnektoren mit Managed Service für Apache Flink. Sie können auch alle zusätzlichen Java-Abhängigkeiten hinzufügen, die für Ihre Anwendung erforderlich sind. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Das Maven Java Compiler-Plugin stellt sicher, dass der Code mit Java 11 kompiliert wird, der JDK Version, die derzeit von Apache Flink unterstützt wird.
-
Das Maven Shade-Plugin packt das Fat-Jar, mit Ausnahme einiger Bibliotheken, die von der Runtime bereitgestellt werden. Es spezifiziert auch zwei Transformatoren: und.
ServicesResourceTransformer
ManifestResourceTransformer
Letzteres konfiguriert die Klasse, die diemain
Methode zum Starten der Anwendung enthält. Wenn Sie die Hauptklasse umbenennen, vergessen Sie nicht, diesen Transformator zu aktualisieren. -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
Schreiben Sie Beispieldatensätze in den Eingabestream
In diesem Abschnitt senden Sie Beispieldatensätze an den Stream, damit die Anwendung sie verarbeiten kann. Sie haben zwei Möglichkeiten, Beispieldaten zu generieren, entweder mit einem Python-Skript oder mit dem Kinesis Data Generator
Generieren Sie Beispieldaten mit einem Python-Skript
Sie können ein Python-Skript verwenden, um Beispieldatensätze an den Stream zu senden.
Anmerkung
Um dieses Python-Skript auszuführen, müssen Sie Python 3.x verwenden und die AWS SDKfor Python (Boto)
Um mit dem Senden von Testdaten an den Kinesis-Eingabestream zu beginnen:
-
Laden Sie das
stock.py
Python-Skript für den Datengenerator aus dem GitHub Datengenerator-Repositoryherunter. -
Führen Sie das
stock.py
Skript aus:$ python stock.py
Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen. Sie können jetzt Ihre Apache Flink-Anwendung ausführen.
Generieren Sie Beispieldaten mit Kinesis Data Generator
Alternativ zur Verwendung des Python-Skripts können Sie Kinesis Data Generator
So richten Sie Kinesis Data Generator ein und führen ihn aus:
-
Folgen Sie den Anweisungen in der Kinesis Data Generator-Dokumentation
, um den Zugriff auf das Tool einzurichten. Sie werden eine AWS CloudFormation Vorlage ausführen, die einen Benutzer und ein Passwort einrichtet. -
Greifen Sie über die von der CloudFormation Vorlage URL generierte Datei auf Kinesis Data Generator zu. Sie finden das auf URL der Registerkarte Ausgabe, nachdem die CloudFormation Vorlage fertiggestellt wurde.
-
Konfigurieren Sie den Datengenerator:
-
Region: Wählen Sie die Region aus, die Sie für dieses Tutorial verwenden: us-east-1
-
Stream/Delivery-Stream: Wählen Sie den Eingabestream aus, den die Anwendung verwenden soll:
ExampleInputStream
-
Datensätze pro Sekunde: 100
-
Datensatzvorlage: Kopieren Sie die folgende Vorlage und fügen Sie sie ein:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Testen Sie die Vorlage: Wählen Sie Testvorlage und stellen Sie sicher, dass der generierte Datensatz dem folgenden ähnelt:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Starten Sie den Datengenerator: Wählen Sie Select Send Data.
Kinesis Data Generator sendet jetzt Daten an denExampleInputStream
.
Führen Sie Ihre Anwendung lokal aus
Sie können Ihre Flink-Anwendung lokal in Ihrem ausführen und debuggen. IDE
Anmerkung
Bevor Sie fortfahren, stellen Sie sicher, dass die Eingabe- und Ausgabestreams verfügbar sind. Siehe Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams. Stellen Sie außerdem sicher, dass Sie über Lese- und Schreibberechtigungen für beide Streams verfügen. Siehe Authentifizieren Sie Ihre Sitzung AWS.
Für die Einrichtung der lokalen Entwicklungsumgebung sind Java 11JDK, Apache Maven und IDE für die Java-Entwicklung erforderlich. Stellen Sie sicher, dass Sie die erforderlichen Voraussetzungen erfüllen. Siehe Erfüllen Sie die Voraussetzungen für das Abschließen der Übungen.
Importieren Sie das Java-Projekt in Ihr IDE
Um mit der Arbeit an der Anwendung in Ihrem zu beginnenIDE, müssen Sie sie als Java-Projekt importieren.
Das von Ihnen geklonte Repository enthält mehrere Beispiele. Jedes Beispiel ist ein separates Projekt. Importieren Sie für dieses Tutorial den Inhalt im ./java/GettingStarted
Unterverzeichnis in IhrIDE.
Fügen Sie den Code mithilfe von Maven als vorhandenes Java-Projekt ein.
Anmerkung
Der genaue Vorgang zum Importieren eines neuen Java-Projekts hängt von dem von IDE Ihnen verwendeten ab.
Überprüfen Sie die lokale Anwendungskonfiguration
Bei der lokalen Ausführung verwendet die Anwendung die Konfiguration in der application_properties.json
Datei im Ressourcenordner des Projekts unter./src/main/resources
. Sie können diese Datei bearbeiten, um verschiedene Kinesis-Stream-Namen oder -Regionen zu verwenden.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Richten Sie Ihre IDE Laufkonfiguration ein
Sie können die Flink-Anwendung IDE direkt von Ihnen aus ausführen und debuggen, indem Sie die Hauptklasse ausführencom.amazonaws.services.msf.BasicStreamingJob
, wie Sie jede Java-Anwendung ausführen würden. Bevor Sie die Anwendung ausführen, müssen Sie die Run-Konfiguration einrichten. Die Einrichtung hängt von der abIDE, die Sie verwenden. Siehe beispielsweise Run/Debug-Konfigurationen
-
Fügen Sie die
provided
Abhängigkeiten zum Klassenpfad hinzu. Dies ist erforderlich, um sicherzustellen, dass die Abhängigkeiten mitprovided
Gültigkeitsbereich an die Anwendung übergeben werden, wenn sie lokal ausgeführt wird. Ohne diese Einrichtung zeigt die Anwendung sofort einenclass not found
Fehler an. -
Übergeben Sie die AWS Anmeldeinformationen für den Zugriff auf die Kinesis-Streams an die Anwendung. Am schnellsten ist es, AWS Toolkit for IDEA IntelliJ
zu verwenden. Mit diesem IDE Plugin in der Run-Konfiguration können Sie ein bestimmtes Profil auswählen. AWS AWS Die Authentifizierung erfolgt mit diesem Profil. Sie müssen die AWS Anmeldeinformationen nicht direkt weitergeben. -
Stellen Sie sicher, dass die Anwendung mit JDK11 IDE ausgeführt wird.
Führen Sie die Anwendung in Ihrem IDE
Nachdem Sie die Run-Konfiguration für eingerichtet habenBasicStreamingJob
, können Sie sie wie eine normale Java-Anwendung ausführen oder debuggen.
Anmerkung
Sie können das von Maven generierte Fat-Jar nicht direkt über die java -jar
...
Befehlszeile ausführen. Dieses JAR enthält nicht die Kernabhängigkeiten von Flink, die für die eigenständige Ausführung der Anwendung erforderlich sind.
Wenn die Anwendung erfolgreich gestartet wird, protokolliert sie einige Informationen über den eigenständigen Minicluster und die Initialisierung der Konnektoren. Darauf folgen eine Reihe INFO und einige WARN Protokolle, die Flink normalerweise beim Start der Anwendung ausgibt.
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
Nach Abschluss der Initialisierung gibt die Anwendung keine weiteren Protokolleinträge aus. Während der Datenfluss erfolgt, wird kein Protokoll ausgegeben.
Um zu überprüfen, ob die Anwendung Daten korrekt verarbeitet, können Sie die Eingabe- und Ausgabe-Kinesis-Streams überprüfen, wie im folgenden Abschnitt beschrieben.
Anmerkung
Es ist das normale Verhalten einer Flink-Anwendung, keine Protokolle über fließende Daten auszugeben. Das Ausgeben von Protokollen für jeden Datensatz mag für das Debuggen praktisch sein, kann aber bei der Ausführung in der Produktion zu erheblichem Mehraufwand führen.
Beobachten Sie Eingabe- und Ausgabedaten in Kinesis-Streams
Sie können Datensätze beobachten, die vom (generierenden Beispiel-Python) oder dem Kinesis Data Generator (Link) an den Eingabestream gesendet wurden, indem Sie den Data Viewer in der Amazon Kinesis Kinesis-Konsole verwenden.
Um Aufzeichnungen zu beobachten
Öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.
-
Stellen Sie sicher, dass die Region mit der Region übereinstimmt, in der Sie dieses Tutorial ausführen, und zwar standardmäßig us-east-1 US East (Nord-Virginia). Ändern Sie die Region, falls sie nicht übereinstimmt.
-
Wählen Sie Datenströme.
-
Wählen Sie den Stream aus, den Sie beobachten möchten, entweder
ExampleInputStream
oderExampleOutputStream.
-
Wählen Sie die Registerkarte „Datenanzeige“.
-
Wählen Sie einen beliebigen Shard aus, behalten Sie „Letzte“ als Startposition bei und wählen Sie dann „Datensätze abrufen“. Möglicherweise wird die Fehlermeldung „Für diese Anfrage wurde kein Datensatz gefunden“ angezeigt. Wenn ja, wählen Sie „Erneut versuchen, Datensätze abzurufen“. Die neuesten im Stream veröffentlichten Datensätze werden angezeigt.
-
Wählen Sie den Wert in der Datenspalte, um den Inhalt des Datensatzes im JSON Format zu überprüfen.
Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird
Stoppen Sie die Anwendung, die in Ihrem läuftIDE. Das bietet IDE normalerweise eine „Stopp“ -Option. Der genaue Standort und die Methode hängen von der IDE verwendeten ab.
Kompilieren und verpacken Sie Ihren Anwendungscode
In diesem Abschnitt verwenden Sie Apache Maven, um den Java-Code zu kompilieren und in eine JAR Datei zu packen. Sie können Ihren Code mit dem Maven-Befehlszeilentool oder Ihrem kompilieren und verpacken. IDE
Um mit der Maven-Befehlszeile zu kompilieren und zu paketieren:
Gehen Sie in das Verzeichnis, das das GettingStarted Java-Projekt enthält, und führen Sie den folgenden Befehl aus:
$ mvn package
Um mit Ihrem zu kompilieren und zu paketierenIDE:
Führen Sie es mvn package
von Ihrer IDE Maven-Integration aus aus.
In beiden Fällen wird die folgende JAR Datei erstellt:target/amazon-msf-java-stream-app-1.0.jar
.
Anmerkung
Wenn Sie ein „Build-Projekt“ von Ihrem aus ausführen, wird die JAR Datei IDE möglicherweise nicht erstellt.
Laden Sie die JAR Anwendungscodedatei hoch
In diesem Abschnitt laden Sie die JAR Datei, die Sie im vorherigen Abschnitt erstellt haben, in den Amazon Simple Storage Service (Amazon S3) -Bucket hoch, den Sie zu Beginn dieses Tutorials erstellt haben. Wenn Sie diesen Schritt noch nicht abgeschlossen haben, finden Sie weitere Informationen unter (Link).
Um die JAR Anwendungscodedatei hochzuladen
Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/
. -
Wählen Sie den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben.
-
Klicken Sie auf Hochladen.
-
Klicken Sie auf Add files.
-
Navigieren Sie zu der im vorherigen Schritt generierten JAR Datei:
target/amazon-msf-java-stream-app-1.0.jar
. -
Wählen Sie Hochladen, ohne andere Einstellungen zu ändern.
Warnung
Stellen Sie sicher, dass Sie die richtige JAR Datei in auswählen<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar
.
Das target
Verzeichnis enthält auch andere JAR Dateien, die Sie nicht hochladen müssen.
Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink
Sie können eine Anwendung von Managed Service für Apache Flink entweder über die Konsole oder AWS CLI erstellen und ausführen. Für dieses Tutorial verwenden Sie die Konsole.
Anmerkung
Wenn Sie die Anwendung mithilfe der Konsole erstellen, werden Ihre AWS Identity and Access Management (IAM) und Amazon CloudWatch Logs-Ressourcen für Sie erstellt. Wenn Sie die Anwendung mithilfe von erstellen AWS CLI, erstellen Sie diese Ressourcen separat.
Themen
Erstellen der Anwendung
So erstellen Sie die Anwendung
Öffnen Sie die Managed Service for Apache Flink-Konsole unter https://console.aws.amazon.com /flink
-
Stellen Sie sicher, dass die richtige Region ausgewählt ist: us-east-1 US East (Nord-Virginia)
-
Öffnen Sie das Menü auf der rechten Seite und wählen Sie Apache Flink-Anwendungen und dann Streaming-Anwendung erstellen. Wählen Sie alternativ im Container Erste Schritte auf der Startseite die Option Streaming-Anwendung erstellen aus.
-
Gehen Sie auf der Seite Streaming-Anwendung erstellen wie folgt vor:
-
Wählen Sie eine Methode, um die Stream-Verarbeitungsanwendung einzurichten: Wählen Sie Von Grund auf neu erstellen.
-
Apache Flink-Konfiguration, Flink-Version der Anwendung: Wählen Sie Apache Flink 1.19.
-
-
Konfigurieren Sie Ihre Anwendung
-
Name der Anwendung: Geben Sie ein
MyApplication
. -
Beschreibung: eingeben
My java test app
. -
Zugriff auf Anwendungsressourcen: Wählen Sie „IAMRolle
kinesis-analytics-MyApplication-us-east-1
mit den erforderlichen Richtlinien erstellen/aktualisieren“.
-
-
Konfigurieren Sie Ihre Vorlage für Anwendungseinstellungen
-
Vorlagen: Wählen Sie Entwicklung.
-
-
Wählen Sie unten auf der Seite die Option Streaming-Anwendung erstellen aus.
Anmerkung
Wenn Sie mithilfe der Konsole eine Managed Service for Apache Flink-Anwendung erstellen, haben Sie die Möglichkeit, eine IAM Rolle und Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM Ressourcen werden anhand Ihres Anwendungsnamens und Ihrer Region wie folgt benannt:
-
Richtlinie:
kinesis-analytics-service-
MyApplication
-us-east-1
-
Rolle:
kinesisanalytics-
MyApplication
-us-east-1
Amazon Managed Service für Apache Flink war früher als Kinesis Data Analytics bekannt. Dem Namen der Ressourcen, die automatisch erstellt werden, wird aus Gründen der Abwärtskompatibilität ein Präfix kinesis-analytics-
vorangestellt.
Bearbeiten Sie die Richtlinie IAM
Bearbeiten Sie die IAM Richtlinie, um Berechtigungen für den Zugriff auf die Kinesis-Datenstreams hinzuzufügen.
Um die Richtlinie zu bearbeiten
Öffnen Sie die IAM Konsole unter https://console.aws.amazon.com/iam/
. -
Wählen Sie Policies (Richtlinien). Wählen Sie die
kinesis-analytics-service-MyApplication-us-east-1
-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. -
Wählen Sie Bearbeiten und dann die JSONRegisterkarte.
-
Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie das Beispielkonto IDs (
012345678901
) durch Ihre Konto-ID.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Wählen Sie unten auf der Seite Weiter und dann Änderungen speichern.
Konfigurieren Sie die Anwendung
Bearbeiten Sie die Anwendungskonfiguration, um das Anwendungscode-Artefakt festzulegen.
Um die Konfiguration zu bearbeiten
-
Wählen Sie auf der MyApplicationSeite Configure aus.
-
Gehen Sie im Abschnitt Speicherort des Anwendungscodes wie folgt vor:
-
Wählen Sie für Amazon S3 S3-Bucket den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben. Wählen Sie Durchsuchen und wählen Sie den richtigen Bucket aus. Wählen Sie dann Auswählen aus. Klicken Sie nicht auf den Bucket-Namen.
-
Geben Sie als Pfad zum Amazon-S3-Objekt den Wert
amazon-msf-java-stream-app-1.0.jar
ein.
-
-
Wählen Sie für Zugriffsberechtigungen die Option IAMRolle
kinesis-analytics-MyApplication-us-east-1
mit den erforderlichen Richtlinien erstellen/aktualisieren aus. -
Fügen Sie im Abschnitt Runtime-Eigenschaften die folgenden Eigenschaften hinzu.
-
Wählen Sie Neues Element hinzufügen und fügen Sie jeden der folgenden Parameter hinzu:
Gruppen-ID Schlüssel Wert InputStream0
stream.name
ExampleInputStream
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
-
Ändern Sie keinen der anderen Abschnitte.
-
Wählen Sie Änderungen speichern.
Anmerkung
Wenn Sie sich dafür entscheiden, die CloudWatch Amazon-Protokollierung zu aktivieren, erstellt Managed Service für Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:
-
Protokollgruppe:
/aws/kinesis-analytics/MyApplication
-
Protokollstream:
kinesis-analytics-log-stream
Führen Sie die Anwendung aus.
Die Anwendung ist jetzt konfiguriert und kann ausgeführt werden.
Ausführen der Anwendung
-
Wählen Sie auf der Konsole für Amazon Managed Service für Apache Flink My Application und anschließend Run aus.
-
Wählen Sie auf der nächsten Seite, der Konfigurationsseite für die Anwendungswiederherstellung, die Option Mit neuestem Snapshot ausführen und anschließend Ausführen aus.
Der Status in den Anwendungsdetails wechselt von
Ready
zuStarting
und dann zu demRunning
Zeitpunkt, an dem die Anwendung gestartet wurde.
Wenn sich die Anwendung im Running
Status befindet, können Sie jetzt das Flink-Dashboard öffnen.
So öffnen Sie das -Dashboard
-
Wählen Sie Apache Flink-Dashboard öffnen. Das Dashboard wird auf einer neuen Seite geöffnet.
-
Wählen Sie in der Liste „Laufende Jobs“ den einzelnen Job aus, den Sie sehen können.
Anmerkung
Wenn Sie die Runtime-Eigenschaften festgelegt oder die IAM Richtlinien falsch bearbeitet haben, ändert sich der Anwendungsstatus möglicherweise in
Running
, aber das Flink-Dashboard zeigt an, dass der Job kontinuierlich neu gestartet wird. Dies ist ein häufiges Fehlerszenario, wenn die Anwendung falsch konfiguriert ist oder keine Berechtigungen für den Zugriff auf die externen Ressourcen hat.In diesem Fall überprüfen Sie im Flink-Dashboard auf der Registerkarte Ausnahmen die Ursache des Problems.
Beobachten Sie die Metriken der laufenden Anwendung
Auf der MyApplicationSeite, im Abschnitt CloudWatch Amazon-Metriken, können Sie einige der grundlegenden Metriken der laufenden Anwendung sehen.
Um die Metriken einzusehen
-
Wählen Sie neben der Schaltfläche „Aktualisieren“ in der Dropdownliste die Option 10 Sekunden aus.
-
Wenn die Anwendung läuft und fehlerfrei ist, können Sie sehen, dass die Uptime-Metrik kontinuierlich zunimmt.
-
Die Metrik für vollständige Neustarts sollte Null sein. Wenn sie zunimmt, kann es bei der Konfiguration zu Problemen kommen. Um das Problem zu untersuchen, überprüfen Sie den Tab Ausnahmen im Flink-Dashboard.
-
Die Metrik „Anzahl fehlgeschlagener Checkpoints“ sollte in einer fehlerfreien Anwendung Null sein.
Anmerkung
Dieses Dashboard zeigt einen festen Satz von Metriken mit einer Granularität von 5 Minuten. Sie können ein benutzerdefiniertes Anwendungs-Dashboard mit beliebigen Metriken im CloudWatch Dashboard erstellen.
Beobachten Sie die Ausgabedaten in Kinesis-Streams
Vergewissern Sie sich, dass Sie weiterhin Daten in der Eingabe veröffentlichen, entweder mit dem Python-Skript oder dem Kinesis Data Generator.
Sie können jetzt die Ausgabe der Anwendung beobachten, die auf Managed Service for Apache Flink ausgeführt wird, indem Sie den Datenviewer in der verwenden https://console.aws.amazon.com/kinesis/
Um die Ausgabe anzusehen
Öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.
-
Stellen Sie sicher, dass die Region mit der Region übereinstimmt, die Sie für die Ausführung dieses Tutorials verwenden. Standardmäßig ist es US-East-1US East (Nord-Virginia). Ändern Sie bei Bedarf die Region.
-
Wählen Sie Data Streams aus.
-
Wählen Sie den Stream aus, den Sie beobachten möchten. Verwenden Sie für dieses Tutorial
ExampleOutputStream
. -
Wählen Sie die Registerkarte Datenanzeige.
-
Wählen Sie einen beliebigen Shard aus, behalten Sie „Letzte“ als Startposition bei und wählen Sie dann „Datensätze abrufen“. Möglicherweise wird die Fehlermeldung „Für diese Anfrage wurde kein Datensatz gefunden“ angezeigt. Wenn ja, wählen Sie „Erneut versuchen, Datensätze abzurufen“. Die neuesten im Stream veröffentlichten Datensätze werden angezeigt.
-
Wählen Sie den Wert in der Datenspalte aus, um den Inhalt des Datensatzes im JSON Format zu überprüfen.
Beenden Sie die Anwendung
Um die Anwendung zu beenden, rufen Sie die Konsolenseite der Anwendung Managed Service for Apache Flink mit dem Namen auf. MyApplication
So stoppen Sie die Anwendung
-
Wählen Sie in der Dropdownliste Aktion die Option Stopp aus.
-
Der Status in den Anwendungsdetails wechselt von
Running
zu und dann zu demReady
ZeitpunktStopping
, an dem die Anwendung vollständig gestoppt wurde.Anmerkung
Vergessen Sie nicht, auch das Senden von Daten aus dem Python-Skript oder dem Kinesis Data Generator an den Eingabestream zu beenden.