Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Erstellen Sie eine Anwendung mit Apache Beam
In dieser Übung erstellen Sie eine Managed Service for Apache Flink-Anwendung, die Daten mithilfe von Apache Beam
Anmerkung
Um die erforderlichen Voraussetzungen für diese Übung festzulegen, schließen Sie zunächst die Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink Übung ab.
Dieses Thema enthält die folgenden Abschnitte:
Erstellen Sie abhängige Ressourcen
Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen:
Zwei Kinesis Data Streams (
ExampleInputStream
undExampleOutputStream
)Einen Amazon S3-Bucket zum Speichern des Codes der Anwendung (
ka-app-code-
)<username>
Sie können die Kinesis Streams und den Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressourcen finden Sie in den folgenden Themen:
Data Streams erstellen und aktualisieren im Amazon Kinesis Data Streams Entwicklerleitfaden. Benennen Sie Ihre Data Streams
ExampleInputStream
undExampleOutputStream
.Wie erstelle ich einen S3-Bucket? im Amazon Simple Storage Service Benutzerhandbuch. Geben Sie dem Amazon S3-Bucket einen global eindeutigen Namen, indem Sie Ihren Anmeldenamen anhängen, z. B.
ka-app-code-
.<username>
Schreiben Sie Beispieldatensätze in den Eingabestream
In diesem Abschnitt verwenden Sie ein Python-Skript zum Schreiben von Datensätzen in den Stream für die zu verarbeitende Anwendung.
Anmerkung
Dieser Abschnitt erfordert AWS SDK for Python (Boto)
-
Erstellen Sie eine Datei
ping.py
mit dem folgenden Inhalt:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
Führen Sie das
ping.py
Skript aus:$ python ping.py
Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen.
Laden Sie den Anwendungscode herunter und untersuchen Sie ihn
Der Java-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:
Installieren Sie den Git-Client, wenn Sie dies noch nicht getan haben. Weitere Informationen finden Sie unter Git installieren
. Klonen Sie das Remote-Repository mit dem folgenden Befehl:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Navigieren Sie zum
amazon-kinesis-data-analytics-java-examples/Beam
Verzeichnis .
Der Anwendungscode befindet sich in der BasicBeamStreamingJob.java
-Datei. Beachten Sie Folgendes zum Anwendungscode:
Die Anwendung verwendet den Apache Beam ParDo
, um eingehende Datensätze zu verarbeiten, indem sie eine benutzerdefinierte Transformationsfunktion namens PingPongFn
aufruft.Der Code zum Aufrufen der
PingPongFn
Funktion lautet wie folgt:.apply("Pong transform", ParDo.of(new PingPongFn())
Managed Service für Apache Flink-Anwendungen, die Apache Beam verwenden, erfordert die folgenden Komponenten. Wenn Sie diese Komponenten und Versionen nicht in Ihre
pom.xml
aufnehmen, lädt Ihre Anwendung die falschen Versionen aus den Umgebungsabhängigkeiten, und da die Versionen nicht übereinstimmen, stürzt Ihre Anwendung zur Laufzeit ab.<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
Die
PingPongFn
Transformationsfunktion übergibt die Eingabedaten an den Ausgabestrom, sofern es sich bei den Eingabedaten nicht um einen Ping-Wert handelt. In diesem Fall gibt sie die Zeichenfolge pong\nan den Ausgabestrom aus.Der Code der Transformationsfunktion lautet wie folgt:
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
Kompilieren Sie den Anwendungscode
Zum Kompilieren der Anwendung gehen Sie wie folgt vor:
Installieren Sie Java und Maven, wenn das noch nicht geschehen ist. Weitere Informationen finden Sie unter Erfüllen Sie die erforderlichen Voraussetzungen im Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink Tutorial.
Kompilieren Sie die Anwendung mit dem folgenden Befehl:
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
Anmerkung
Der bereitgestellte Quellcode basiert auf Bibliotheken von Java 11.
Beim Kompilieren der Anwendung wird die JAR-Datei der Anwendung (target/basic-beam-app-1.0.jar
) erstellt.
Laden Sie den Apache Flink-Streaming-Java-Code hoch
In diesem Abschnitt laden Sie Ihren Anwendungscode in das Amazon S3-Bucket hoch, das Sie im Erstellen Sie abhängige Ressourcen Abschnitt erstellt haben.
-
Wählen Sie in der Amazon S3 S3-Konsole den
<username>
Bucket ka-app-code- und wählen Sie Upload aus. -
Klicken Sie im Schritt Auswählen von Dateien auf Hinzufügen von Dateien. Navigieren Sie zu der
basic-beam-app-1.0.jar
Datei, die Sie im vorherigen Schritt erstellt haben. Sie müssen keine der Einstellungen für das Objekt ändern. Wählen Sie daher Hochladen.
Ihr Anwendungscode ist jetzt in einem Amazon-S3-Bucket gespeichert, in dem Ihre Anwendung darauf zugreifen kann.
Erstellen Sie die Anwendung Managed Service for Apache Flink und führen Sie sie aus
Befolgen Sie diese Schritte, um die Anwendung über die Konsole zu erstellen, zu konfigurieren, zu aktualisieren und auszuführen.
Erstellen Sie die Anwendung
Öffnen Sie die Managed Service for Apache Flink-Konsole unter /flink https://console.aws.amazon.com
-
Wählen Sie im Dashboard Managed Service für Apache Flink die Option Analyseanwendung erstellen aus.
-
Geben Sie auf der Seite Managed Service für Apache Flink – Anwendung erstellen die Anwendungsdetails wie folgt ein:
-
Geben Sie als Anwendungsname ein
MyApplication
. -
Wählen Sie für Laufzeit die Option Apache Flink aus.
Anmerkung
Apache Beam ist derzeit nicht mit Apache Flink Version 1.19 oder höher kompatibel.
Wählen Sie Apache Flink Version 1.15 aus dem Versions-Pulldown aus.
-
-
Wählen Sie für Zugriffsberechtigungen die Option Erstellen / Aktualisieren Sie IAM-Rolle
kinesis-analytics-MyApplication-us-west-2
aus. -
Wählen Sie Create application aus.
Anmerkung
Beim Erstellen einer Anwendung von Managed Service für Apache Flink mit der Konsole haben Sie die Möglichkeit, eine IAM-Rolle und -Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM-Ressourcen werden unter Verwendung Ihres Anwendungsnamens und der Region wie folgt benannt:
-
Richtlinie:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rolle:
kinesis-analytics-MyApplication-
us-west-2
Bearbeiten Sie die IAM-Richtlinie
Bearbeiten Sie die IAM-Richtlinie zum Hinzufügen von Berechtigungen für den Zugriff auf die Kinesis-Datenströme.
Öffnen Sie unter https://console.aws.amazon.com/iam/
die IAM-Konsole. -
Wählen Sie Policies (Richtlinien). Wählen Sie die
kinesis-analytics-service-MyApplication-us-west-2
-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. -
Wählen Sie auf der Seite Summary (Übersicht) die Option Edit policy (Richtlinie bearbeiten) aus. Wählen Sie den Tab JSON.
-
Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie das Beispielkonto IDs (
012345678901
) durch Ihre Konto-ID.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Konfigurieren Sie die Anwendung
-
Wählen Sie auf der MyApplicationSeite Configure aus.
-
Klicken Sie auf der Seite Configure application (Anwendung konfigurieren) auf die Option Code location (Codespeicherort):
-
Geben Sie für Amazon-S3-Bucket
ka-app-code-
ein.<username>
-
Geben Sie als Pfad zum Amazon-S3-Objekt den Wert
basic-beam-app-1.0.jar
ein.
-
-
Wählen Sie unter Zugriff auf Anwendungsressourcen für Zugriffsberechtigungen die Option IAM-Rolle
kinesis-analytics-MyApplication-us-west-2
erstellen/aktualisieren aus. -
Geben Sie Folgendes ein:
Gruppen-ID Schlüssel Value (Wert) BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
Stellen Sie unter Überwachung sicher, dass die Ebene der Überwachungsmetriken auf Anwendung eingestellt ist.
-
Wählen Sie für die CloudWatch Protokollierung das Kontrollkästchen Aktivieren aus.
-
Wählen Sie Aktualisieren.
Anmerkung
Wenn Sie die CloudWatch Protokollierung aktivieren möchten, erstellt Managed Service for Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:
-
Protokollgruppe:
/aws/kinesis-analytics/MyApplication
-
Protokollstream:
kinesis-analytics-log-stream
Dieser Protokollstream wird zur Überwachung der Anwendung verwendet. Dies ist nicht derselbe Protokollstream, den die Anwendung zum Senden von Ergebnissen verwendet.
Führen Sie die Anwendung aus.
Das Flink-Jobdiagramm kann angezeigt werden, indem Sie die Anwendung ausführen, das Apache Flink-Dashboard öffnen und den gewünschten Flink-Job auswählen.
Sie können die Messwerte von Managed Service for Apache Flink auf der CloudWatch Konsole überprüfen, um sicherzustellen, dass die Anwendung funktioniert.
Ressourcen bereinigen AWS
Dieser Abschnitt enthält Verfahren zum Bereinigen von AWS Ressourcen, die im Tumbling Window-Tutorial erstellt wurden.
Dieses Thema enthält die folgenden Abschnitte:
Löschen Sie Ihre Managed Service for Apache Flink-Anwendung
Öffnen Sie die Managed Service for Apache Flink-Konsole unter /flink https://console.aws.amazon.com
wählen Sie im Bereich Managed Service for Apache Flink die Option. MyApplication
Wählen Sie auf der Seite der Anwendung die Option Löschen aus und bestätigen Sie dann den Löschvorgang.
Löschen Sie Ihre Kinesis-Datenstreams
Öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.
Wählen Sie im Bereich Kinesis Data Streams die Option ExampleInputStream.
Wählen Sie auf der ExampleInputStreamSeite Delete Kinesis Stream aus und bestätigen Sie dann den Löschvorgang.
Wählen Sie auf der Kinesis-Streams-Seite die ExampleOutputStream, wählen Sie Aktionen, wählen Sie Löschen und bestätigen Sie dann den Löschvorgang.
Löschen Sie Ihr Amazon S3 S3-Objekt und Ihren Bucket
Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/
. Wählen Sie den
<username>
Bucket ka-app-code -.Wählen Sie Löschen und geben Sie dann den Bucketnamen ein, um das Löschen zu bestätigen.
Löschen Sie Ihre IAM-Ressourcen
Öffnen Sie unter https://console.aws.amazon.com/iam/
die IAM-Konsole. Wählen Sie in der Navigationsleiste Policies aus.
Geben Sie in der Filtersteuerung Kinesis ein.
Wählen Sie die Richtlinie kinesis-analytics-service- MyApplication -us-west-2.
Klicken Sie auf Richtlinienaktionen und anschließend auf Löschen.
Wählen Sie in der Navigationsleiste Roles (Rollen) aus.
Wählen Sie die Rolle kinesis-analytics- MyApplication -us-west-2.
Wählen Sie dann Rolle löschen und bestätigen Sie das Löschen.
CloudWatch Löschen Sie Ihre Ressourcen
Öffnen Sie die CloudWatch Konsole unter https://console.aws.amazon.com/cloudwatch/
. Wählen Sie in der Navigationsleiste Protokolle aus.
Wählen Sie die Gruppe/aws/kinesis-analytics/MyApplicationlog aus.
Wählen Sie dann Protokollgruppe löschen und bestätigen Sie das Löschen.
Nächste Schritte
Nachdem Sie nun eine grundlegende Managed Service for Apache Flink-Anwendung erstellt und ausgeführt haben, die Daten mithilfe von Apache Beam transformiert, finden Sie in der folgenden Anwendung ein Beispiel für eine erweiterte Managed Service für Apache Flink-Lösung.
Workshop „Beam on Managed Service for Apache Flink Streaming
“: In diesem Workshop untersuchen wir ein durchgängiges Beispiel, das Batch- und Streaming-Aspekte in einer einheitlichen Apache Beam-Pipeline kombiniert.