Erstellen Sie eine Anwendung mit Apache Beam - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erstellen Sie eine Anwendung mit Apache Beam

In dieser Übung erstellen Sie eine Managed Service for Apache Flink-Anwendung, die Daten mithilfe von Apache Beam transformiert. Apache Beam ist ein Programmiermodell für die Verarbeitung von Streaming-Daten. Informationen zur Verwendung von Apache Beam mit Managed Service für Apache Flink finden Sie unterVerwenden Sie Apache Beam mit Managed Service für Apache Flink-Anwendungen.

Anmerkung

Um die erforderlichen Voraussetzungen für diese Übung festzulegen, schließen Sie zunächst die Tutorial: Erste Schritte mit dem DataStream API integrierten Managed Service für Apache Flink Übung ab.

Erstellen Sie abhängige Ressourcen

Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen:

  • Zwei Kinesis Data Streams (ExampleInputStream und ExampleOutputStream)

  • Einen Amazon S3-Bucket zum Speichern des Codes der Anwendung (ka-app-code-<username>)

Sie können die Kinesis Streams und den Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressourcen finden Sie in den folgenden Themen:

  • Data Streams erstellen und aktualisieren im Amazon Kinesis Data Streams Entwicklerleitfaden. Benennen Sie Ihre Data Streams ExampleInputStream und ExampleOutputStream.

  • Wie erstelle ich einen S3-Bucket? im Amazon Simple Storage Service Benutzerhandbuch. Geben Sie dem Amazon S3-Bucket einen global eindeutigen Namen, indem Sie Ihren Anmeldenamen anhängen, z. B. ka-app-code-<username>.

Schreiben Sie Beispieldatensätze in den Eingabestream

In diesem Abschnitt verwenden Sie ein Python-Skript zum Schreiben von Datensätzen in den Stream für die zu verarbeitende Anwendung.

Anmerkung

Dieser Abschnitt erfordert AWS SDK for Python (Boto).

  1. Erstellen Sie eine Datei ping.py mit dem folgenden Inhalt:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Führen Sie das ping.pySkript aus:

    $ python ping.py

    Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen.

Laden Sie den Anwendungscode herunter und untersuchen Sie ihn

Der Java-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:

  1. Installieren Sie den Git-Client, wenn Sie dies noch nicht getan haben. Weitere Informationen finden Sie unter Git installieren.

  2. Klonen Sie das Remote-Repository mit dem folgenden Befehl:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Navigieren Sie zum amazon-kinesis-data-analytics-java-examples/Beam Verzeichnis .

Der Anwendungscode befindet sich in der BasicBeamStreamingJob.java-Datei. Beachten Sie Folgendes zum Anwendungscode:

  • Die Anwendung verwendet den Apache Beam ParDo, um eingehende Datensätze zu verarbeiten, indem sie eine benutzerdefinierte Transformationsfunktion namens PingPongFn aufruft.

    Der Code zum Aufrufen der PingPongFn Funktion lautet wie folgt:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Managed Service für Apache Flink-Anwendungen, die Apache Beam verwenden, erfordert die folgenden Komponenten. Wenn Sie diese Komponenten und Versionen nicht in Ihre pom.xml aufnehmen, lädt Ihre Anwendung die falschen Versionen aus den Umgebungsabhängigkeiten, und da die Versionen nicht übereinstimmen, stürzt Ihre Anwendung zur Laufzeit ab.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • Die PingPongFn Transformationsfunktion übergibt die Eingabedaten an den Ausgabestrom, sofern es sich bei den Eingabedaten nicht um einen Ping-Wert handelt. In diesem Fall gibt sie die Zeichenfolge pong\nan den Ausgabestrom aus.

    Der Code der Transformationsfunktion lautet wie folgt:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Kompilieren Sie den Anwendungscode

Zum Kompilieren der Anwendung gehen Sie wie folgt vor:

  1. Installieren Sie Java und Maven, wenn das noch nicht geschehen ist. Weitere Informationen finden Sie unter Erfüllen Sie die erforderlichen Voraussetzungen im Tutorial: Erste Schritte mit dem DataStream API integrierten Managed Service für Apache Flink Tutorial.

  2. Kompilieren Sie die Anwendung mit dem folgenden Befehl:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    Anmerkung

    Der bereitgestellte Quellcode basiert auf Bibliotheken von Java 11.

