Erstellen Sie ein Studio-Notizbuch mit Amazon MSK - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erstellen Sie ein Studio-Notizbuch mit Amazon MSK

In diesem Tutorial wird beschrieben, wie Sie ein Studio-Notizbuch erstellen, das einen MSK Amazon-Cluster als Quelle verwendet.

Einen MSK Amazon-Cluster einrichten

Für dieses Tutorial benötigen Sie einen MSK Amazon-Cluster, der Klartextzugriff ermöglicht. Wenn Sie noch keinen MSK Amazon-Cluster eingerichtet haben, folgen Sie dem MSK Tutorial Erste Schritte mit Amazon, um eine Amazon-VPC, einen MSK Amazon-Cluster, ein Thema und eine EC2 Amazon-Client-Instance zu erstellen.

Gehen Sie beim Befolgen des Tutorials wie folgt vor:

Fügen Sie ein NAT Gateway zu Ihrem hinzu VPC

Wenn Sie einen MSK Amazon-Cluster erstellt haben, indem Sie dem MSK Tutorial Erste Schritte mit Amazon gefolgt sind, oder wenn Ihr vorhandenes Amazon noch VPC kein NAT Gateway für seine privaten Subnetze hat, müssen Sie Ihrem Amazon VPC ein NAT Gateway hinzufügen. Das folgende Diagramm zeigt die Architektur.

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

Gehen Sie wie folgt vorVPC, um ein NAT Gateway für Ihr Amazon zu erstellen:

  1. Öffnen Sie die VPC Amazon-Konsole unter https://console.aws.amazon.com/vpc/.

  2. Wählen Sie in der linken Navigationsleiste NATGateways aus.

  3. Wählen Sie auf der Seite NATGateways die Option Create NAT Gateway aus.

  4. Geben Sie auf der Seite Create NAT Gateway die folgenden Werte an:

    Name — optional ZeppelinGateway
    Subnetz AWS KafkaTutorialSubnet1
    Elastische IP-Zuweisungs-ID Wählen Sie eine verfügbare Elastic IP aus. Wenn kein Elastic IPs verfügbar ist, wählen Sie Allocate Elastic IP und dann die Elastic IP aus, die die Konsole erstellt.

    Wählen Sie Create Gateway. NAT

  5. Wählen Sie in der linken Navigationsleiste Routing-Tabellen aus.

  6. Klicken Sie auf Create Route Table (Routing-Tabelle erstellen).

  7. Geben Sie auf der Seite Routing-Tabelle erstellen folgende Informationen ein:

    • Name-Tag: ZeppelinRouteTable

    • VPC: Wählen Sie Ihr VPC (z. B. AWS KafkaTutorialVPC).

    Wählen Sie Create (Erstellen) aus.

  8. Wählen Sie in der Liste der Routentabellen ZeppelinRouteTable. Klicken Sie auf der Registerkarte Routen auf Routen bearbeiten.

  9. Wählen Sie auf der Seite Routen bearbeiten die Option Route hinzufügen aus.

  10. Geben Sie im Für-Ziel 0.0.0.0/0 ein. Wählen Sie für Target die Option NATGateway, aus ZeppelinGateway. Wählen Sie Routen speichern aus. Klicken Sie auf Close (Schließen).

  11. Wählen Sie auf der Seite Routing-Tabellen die ZeppelinRouteTableOption „Subnetzzuordnungen“ aus. Wählen Sie Subnetzzuordnungen bearbeiten aus.

  12. Wählen Sie auf der Seite Subnetzzuordnungen bearbeiten die Optionen AWS KafkaTutorialSubnet2 und AWS KafkaTutorialSubnet 3 aus. Wählen Sie Save (Speichern) aus.

Erstellen Sie eine AWS Glue Verbindung und eine Tabelle

Ihr Studio-Notizbuch verwendet eine AWS GlueDatenbank für Metadaten zu Ihrer MSK Amazon-Datenquelle. In diesem Abschnitt erstellen Sie eine AWS Glue Verbindung, die beschreibt, wie Sie auf Ihren MSK Amazon-Cluster zugreifen können, und eine AWS Glue Tabelle, die beschreibt, wie Sie die Daten in Ihrer Datenquelle für Clients wie Ihr Studio-Notebook präsentieren.

