Crea un notebook Studio con Amazon MSK - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Crea un notebook Studio con Amazon MSK

Questo tutorial descrive come creare un notebook Studio che utilizza un MSK cluster Amazon come sorgente.

Configura un MSK cluster Amazon

Per questo tutorial, è necessario un MSK cluster Amazon che consenta l'accesso al testo in chiaro. Se non hai già configurato un MSK cluster Amazon, segui il MSK tutorial Getting Started Using Amazon per creare un AmazonVPC, un MSK cluster Amazon, un argomento e un'istanza EC2 client Amazon.

Seguendo il tutorial, completa le seguenti operazioni:

Aggiungi un NAT gateway al tuo VPC

Se hai creato un MSK cluster Amazon seguendo il MSK tutorial Getting Started Using Amazon o se il tuo Amazon esistente VPC non dispone già di un NAT gateway per le sue sottoreti private, devi aggiungere un NAT gateway al tuo Amazon. VPC Il diagramma seguente illustra l'architettura generale.

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

Per creare un NAT gateway per AmazonVPC, procedi come segue:

  1. Apri la VPC console Amazon all'indirizzo https://console.aws.amazon.com/vpc/.

  2. Scegli NATGateways dalla barra di navigazione a sinistra.

  3. Nella pagina NATGateways, scegli NATCreate Gateway.

  4. Nella pagina Create NAT Gateway, fornite i seguenti valori:

    Nome: opzionale ZeppelinGateway
    Sottorete AWS KafkaTutorialSubnet1
    ID di allocazione IP elastico Scegli un IP elastico disponibile. Se non è IPs disponibile alcun Elastic IP, scegli Allocate Elastic IP, quindi scegli l'IP Elasic creato dalla console.

    Scegli Create Gateway. NAT

  5. Nella barra di navigazione a sinistra, seleziona Tabelle di routing.

  6. Seleziona Crea tabella di routing.

  7. Nella pagina Crea tabella di routing, fornisci le seguenti informazioni:

    • Tag nome: ZeppelinRouteTable

    • VPC: Scegli il tuo VPC (ad esempio AWS KafkaTutorialVPC).

    Scegli Create (Crea) .

  8. Nell'elenco delle tabelle dei percorsi, scegli ZeppelinRouteTable. Seleziona la scheda Route, seleziona Modifica route.

  9. Nella scheda Modifica route scegli Aggiungi route.

  10. Per Destinazione, inserisci 0.0.0.0/0. Per Target, scegli NATGateway, ZeppelinGateway. Seleziona Salva route. Scegli Chiudi.

  11. Nella pagina Route Tables, con l'ZeppelinRouteTableopzione selezionata, scegli la scheda Associazioni di sottoreti. Scegli Modifica associazioni sottorete.

  12. Nella pagina Modifica associazioni di sottoreti, scegli AWS KafkaTutorialSubnet2 e AWS KafkaTutorialSubnet 3. Seleziona Salva.

Crea una AWS Glue connessione e una tabella

Il tuo notebook Studio utilizza un AWS Gluedatabase per i metadati sulla tua fonte di MSK dati Amazon. In questa sezione, crei una AWS Glue connessione che descrive come accedere al tuo MSK cluster Amazon e una AWS Glue tabella che descrive come presentare i dati della tua origine dati a client come il tuo notebook Studio.

Creazione di una connessione
  1. Accedi AWS Management Console e apri la AWS Glue console all'indirizzo https://console.aws.amazon.com/glue/.

  2. Se non disponi già di un AWS Glue database, scegli Database dalla barra di navigazione a sinistra. Scegli Aggiungi database. Nella finestra Aggiungi database, inserisci default per Nome database. Scegli Create (Crea) .

  3. Scegli Connessioni dalla barra di navigazione a sinistra. Scegli Aggiungi connessione.

  4. Nella finestra Aggiungi connessione, fornisci i seguenti valori:

    • Per Nome connessione, inserisci ZeppelinConnection.

    • Per Tipo di connessione, scegli Kafka.

    • Per il server bootstrap Kafka URLs, fornisci la stringa del broker bootstrap per il tuo cluster. Puoi scaricare i broker bootstrap dalla MSK console o inserendo il seguente comando: CLI

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Deseleziona la casella di controllo Richiedi connessione SSL.

    Scegli Next (Successivo).

  5. Nella VPCpagina, fornisci i seguenti valori:

    • Per VPC, scegli il nome del tuo VPC (ad AWS KafkaTutorialVPCes.)

    • Per Subnet, scegli AWS KafkaTutorialSubnet2.

    • Per Gruppi di sicurezza, scegli tutti i gruppi disponibili.

    Scegli Next (Successivo).

  6. Nella pagina Proprietà di connessione/Accesso alla connessione, scegli Finisci.

Creazione di una tabella
Nota