Beim Kompilieren der Anwendung wird die JAR Anwendungsdatei (target/basic-beam-app-1.0.jar) erstellt.

Laden Sie den Apache Flink-Streaming-Java-Code hoch

In diesem Abschnitt laden Sie Ihren Anwendungscode in das Amazon S3-Bucket hoch, das Sie im Erstellen Sie abhängige Ressourcen Abschnitt erstellt haben.

  1. Wählen Sie in der Amazon S3 S3-Konsole die Option ka-app-code -<username>Bucket und wählen Sie Upload.

  2. Klicken Sie im Schritt Auswählen von Dateien auf Hinzufügen von Dateien. Navigieren Sie zu der basic-beam-app-1.0.jarDatei, die Sie im vorherigen Schritt erstellt haben.

  3. Sie müssen keine der Einstellungen für das Objekt ändern. Wählen Sie daher Hochladen.

Ihr Anwendungscode ist jetzt in einem Amazon-S3-Bucket gespeichert, in dem Ihre Anwendung darauf zugreifen kann.

Erstellen Sie die Anwendung Managed Service for Apache Flink und führen Sie sie aus

Befolgen Sie diese Schritte, um die Anwendung über die Konsole zu erstellen, zu konfigurieren, zu aktualisieren und auszuführen.

Erstellen Sie die Anwendung

  1. Öffnen Sie die Managed Service for Apache Flink-Konsole unter /flink https://console.aws.amazon.com

  2. Wählen Sie im Dashboard Managed Service für Apache Flink die Option Analyseanwendung erstellen aus.

  3. Geben Sie auf der Seite Managed Service für Apache Flink – Anwendung erstellen die Anwendungsdetails wie folgt ein:

    • Geben Sie als Anwendungsname ein MyApplication.

    • Wählen Sie für Laufzeit die Option Apache Flink aus.

      Anmerkung

      Apache Beam ist derzeit nicht mit Apache Flink Version 1.19 oder höher kompatibel.

    • Wählen Sie Apache Flink Version 1.15 aus dem Versions-Pulldown aus.

  4. Wählen Sie für Zugriffsberechtigungen die Option Rolle erstellen/aktualisieren. IAM kinesis-analytics-MyApplication-us-west-2

  5. Wählen Sie Create application aus.

Anmerkung

Wenn Sie mithilfe der Konsole eine Managed Service for Apache Flink-Anwendung erstellen, haben Sie die Möglichkeit, eine IAM Rolle und Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM Ressourcen werden anhand Ihres Anwendungsnamens und Ihrer Region wie folgt benannt:

  • Richtlinie: kinesis-analytics-service-MyApplication-us-west-2

  • Rolle: kinesis-analytics-MyApplication-us-west-2

Bearbeiten Sie die IAM Richtlinie

Bearbeiten Sie die IAM Richtlinie, um Berechtigungen für den Zugriff auf die Kinesis-Datenstreams hinzuzufügen.

  1. Öffnen Sie die IAM Konsole unter. https://console.aws.amazon.com/iam/

  2. Wählen Sie Policies (Richtlinien). Wählen Sie die kinesis-analytics-service-MyApplication-us-west-2-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat.

  3. Wählen Sie auf der Seite Summary (Übersicht) die Option Edit policy (Richtlinie bearbeiten) aus. Wählen Sie die JSONRegisterkarte.

  4. Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie das Beispielkonto IDs (012345678901) durch Ihre Konto-ID.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Konfigurieren Sie die Anwendung

  1. Wählen Sie auf der MyApplicationSeite Configure aus.

  2. Klicken Sie auf der Seite Configure application (Anwendung konfigurieren) auf die Option Code location (Codespeicherort):

    • Geben Sie für Amazon-S3-Bucket ka-app-code-<username> ein.

    • Geben Sie als Pfad zum Amazon-S3-Objekt den Wert basic-beam-app-1.0.jar ein.

  3. Wählen Sie unter Zugriff auf Anwendungsressourcen für Zugriffsberechtigungen die Option IAMRolle erstellen/aktualisieren auskinesis-analytics-MyApplication-us-west-2.

  4. Geben Sie Folgendes ein:

    Gruppen-ID Schlüssel Wert
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. Stellen Sie unter Überwachung sicher, dass die Ebene der Überwachungsmetriken auf Anwendung eingestellt ist.

  6. Wählen Sie für die CloudWatch Protokollierung das Kontrollkästchen Aktivieren aus.

  7. Wählen Sie Aktualisieren.