Eine Verbindung erstellen
  1. Melden Sie sich bei der an AWS Management Console und öffnen Sie die AWS Glue Konsole unter https://console.aws.amazon.com/glue/.

  2. Wenn Sie noch keine AWS Glue Datenbank haben, wählen Sie in der linken Navigationsleiste Datenbanken aus. Wählen Sie Datenbank hinzufügen. Geben Sie im Fenster Datenbank hinzufügen default als Namen der Datenbank ein. Wählen Sie Create (Erstellen) aus.

  3. Wählen Sie in der linken Navigationsleiste Verbindungen aus. Wählen Sie Verbindung hinzufügen aus.

  4. Geben Sie im Fenster Verbindung hinzufügen die folgenden Werte ein:

    • Geben Sie für Verbindungsname ZeppelinConnection ein.

    • Wählen Sie für Verbindungstyp den Eintrag Kafka.

    • Geben Sie für den Kafka-Bootstrap-Server URLs die Bootstrap-Broker-String für Ihren Cluster an. Sie können die Bootstrap-Broker entweder über die MSK Konsole oder durch Eingabe des folgenden Befehls abrufen: CLI

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Deaktivieren Sie das Kontrollkästchen SSLVerbindung erforderlich.

    Wählen Sie Weiter.

  5. Geben Sie VPCauf der Seite die folgenden Werte an:

    • Wählen Sie für VPCden Namen Ihres VPC (z. AWS KafkaTutorialVPCB.)

    • Wählen Sie für Subnetz AWS KafkaTutorialSubnet2 aus.

    • Wählen Sie für Sicherheitsgruppen alle verfügbaren Gruppen aus.

    Wählen Sie Weiter.

  6. Wählen Sie auf der Seite Verbindungseigenschaften / Verbindungszugriff die Option Fertigstellen aus.

Erstellen einer Tabelle
Anmerkung

Sie können die Tabelle entweder manuell erstellen, wie in den folgenden Schritten beschrieben, oder Sie können den Code zum Erstellen von Tabellenverbindern für Managed Service for Apache Flink in Ihrem Notizbuch innerhalb von Apache Zeppelin verwenden, um Ihre Tabelle über eine Anweisung zu erstellen. DDL Sie können dann einchecken AWS Glue , um sicherzustellen, dass die Tabelle korrekt erstellt wurde.

  1. Wählen Sie in der linken Navigationsleiste die Option Tabellen. Wählen Sie auf der Seite Tabellen die Optionen Tabellen hinzufügen, Tabelle manuell hinzufügen aus.

  2. Geben Sie auf der Seite Eigenschaften Ihrer Tabelle einrichten stock als Tabellennamen ein. Stellen Sie sicher, dass Sie die Datenbank auswählen, die Sie zuvor erstellt haben. Wählen Sie Weiter.

  3. Wählen Sie auf der Seite Datenspeicher hinzufügen die Option Kafka aus. Geben Sie als Themennamen Ihren Themennamen ein (z. B. AWS KafkaTutorialTopic). Wählen Sie für Verbindung ZeppelinConnection.

  4. Wählen Sie auf der Seite Klassifizierung die Option JSON. Wählen Sie Weiter.

  5. Wählen Sie auf der Seite Schema definieren die Option „Spalte hinzufügen“, um eine Spalte hinzuzufügen. Fügen Sie Spalten mit den folgenden Eigenschaften hinzu:

    Spaltenname Datentyp
    ticker string
    price double

    Wählen Sie Weiter.

  6. Überprüfen Sie auf der nächsten Seite Ihre Einstellungen und wählen Sie Fertigstellen.

  7. Wählen Sie die neu erstellte Tabelle aus der Liste der Tabellen aus.

  8. Wählen Sie Tabelle bearbeiten und fügen Sie die folgenden Eigenschaften hinzu:

    • Schlüssel:managed-flink.proctime, Wert: proctime

    • Schlüssel:flink.properties.group.id, Wert: test-consumer-group

    • Schlüssel:flink.properties.auto.offset.reset, Wert: latest

    • Schlüssel:classification, Wert: json

    Ohne diese Schlüssel/Wert-Paare tritt im Flink-Notebook ein Fehler auf.

  9. Wählen Sie Apply (Anwenden) aus.

Erstellen Sie ein Studio-Notizbuch mit Amazon MSK

Nachdem Sie die Ressourcen erstellt haben, die Ihre Anwendung verwendet, erstellen Sie Ihr Studio-Notebook.

Sie können Ihre Anwendung entweder mit dem AWS Management Console oder dem erstellen AWS CLI.
Anmerkung

Sie können ein Studio-Notizbuch auch von der MSK Amazon-Konsole aus erstellen, indem Sie einen vorhandenen Cluster auswählen und dann Daten in Echtzeit verarbeiten wählen.

