Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
In dieser Übung erstellen Sie eine Managed Service für Apache Flink-Anwendung mit Kinesis-Datenströmen als Quelle und Senke.
Dieser Abschnitt enthält die folgenden Schritte.
Erstellen Sie abhängige Ressourcen
Bevor Sie für diese Übung einen Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen:
-
Ein Amazon S3 S3-Bucket zum Speichern des Anwendungscodes und zum Schreiben der Anwendungsausgabe.
Anmerkung
In diesem Tutorial wird davon ausgegangen, dass Sie Ihre Anwendung in der Region us-east-1 bereitstellen. Wenn Sie eine andere Region verwenden, müssen Sie alle Schritte entsprechend anpassen.
Erstellen eines Amazon-S3-Buckets
Sie können ein Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressource finden Sie in den folgenden Themen:
-
Wie erstelle ich einen S3-Bucket? im Amazon Simple Storage Service-Benutzerhandbuch. Geben Sie dem Amazon S3 S3-Bucket einen weltweit eindeutigen Namen, indem Sie Ihren Anmeldenamen anhängen.
Anmerkung
Stellen Sie sicher, dass Sie den Bucket in der Region erstellen, die Sie für dieses Tutorial verwenden. Die Standardeinstellung für das Tutorial ist us-east-1.
Sonstige Ressourcen
Wenn Sie Ihre Anwendung erstellen, erstellt Managed Service for Apache Flink die folgenden CloudWatch Amazon-Ressourcen, sofern sie noch nicht vorhanden sind:
-
Eine Protokollgruppe namens
/AWS/KinesisAnalytics-java/<my-application>
. -
Einen Protokollstream mit dem Namen
kinesis-analytics-log-stream
.
Einrichten der lokalen Entwicklungsumgebung
Für die Entwicklung und das Debuggen können Sie die Apache Flink-Anwendung auf Ihrem Computer direkt von der IDE Ihrer Wahl aus ausführen. Alle Apache Flink-Abhängigkeiten werden mit Maven als normale Java-Abhängigkeiten behandelt.
Anmerkung
Auf Ihrem Entwicklungscomputer müssen Sie Java JDK 11, Maven und Git installiert haben. Wir empfehlen Ihnen, eine Entwicklungsumgebung wie Eclipse, Java Neon
Authentifizieren Sie Ihre Sitzung AWS
Die Anwendung verwendet Kinesis-Datenströme, um Daten zu veröffentlichen. Bei der lokalen Ausführung benötigen Sie eine gültige AWS authentifizierte Sitzung mit Schreibberechtigungen in den Kinesis-Datenstrom. Verwenden Sie die folgenden Schritte, um Ihre Sitzung zu authentifizieren:
-
Wenn Sie das Profil AWS CLI und ein benanntes Profil mit gültigen Anmeldeinformationen nicht konfiguriert haben, finden Sie weitere Informationen unter. Richten Sie das AWS Command Line Interface ()AWS CLI ein
-
Wenn Ihre IDE über ein Plug-in zur Integration verfügt AWS, können Sie dieses verwenden, um die Anmeldeinformationen an die Anwendung zu übergeben, die in der IDE ausgeführt wird. Weitere Informationen finden Sie unter AWS Toolkit für IntelliJ IDEA
und AWS Toolkit zum Kompilieren der Anwendung oder zum Ausführen von Eclipse.
Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn
Der Anwendungscode für dieses Beispiel ist verfügbar unter GitHub.
So laden Sie den Java-Anwendungscode herunter
-
Klonen Sie das Remote-Repository, indem Sie den folgenden Befehl verwenden:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Navigieren Sie zum
./java/GettingStartedTable
Verzeichnis .
Überprüfen Sie die Anwendungskomponenten
Die Anwendung ist vollständig in der com.amazonaws.services.msf.BasicTableJob
Klasse implementiert. Die main()
Methode definiert Quellen, Transformationen und Senken. Die Ausführung wird durch eine Ausführungsanweisung am Ende dieser Methode initiiert.
Anmerkung
Für ein optimales Entwicklererlebnis ist die Anwendung so konzipiert, dass sie ohne Codeänderungen sowohl auf Amazon Managed Service für Apache Flink als auch lokal für die Entwicklung in Ihrer IDE ausgeführt werden kann.
-
Um die Laufzeitkonfiguration so zu lesen, dass sie funktioniert, wenn sie in Amazon Managed Service for Apache Flink und in Ihrer IDE ausgeführt wird, erkennt die Anwendung automatisch, ob sie lokal in der IDE eigenständig ausgeführt wird. In diesem Fall lädt die Anwendung die Laufzeitkonfiguration anders:
-
Wenn die Anwendung feststellt, dass sie in Ihrer IDE im Standalone-Modus ausgeführt wird, erstellen Sie die
application_properties.json
Datei, die im Ressourcenordner des Projekts enthalten ist. Der Inhalt der Datei folgt. -
Wenn die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird, lädt das Standardverhalten die Anwendungskonfiguration aus den Laufzeiteigenschaften, die Sie in der Anwendung Amazon Managed Service für Apache Flink definieren. Siehe Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
Die
main()
Methode definiert den Anwendungsdatenfluss und führt ihn aus.-
Initialisiert die Standard-Streaming-Umgebungen. In diesem Beispiel zeigen wir, wie sowohl die
StreamExecutionEnvironment
zur Verwendung mit der DataStream API als auch dieStreamTableEnvironment
zur Verwendung mit SQL und der Tabellen-API erstellt werden. Die beiden Umgebungsobjekte sind zwei separate Verweise auf dieselbe Laufzeitumgebung, die unterschiedlich verwendet werden soll APIs.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
-
Laden Sie die Konfigurationsparameter der Anwendung. Dadurch werden sie automatisch von der richtigen Stelle geladen, je nachdem, wo die Anwendung ausgeführt wird:
Map<String, Properties> applicationParameters = loadApplicationProperties(env);
-
Der FileSystem Sink-Connector
, den die Anwendung verwendet, um Ergebnisse in Amazon S3 S3-Ausgabedateien zu schreiben, wenn Flink einen Checkpoint abschließt. Sie müssen Checkpoints aktivieren, um Dateien in das Ziel zu schreiben. Wenn die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird, steuert die Anwendungskonfiguration den Checkpoint und aktiviert ihn standardmäßig. Umgekehrt sind Checkpoints bei lokaler Ausführung standardmäßig deaktiviert. Die Anwendung erkennt, dass sie lokal ausgeführt wird, und konfiguriert Checkpoints alle 5.000 ms. if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
-
Diese Anwendung empfängt keine Daten von einer tatsächlichen externen Quelle. Sie generiert zufällige Daten, die über den DataGen Konnektor
verarbeitet werden. Dieser Konnektor ist für DataStream API, SQL und Tabellen-API verfügbar. Um die Integration zwischen beiden zu demonstrieren APIs, verwendet die Anwendung die DataStram API-Version, da sie mehr Flexibilität bietet. Jeder Datensatz wird durch eine Generatorfunktion generiert, die StockPriceGeneratorFunction
in diesem Fall aufgerufen wird und in der Sie benutzerdefinierte Logik einfügen können.DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class));
-
In der DataStream API können Datensätze benutzerdefinierte Klassen haben. Klassen müssen bestimmten Regeln folgen, damit Flink sie als Datensatz verwenden kann. Weitere Informationen finden Sie unter Unterstützte Datentypen
. In diesem Beispiel ist die StockPrice
Klasse ein POJO. -
Die Quelle wird dann an die Ausführungsumgebung angehängt, wodurch ein
DataStream
ofStockPrice
generiert wird. Diese Anwendung verwendet keine Semantik zur Ereigniszeitund generiert kein Wasserzeichen. Führen Sie die DataGenerator Quelle unabhängig von der Parallelität der restlichen Anwendung mit einer Parallelität von 1 aus. DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1);
-
Was im Datenverarbeitungsablauf folgt, wird mithilfe der Tabellen-API und SQL definiert. Dazu konvertieren wir den Wert DataStream von StockPrices in eine Tabelle. Das Schema der Tabelle wird automatisch aus der
StockPrice
Klasse abgeleitet.Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
-
Der folgende Codeausschnitt zeigt, wie eine Ansicht und eine Abfrage mithilfe der programmatischen Tabellen-API definiert werden:
Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
-
Eine Sink-Tabelle ist so definiert, dass sie die Ergebnisse als JSON-Dateien in einen Amazon S3 S3-Bucket schreibt. Um den Unterschied bei der programmgesteuerten Definition einer Ansicht zu verdeutlichen, wird die Sink-Tabelle mit der Tabellen-API mithilfe von SQL definiert.
tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")");
-
Der letzte Schritt von besteht darin
executeInsert()
, die gefilterte Aktienkursansicht in die Sink-Tabelle einzufügen. Diese Methode initiiert die Ausführung des Datenflusses, den wir bisher definiert haben.filteredStockPricesTable.executeInsert("s3_sink");
-
Verwenden Sie die Datei pom.xml
Die Datei pom.xml definiert alle Abhängigkeiten, die von der Anwendung benötigt werden, und richtet das Maven Shade-Plugin ein, um das Fat-Jar zu erstellen, das alle von Flink benötigten Abhängigkeiten enthält.
-
Einige Abhängigkeiten haben einen Gültigkeitsbereich.
provided
Diese Abhängigkeiten sind automatisch verfügbar, wenn die Anwendung in Amazon Managed Service for Apache Flink ausgeführt wird. Sie sind für die Anwendung oder für die Anwendung lokal in Ihrer IDE erforderlich. Weitere Informationen finden Sie unter (Update auf TableAPI)Führen Sie Ihre Anwendung lokal aus. Stellen Sie sicher, dass Sie dieselbe Flink-Version wie die Runtime verwenden, die Sie in Amazon Managed Service for Apache Flink verwenden werden. Um die TableAPI und SQL zu verwenden, müssen Sie dasflink-table-planner-loader
und angebenflink-table-runtime-dependencies
, beide mit Gültigkeitsbereich.provided
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Sie müssen dem POM mit dem Standardbereich zusätzliche Apache Flink-Abhängigkeiten hinzufügen. Zum Beispiel der DataGen Konnektor
, der FileSystem SQL-Konnektor und das JSON-Format . <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
-
Um bei lokaler Ausführung in Amazon S3 zu schreiben, ist das S3 Hadoop File System ebenfalls im
provided
Lieferumfang enthalten.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Das Maven Java Compiler-Plugin stellt sicher, dass der Code mit Java 11 kompiliert wird, der JDK-Version, die derzeit von Apache Flink unterstützt wird.
-
Das Maven Shade-Plugin packt das Fat-Jar, mit Ausnahme einiger Bibliotheken, die von der Runtime bereitgestellt werden. Es spezifiziert auch zwei Transformatoren: und.
ServicesResourceTransformer
ManifestResourceTransformer
Letzteres konfiguriert die Klasse, die diemain
Methode zum Starten der Anwendung enthält. Wenn Sie die Hauptklasse umbenennen, vergessen Sie nicht, diesen Transformator zu aktualisieren. -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
Führen Sie Ihre Anwendung lokal aus
Sie können Ihre Flink-Anwendung lokal in Ihrer IDE ausführen und debuggen.
Anmerkung
Bevor Sie fortfahren, stellen Sie sicher, dass die Eingabe- und Ausgabestreams verfügbar sind. Siehe Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams. Stellen Sie außerdem sicher, dass Sie über Lese- und Schreibberechtigungen für beide Streams verfügen. Siehe Authentifizieren Sie Ihre Sitzung AWS.
Für die Einrichtung der lokalen Entwicklungsumgebung sind Java 11 JDK, Apache Maven und eine IDE für die Java-Entwicklung erforderlich. Stellen Sie sicher, dass Sie die erforderlichen Voraussetzungen erfüllen. Siehe Erfüllen Sie die Voraussetzungen für das Abschließen der Übungen.
Importieren Sie das Java-Projekt in Ihre IDE
Um mit der Arbeit an der Anwendung in Ihrer IDE zu beginnen, müssen Sie sie als Java-Projekt importieren.
Das von Ihnen geklonte Repository enthält mehrere Beispiele. Jedes Beispiel ist ein separates Projekt. Importieren Sie für dieses Tutorial den Inhalt im ./jave/GettingStartedTable
Unterverzeichnis in Ihre IDE.
Fügen Sie den Code mithilfe von Maven als vorhandenes Java-Projekt ein.
Anmerkung
Der genaue Vorgang zum Importieren eines neuen Java-Projekts hängt von der verwendeten IDE ab.
Ändern Sie die lokale Anwendungskonfiguration
Bei der lokalen Ausführung verwendet die Anwendung die Konfiguration in der application_properties.json
Datei im Ressourcenordner des Projekts unter./src/main/resources
. Für diese Tutorial-Anwendung sind die Konfigurationsparameter der Name des Buckets und der Pfad, in den die Daten geschrieben werden.
Bearbeiten Sie die Konfiguration und ändern Sie den Namen des Amazon S3 S3-Buckets so, dass er mit dem Bucket übereinstimmt, den Sie zu Beginn dieses Tutorials erstellt haben.
[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "
<bucket-name>
", "path": "output" } } ]
Anmerkung
Die Konfigurationseigenschaft name
darf beispielsweise nur den Bucket-Namen enthaltenmy-bucket-name
. Geben Sie kein Präfix wie s3://
oder einen abschließenden Schrägstrich ein.
Wenn Sie den Pfad ändern, lassen Sie alle führenden oder nachfolgenden Schrägstriche weg.
Richten Sie Ihre IDE-Run-Konfiguration ein
Sie können die Flink-Anwendung direkt von Ihrer IDE aus ausführen und debuggen, indem Sie die Hauptklasse ausführencom.amazonaws.services.msf.BasicTableJob
, wie Sie jede Java-Anwendung ausführen würden. Bevor Sie die Anwendung ausführen, müssen Sie die Run-Konfiguration einrichten. Das Setup hängt von der IDE ab, die Sie verwenden. Siehe beispielsweise Run/Debug-Konfigurationen
-
Fügen Sie die
provided
Abhängigkeiten zum Klassenpfad hinzu. Dies ist erforderlich, um sicherzustellen, dass die Abhängigkeiten mitprovided
Gültigkeitsbereich an die Anwendung übergeben werden, wenn sie lokal ausgeführt wird. Ohne diese Einrichtung zeigt die Anwendung sofort einenclass not found
Fehler an. -
Übergeben Sie die AWS Anmeldeinformationen für den Zugriff auf die Kinesis-Streams an die Anwendung. Am schnellsten ist es, das AWS Toolkit für IntelliJ
IDEA zu verwenden. Mit diesem IDE-Plugin in der Run-Konfiguration können Sie ein bestimmtes Profil auswählen. AWS AWS Die Authentifizierung erfolgt mit diesem Profil. Sie müssen die AWS Anmeldeinformationen nicht direkt weitergeben. -
Stellen Sie sicher, dass die IDE die Anwendung mit JDK 11 ausführt.
Führen Sie die Anwendung in Ihrer IDE aus
Nachdem Sie die Run-Konfiguration für eingerichtet habenBasicTableJob
, können Sie sie wie eine normale Java-Anwendung ausführen oder debuggen.
Anmerkung
Sie können das von Maven generierte Fat-Jar nicht direkt über die java -jar
...
Befehlszeile ausführen. Dieses JAR enthält nicht die Kernabhängigkeiten von Flink, die für die eigenständige Ausführung der Anwendung erforderlich sind.
Wenn die Anwendung erfolgreich gestartet wird, protokolliert sie einige Informationen über den eigenständigen Minicluster und die Initialisierung der Konnektoren. Darauf folgen eine Reihe von INFO- und einige WARN-Logs, die Flink normalerweise beim Start der Anwendung ausgibt.
21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob
[] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob
[] - s3Path is ExampleBucket/my-output-bucket
...
Nach Abschluss der Initialisierung gibt die Anwendung keine weiteren Protokolleinträge aus. Während der Datenfluss erfolgt, wird kein Protokoll ausgegeben.
Um zu überprüfen, ob die Anwendung Daten korrekt verarbeitet, können Sie den Inhalt des Ausgabe-Buckets überprüfen, wie im folgenden Abschnitt beschrieben.
Anmerkung
Es ist das normale Verhalten einer Flink-Anwendung, keine Protokolle über fließende Daten auszugeben. Das Ausgeben von Protokollen für jeden Datensatz mag für das Debuggen praktisch sein, kann aber bei der Ausführung in der Produktion zu erheblichem Mehraufwand führen.
Beobachten Sie, wie die Anwendung Daten in einen S3-Bucket schreibt
Diese Beispielanwendung generiert intern zufällige Daten und schreibt diese Daten in den von Ihnen konfigurierten Ziel-S3-Bucket. Sofern Sie den Standardkonfigurationspfad nicht geändert haben, werden die Daten im folgenden Format ./output/<yyyy-MM-dd>/<HH>
in den output
Pfad geschrieben, gefolgt von der Daten- und Stundenpartitionierung.
Der FileSystem Sink-Connector
if (env instanceof LocalStreamEnvironment) {
env.enableCheckpointing(5000);
}
Um den S3-Bucket zu durchsuchen und die von der Anwendung geschriebene Datei zu beobachten
-
Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/
.
-
Wählen Sie den Bucket aus, den Sie zuvor erstellt haben.
-
Navigieren Sie zum
output
Pfad und dann zu den Datums- und Stundenordnern, die der aktuellen Uhrzeit in der UTC-Zeitzone entsprechen. -
Aktualisieren Sie regelmäßig, um zu beobachten, dass alle 5 Sekunden neue Dateien angezeigt werden.
-
Wählen Sie eine Datei aus und laden Sie sie herunter, um den Inhalt zu beobachten.
Anmerkung
Standardmäßig haben die Dateien keine Erweiterungen. Der Inhalt ist als JSON formatiert. Sie können die Dateien mit einem beliebigen Texteditor öffnen, um den Inhalt zu überprüfen.
Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird
Stoppen Sie die Anwendung, die in Ihrer IDE ausgeführt wird. Die IDE bietet normalerweise eine „Stopp“ -Option. Der genaue Standort und die Methode hängen von der IDE ab.
Kompilieren und verpacken Sie Ihren Anwendungscode
In diesem Abschnitt verwenden Sie Apache Maven, um den Java-Code zu kompilieren und in eine JAR-Datei zu packen. Sie können Ihren Code mit dem Maven-Befehlszeilentool oder Ihrer IDE kompilieren und verpacken.
Um mit der Maven-Befehlszeile zu kompilieren und zu paketieren
Gehen Sie in das Verzeichnis, das das GettingStarted Jave-Projekt enthält, und führen Sie den folgenden Befehl aus:
$ mvn package
Um mit Ihrer IDE zu kompilieren und zu paketieren
Führen Sie es mvn package
von Ihrer IDE-Maven-Integration aus aus.
In beiden Fällen target/amazon-msf-java-table-app-1.0.jar
wird die JAR-Datei erstellt.
Anmerkung
Wenn Sie ein Build-Projekt von Ihrer IDE aus ausführen, wird die JAR-Datei möglicherweise nicht erstellt.
Laden Sie die JAR-Datei mit dem Anwendungscode hoch
In diesem Abschnitt laden Sie die JAR-Datei, die Sie im vorherigen Abschnitt erstellt haben, in den Amazon S3 S3-Bucket hoch, den Sie zu Beginn dieses Tutorials erstellt haben. Wenn Sie es schon getan haben, schließen Sie es abErstellen eines Amazon-S3-Buckets.
So laden Sie den Anwendungscode hoch
Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/
. -
Wählen Sie den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben.
-
Wählen Sie Feld hochladen.
-
Klicken Sie auf Add files.
-
Navigieren Sie zu der im vorherigen Abschnitt generierten JAR-Datei:
target/amazon-msf-java-table-app-1.0.jar
. -
Wählen Sie Hochladen, ohne andere Einstellungen zu ändern.
Warnung
Stellen Sie sicher, dass Sie die richtige JAR-Datei in auswählen
<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar
.Das Zielverzeichnis enthält auch andere JAR-Dateien, die Sie nicht hochladen müssen.
Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink
Sie können eine Managed Service for Apache Flink-Anwendung entweder mit der Konsole oder der erstellen und konfigurieren. AWS CLI Für dieses Tutorial verwenden Sie die Konsole.
Anmerkung
Wenn Sie die Anwendung mithilfe der Konsole erstellen, werden Ihre AWS Identity and Access Management (IAM) und Amazon CloudWatch Logs-Ressourcen für Sie erstellt. Wenn Sie die Anwendung mithilfe von erstellen AWS CLI, müssen Sie diese Ressourcen separat erstellen.
Erstellen der Anwendung
Öffnen Sie die Managed Service for Apache Flink-Konsole unter https://console.aws.amazon.com /flink
-
Stellen Sie sicher, dass die richtige Region ausgewählt ist: USA Ost (Nord-Virginia) us-east-1.
-
Wählen Sie im rechten Menü Apache Flink-Anwendungen und dann Streaming-Anwendung erstellen. Wählen Sie alternativ im Bereich Erste Schritte auf der Startseite die Option Streaming-Anwendung erstellen aus.
-
Gehen Sie auf der Seite Streaming-Anwendung erstellen wie folgt vor:
-
Wählen Sie für Wählen Sie eine Methode zum Einrichten der Streamverarbeitungsanwendung die Option Von Grund auf neu erstellen aus.
-
Wählen Sie für die Apache Flink-Konfiguration und die Version von Application Flink die Option Apache Flink 1.19.
-
Gehen Sie im Abschnitt Anwendungskonfiguration wie folgt vor:
-
Geben Sie als Anwendungsname ein
MyApplication
. -
Geben Sie für Beschreibung den Text
My Java Table API test app
ein. -
Wählen Sie für Zugriff auf Anwendungsressourcen die Option Create/update IAM role kinesis-analytics-MyApplication-us -east-1 with required policies aus.
-
-
Gehen Sie unter Vorlage für Anwendungseinstellungen wie folgt vor:
-
Wählen Sie für Vorlagen die Option Entwicklung aus.
-
-
-
Wählen Sie Streaming-Anwendung erstellen aus.
Anmerkung
Beim Erstellen einer Anwendung von Managed Service für Apache Flink mit der Konsole haben Sie die Möglichkeit, eine IAM-Rolle und -Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM-Ressourcen werden unter Verwendung Ihres Anwendungsnamens und der Region wie folgt benannt:
-
Richtlinie:
kinesis-analytics-service-
MyApplication
-us-east-1
-
Rolle:
kinesisanalytics-
MyApplication
-us-east-1
Bearbeiten Sie die IAM-Richtlinie
Bearbeiten Sie die IAM-Richtlinie zum Hinzufügen von Berechtigungen für den Zugriff auf den Amazon S3-Bucket.
Um die IAM-Richtlinie zu bearbeiten, um S3-Bucket-Berechtigungen hinzuzufügen
Öffnen Sie unter https://console.aws.amazon.com/iam/
die IAM-Konsole. -
Wählen Sie Policies (Richtlinien). Wählen Sie die
kinesis-analytics-service-MyApplication-us-east-1
-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. -
Wählen Sie Bearbeiten und dann die Registerkarte JSON.
-
Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie die Beispielkonto-ID (
012345678901
) durch Ihre Konto-ID und<bucket-name>
den Namen des S3-Buckets, den Sie erstellt haben.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "WriteOutputBucket", "Effect": "Allow", "Action": "s3:*", Resource": [ "arn:aws:s3:::my-bucket" ] }
] } -
Wählen Sie Weiter und dann Änderungen speichern aus.
Konfigurieren Sie die Anwendung
Bearbeiten Sie die Anwendung, um das Anwendungscode-Artefakt festzulegen.
Konfigurieren der Anwendung
-
Wählen Sie auf der MyApplicationSeite Configure aus.
-
Wählen Sie im Abschnitt Speicherort des Anwendungscodes die Option Konfigurieren aus.
-
Wählen Sie für Amazon S3 S3-Bucket den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben. Wählen Sie Durchsuchen und wählen Sie den richtigen Bucket aus. Wählen Sie dann Wählen aus. Klicken Sie nicht auf den Bucket-Namen.
-
Geben Sie als Pfad zum Amazon-S3-Objekt den Wert
amazon-msf-java-table-app-1.0.jar
ein.
-
-
Wählen Sie für Zugriffsberechtigungen die Option Erstellen / Aktualisieren Sie IAM-Rolle
kinesis-analytics-MyApplication-us-east-1
aus. -
Fügen Sie im Abschnitt Runtime-Eigenschaften die folgenden Eigenschaften hinzu.
-
Wählen Sie Neues Element hinzufügen und fügen Sie jeden der folgenden Parameter hinzu:
Gruppen-ID Schlüssel Value (Wert) bucket
name
your-bucket-name
bucket
path
output
-
Ändern Sie keine anderen Einstellungen.
-
Wählen Sie Änderungen speichern.
Anmerkung
Wenn Sie sich dafür entscheiden, die CloudWatch Amazon-Protokollierung zu aktivieren, erstellt Managed Service für Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:
-
Protokollgruppe:
/aws/kinesis-analytics/MyApplication
-
Protokollstream:
kinesis-analytics-log-stream
Führen Sie die Anwendung aus.
Die Anwendung ist jetzt konfiguriert und kann ausgeführt werden.
Ausführen der Anwendung
-
Kehren Sie zur Konsolenseite in Amazon Managed Service für Apache Flink zurück und wählen Sie MyApplication.
-
Wählen Sie Ausführen, um die Anwendung zu starten.
-
Wählen Sie in der Konfiguration zur Anwendungswiederherstellung die Option Mit neuestem Snapshot ausführen aus.
-
Wählen Sie Ausführen aus.
Der Status in den Anwendungsdetails wechselt von
Ready
zuStarting
und dann zu „Running
Nach dem Start der Anwendung“.
Wenn sich die Anwendung im Running
Status befindet, können Sie das Flink-Dashboard öffnen.
Um das Dashboard zu öffnen und den Job anzusehen
-
Wählen Sie Apache Flink-Dashboard öffnen. Das Dashboard wird auf einer neuen Seite geöffnet.
-
Wählen Sie in der Liste „Laufende Jobs“ den einzelnen Job aus, den Sie sehen können.
Anmerkung
Wenn Sie die Laufzeiteigenschaften festgelegt oder die IAM-Richtlinien falsch bearbeitet haben, ändert sich der Anwendungsstatus möglicherweise auf, aber das Flink-Dashboard zeigt an
Running
, dass der Job kontinuierlich neu gestartet wird. Dies ist ein häufiges Fehlerszenario, wenn die Anwendung falsch konfiguriert ist oder nicht über die erforderlichen Berechtigungen für den Zugriff auf die externen Ressourcen verfügt.Überprüfen Sie in diesem Fall die Registerkarte Ausnahmen im Flink-Dashboard, um die Ursache des Problems zu untersuchen.
Beobachten Sie die Metriken der laufenden Anwendung
Auf der MyApplicationSeite, im Abschnitt CloudWatch Amazon-Metriken, können Sie einige der grundlegenden Metriken der laufenden Anwendung sehen.
Um die Metriken einzusehen
-
Wählen Sie neben der Schaltfläche „Aktualisieren“ in der Dropdownliste die Option 10 Sekunden aus.
-
Wenn die Anwendung läuft und fehlerfrei ist, können Sie sehen, dass die Uptime-Metrik kontinuierlich zunimmt.
-
Die Metrik für vollständige Neustarts sollte Null sein. Wenn sie zunimmt, kann es bei der Konfiguration zu Problemen kommen. Überprüfen Sie die Registerkarte Ausnahmen im Flink-Dashboard, um das Problem zu untersuchen.
-
Die Metrik „Anzahl fehlgeschlagener Checkpoints“ sollte in einer fehlerfreien Anwendung Null sein.
Anmerkung
Dieses Dashboard zeigt einen festen Satz von Metriken mit einer Granularität von 5 Minuten. Sie können ein benutzerdefiniertes Anwendungs-Dashboard mit beliebigen Metriken im CloudWatch Dashboard erstellen.
Beobachten Sie, wie die Anwendung Daten in den Ziel-Bucket schreibt
Sie können jetzt beobachten, wie die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird und Dateien auf Amazon S3 schreibt.
Um die Dateien zu beobachten, gehen Sie genauso vor, wie Sie die Dateien überprüft haben, die geschrieben wurden, als die Anwendung lokal ausgeführt wurde. Siehe Beobachten Sie, wie die Anwendung Daten in einen S3-Bucket schreibt.
Denken Sie daran, dass die Anwendung neue Dateien auf den Flink-Checkpoint schreibt. Bei der Ausführung auf Amazon Managed Service für Apache Flink sind Checkpoints standardmäßig aktiviert und werden alle 60 Sekunden ausgeführt. Die Anwendung erstellt ungefähr alle 1 Minute neue Dateien.
Beenden Sie die Anwendung
Um die Anwendung zu beenden, rufen Sie die Konsolenseite der Anwendung Managed Service for Apache Flink mit dem Namen auf. MyApplication
So stoppen Sie die Anwendung
-
Wählen Sie in der Dropdownliste Aktion die Option Stopp aus.
-
Der Status in den Anwendungsdetails wechselt von
Running
zu und dann zu demReady
ZeitpunktStopping
, an dem die Anwendung vollständig gestoppt wurde.Anmerkung
Vergessen Sie nicht, auch das Senden von Daten aus dem Python-Skript oder dem Kinesis Data Generator an den Eingabestream zu beenden.