Anmerkung

Wenn Sie die CloudWatch Protokollierung aktivieren möchten, erstellt Managed Service for Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:

  • Protokollgruppe: /aws/kinesis-analytics/MyApplication

  • Protokollstream: kinesis-analytics-log-stream

Dieser Protokollstream wird zur Überwachung der Anwendung verwendet. Dies ist nicht derselbe Protokollstream, den die Anwendung zum Senden von Ergebnissen verwendet.

Führen Sie die Anwendung aus.

Das Flink-Jobdiagramm kann angezeigt werden, indem Sie die Anwendung ausführen, das Apache Flink-Dashboard öffnen und den gewünschten Flink-Job auswählen.

Sie können die Messwerte von Managed Service for Apache Flink auf der CloudWatch Konsole überprüfen, um sicherzustellen, dass die Anwendung funktioniert.

Ressourcen bereinigen AWS

Dieser Abschnitt enthält Verfahren zum Bereinigen von AWS Ressourcen, die im Tumbling Window-Tutorial erstellt wurden.

Löschen Sie Ihre Managed Service for Apache Flink-Anwendung

  1. Öffnen Sie die Managed Service for Apache Flink-Konsole unter /flink https://console.aws.amazon.com

  2. wählen Sie im Bereich Managed Service for Apache Flink die Option. MyApplication

  3. Wählen Sie auf der Seite der Anwendung die Option Löschen aus und bestätigen Sie dann den Löschvorgang.

Löschen Sie Ihre Kinesis-Datenstreams

  1. Öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.

  2. Wählen Sie im Bereich Kinesis Data Streams die Option ExampleInputStream.

  3. Wählen Sie auf der ExampleInputStreamSeite Delete Kinesis Stream aus und bestätigen Sie dann den Löschvorgang.

  4. Wählen Sie auf der Kinesis-Streams-Seite die ExampleOutputStream, wählen Sie Aktionen, wählen Sie Löschen und bestätigen Sie dann den Löschvorgang.

Löschen Sie Ihr Amazon S3 S3-Objekt und Ihren Bucket

  1. Öffnen Sie die Amazon S3 S3-Konsole unter https://console.aws.amazon.com/s3/.

  2. Wählen Sie ka-app-code -<username> Eimer.

  3. Wählen Sie Löschen und geben Sie dann den Bucketnamen ein, um das Löschen zu bestätigen.

Löschen Sie Ihre IAM Ressourcen

  1. Öffnen Sie die IAM Konsole unter https://console.aws.amazon.com/iam/.

  2. Wählen Sie in der Navigationsleiste Policies aus.

  3. Geben Sie in der Filtersteuerung Kinesis ein.

  4. Wählen Sie die Richtlinie kinesis-analytics-service- MyApplication -us-west-2.

  5. Klicken Sie auf Richtlinienaktionen und anschließend auf Löschen.

  6. Wählen Sie in der Navigationsleiste Roles (Rollen) aus.

  7. Wählen Sie die Rolle kinesis-analytics- MyApplication -us-west-2.

  8. Wählen Sie dann Rolle löschen und bestätigen Sie das Löschen.

CloudWatch Löschen Sie Ihre Ressourcen

  1. Öffnen Sie die CloudWatch Konsole unter https://console.aws.amazon.com/cloudwatch/.

  2. Wählen Sie in der Navigationsleiste Protokolle aus.

  3. Wählen Sie die Protokollgruppe MyApplication/aws/kinesis-analytics/ aus.

  4. Wählen Sie dann Protokollgruppe löschen und bestätigen Sie das Löschen.

Nächste Schritte

Nachdem Sie nun eine grundlegende Managed Service for Apache Flink-Anwendung erstellt und ausgeführt haben, die Daten mithilfe von Apache Beam transformiert, finden Sie in der folgenden Anwendung ein Beispiel für eine erweiterte Managed Service für Apache Flink-Lösung.