È possibile creare manualmente la tabella come descritto nei passaggi seguenti oppure utilizzare il codice di creazione della tabella del connettore per Managed Service for Apache Flink nel notebook all'interno di Apache Zeppelin per creare la tabella tramite un'istruzione. DDL È quindi possibile effettuare il check-in AWS Glue per assicurarsi che la tabella sia stata creata correttamente.

  1. Nella barra di navigazione a sinistra, seleziona Tabelle. Nella pagina Tabelle, scegli Aggiungi tabelle > Aggiungi tabella manualmente.

  2. Nella pagina Imposta le proprietà della tabella, inserisci stock per Nome tabella. Assicurati di selezionare il database creato in precedenza. Scegli Next (Successivo).

  3. Nella pagina Aggiungi un datastore, scegli Kafka. Per il nome dell'argomento, inserisci il nome dell'argomento (ad es. AWS KafkaTutorialTopic). Per Connessione, scegli ZeppelinConnection.

  4. Nella pagina Classificazione, scegli JSON. Scegli Next (Successivo).

  5. Nella pagina Definisci uno schema, scegli Aggiungi colonna per aggiungere una colonna. Aggiungi colonne con le seguenti proprietà:

    Nome colonna Tipo di dati
    ticker string
    price double

    Scegli Next (Successivo).

  6. Nella pagina successiva, verifica le impostazioni e scegli Fine.

  7. Scegli la tabella appena creata dall'elenco delle tabelle.

  8. Scegliete Modifica tabella e aggiungete le seguenti proprietà:

    • chiave:managed-flink.proctime, valore: proctime

    • chiave:flink.properties.group.id, valore: test-consumer-group

    • chiave:flink.properties.auto.offset.reset, valore: latest

    • chiave:classification, valore: json

    Senza queste coppie chiave/valore, il notebook Flink genera un errore.

  9. Scegli Applica.

Crea un notebook Studio con Amazon MSK

Ora che hai creato le risorse utilizzate dall'applicazione, puoi creare il notebook Studio.

Puoi creare la tua applicazione utilizzando il AWS Management Console o il AWS CLI.
Nota

Puoi anche creare un notebook Studio dalla MSK console Amazon scegliendo un cluster esistente, quindi selezionando Elabora dati in tempo reale.

Crea un notebook Studio utilizzando il AWS Management Console

  1. Aprire la console Managed Service for Apache Flink a casa https://console.aws.amazon.com/managed-flink/? region=us-east-1#/applicazioni/dashboard.

  2. Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Studio. Scegli Crea notebook Studio.

    Nota

    Per creare un notebook Studio dalle console Amazon MSK o Kinesis Data Streams, seleziona il cluster MSK Amazon di input o il flusso di dati Kinesis, quindi scegli Elabora dati in tempo reale.

  3. Nella pagina Crea notebook Studio, immetti le seguenti informazioni:

    • Inserisci MyNotebook per Nome notebook Studio.

    • Scegli l'impostazione predefinita per il database AWS Glue.

    Scegli Crea notebook Studio.

  4. Nella MyNotebookpagina, scegli la scheda Configurazione. Nella sezione Reti, scegli Modifica.

  5. Nella MyNotebook pagina Modifica rete per, scegli la VPCconfigurazione basata sul MSK cluster Amazon. Scegli il tuo MSK cluster Amazon per Amazon MSK Cluster. Scegli Save changes (Salva modifiche).

  6. Nella MyNotebookpagina, scegli Esegui. Attendi che lo stato mostri In esecuzione.

Crea un taccuino Studio utilizzando il AWS CLI

Per creare il tuo taccuino Studio utilizzando il AWS CLI, procedi come segue:

  1. Assicurati di disporre delle informazioni riportate di seguito. Questi valori sono necessari per creare l'applicazione.

    • ID dell'account.

    • L'ID della sottorete IDs e del gruppo di sicurezza per Amazon VPC che contiene il tuo MSK cluster Amazon.

  2. Crea un file denominato create.json con i seguenti contenuti. Sostituisci i valori segnaposto con le tue informazioni.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Per creare l'applicazione, esegui il comando riportato di seguito:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Una volta completata l'esecuzione del comando, dovresti visualizzare un output simile al seguente, che mostra i dettagli per il nuovo notebook Studio:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Per avviare l'applicazione, esegui il comando riportato di seguito. Sostituisci il valore di esempio con il tuo ID account.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Invia dati al tuo MSK cluster Amazon

In questa sezione, esegui uno script Python nel tuo EC2 client Amazon per inviare dati alla tua fonte MSK dati Amazon.

  1. Connect al tuo EC2 client Amazon.

  2. Esegui i seguenti comandi per installare Python versione 3, Pip e il pacchetto Kafka per Python e conferma le operazioni:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Configuralo AWS CLI sul tuo computer client inserendo il seguente comando:

    aws configure

    Fornisci le credenziali del tuo account e us-east-1 per region.

  4. Crea un file denominato stock.py con i seguenti contenuti. Sostituisci il valore di esempio con la stringa Bootstrap Brokers del tuo MSK cluster Amazon e aggiorna il nome dell'argomento se l'argomento non è: AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Esegui lo script con il comando seguente:

    $ python3 stock.py
  6. Lascia lo script in esecuzione mentre completi la sezione seguente.

Test del notebook Studio

In questa sezione, usi il tuo notebook Studio per interrogare i dati dal tuo MSK cluster Amazon.

  1. Aprire la console Managed Service for Apache Flink a casa https://console.aws.amazon.com/managed-flink/? region=us-east-1#/applicazioni/dashboard.

  2. Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Notebook Studio. Scegli. MyNotebook

  3. Nella MyNotebookpagina, scegli Apri in Apache Zeppelin.

    L'interfaccia di Apache Zeppelin viene aperta in una nuova scheda.

  4. Nella pagina Ti diamo il benvenuto su Zeppelin!, scegli Nuova nota Zeppelin.

  5. Nella pagina Nota Zeppelin, inserisci la seguente query in una nuova nota:

    %flink.ssql(type=update) select * from stock

    Seleziona l'icona dell'esecuzione.

    L'applicazione visualizza i dati del MSK cluster Amazon.

Per aprire la dashboard di Apache Flink per la tua applicazione e visualizzare gli aspetti operativi, scegli. FLINKJOB Per ulteriori informazioni sul pannello di controllo di Flink, consulta Pannello di controllo di Apache Flink nella Guida per gli sviluppatori del servizio gestito per Apache Flink.

Per altri esempi di query Flink Streaming, consulta SQL Queries nella documentazione di Apache Flink.