Erstellen Sie ein Studio-Notizbuch mit dem AWS Management Console

  1. Die Managed Service for Apache Flink-Konsole zu https://console.aws.amazon.com/managed-flink/Hause öffnen? region=us-east-1#/applications/dashboard.

  2. Wählen Sie auf der Seite Managed Service für Apache Flink-Anwendungen die Registerkarte Studio aus. Wählen Sie Studio-Notebook erstellen.

    Anmerkung

    Um ein Studio-Notebook über die Amazon MSK - oder Kinesis Data Streams Streams-Konsolen zu erstellen, wählen Sie Ihren MSK Amazon-Eingabe-Cluster oder Kinesis-Datenstream aus und wählen Sie dann Daten in Echtzeit verarbeiten aus.

  3. Geben Sie auf der Seite Notebook-Instance erstellen folgende Informationen ein:

    • Geben Sie MyNotebook als Studio-Notebookname.

    • Wählen Sie Standard für die AWS -Glue-Datenbank.

    Wählen Sie Studio-Notebook erstellen.

  4. Wählen Sie auf der MyNotebookSeite die Registerkarte Konfiguration aus. Wählen Sie im Abschnitt Netzwerk die Option Bearbeiten.

  5. Wählen Sie auf der MyNotebook Seite Netzwerk bearbeiten für die VPCKonfiguration auf Basis des MSK Amazon-Clusters aus. Wählen Sie Ihren MSK Amazon-Cluster für Amazon MSK Cluster. Wählen Sie Änderungen speichern.

  6. Wählen Sie MyNotebookauf der Seite Ausführen aus. Warten Sie, bis der Status Wird ausgeführt angezeigt wird.

Erstellen Sie ein Studio-Notizbuch mit dem AWS CLI

Gehen Sie wie folgt vor AWS CLI, um Ihr Studio-Notizbuch mit dem zu erstellen:

  1. Stellen Sie sicher, dass Sie über die folgenden Informationen verfügen: Sie benötigen diese Werte, um Ihre Anwendung zu erstellen.

    • Ihre Konto-ID.

    • Das Subnetz IDs und die Sicherheitsgruppen-ID für das AmazonVPC, das Ihren MSK Amazon-Cluster enthält.

  2. Erstellen Sie eine Datei mit dem Namen create.json und den folgenden Inhalten. Ersetzen Sie die Platzhalterwerte durch Ihre Informationen.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Um Ihre Anwendung zu erstellen, führen Sie den folgenden Befehl aus.

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Wenn der Befehl abgeschlossen ist, sollte eine Ausgabe wie die folgende angezeigt werden, die die Details für Ihr neues Studio-Notebook enthält:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Um Ihre Anwendung zu starten, führen Sie den folgenden Befehl aus. Ersetzen Sie die Beispielwerte durch Ihre Konto-ID.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Daten an Ihren MSK Amazon-Cluster senden

In diesem Abschnitt führen Sie ein Python-Skript in Ihrem EC2 Amazon-Client aus, um Daten an Ihre MSK Amazon-Datenquelle zu senden.

  1. Connect zu Ihrem EC2 Amazon-Client her.

  2. Führen Sie die folgenden Befehle aus, um Python Version 3, Pip und das Kafka für Python-Paket zu installieren, und bestätigen Sie die Aktionen:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Konfigurieren Sie das AWS CLI auf Ihrem Client-Computer, indem Sie den folgenden Befehl eingeben:

    aws configure

    Geben Sie Ihre Kontoanmeldeinformationen ein, und us-east-1 für die region.

  4. Erstellen Sie eine Datei mit dem Namen stock.py und den folgenden Inhalten. Ersetzen Sie den Beispielwert durch die Bootstrap Brokers-Zeichenfolge Ihres MSK Amazon-Clusters und aktualisieren Sie den Themennamen, falls Ihr Thema nicht AWS KafkaTutorialTopic:

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Führen Sie das Skript mit dem folgenden Befehl aus:

    $ python3 stock.py
  6. Lassen Sie das Skript laufen, während Sie den folgenden Abschnitt abschließen.

Testen Sie Ihr Studio-Notebook

In diesem Abschnitt verwenden Sie Ihr Studio-Notizbuch, um Daten aus Ihrem MSK Amazon-Cluster abzufragen.

  1. Öffnen Sie die Managed Service for Apache Flink-Konsole zu https://console.aws.amazon.com/managed-flink/Hause? region=us-east-1#/applications/dashboard.

  2. Wählen Sie auf der Seite Managed Service für Apache Flink-Anwendungen die Registerkarte Studio-Notebook aus. Wählen Sie. MyNotebook

  3. Wählen Sie MyNotebookauf der Seite „In Apache Zeppelin öffnen“.

    Die Oberfläche von Apache Zeppelin wird in einer neuen Registerkarte geöffnet.

  4. Auf der Seite Willkommen bei Zeppelin! wählen Sie Zeppelin neue Notiz aus.

  5. Geben Sie auf der Seite Zeppelin Notiz die folgende Abfrage in eine neue Notiz ein:

    %flink.ssql(type=update) select * from stock

    Wählen Sie das Ausführungssymbol.

    Die Anwendung zeigt Daten aus dem MSK Amazon-Cluster an.

Um das Apache Flink Dashboard für Ihre Anwendung zu öffnen und betriebliche Aspekte zu sehen, wählen Sie FLINKJOB. Weitere Informationen zum Flink-Dashboard finden Sie unter Apache Flink-Dashboard im Managed Service für Apache Flink Entwicklerhandbuch.

Weitere Beispiele für Flink SQL Streaming-Abfragen finden Sie unter Abfragen in der Apache Flink-Dokumentation.