Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Crea un notebook Studio con Amazon MSK
Questo tutorial descrive come creare un notebook Studio che utilizza un MSK cluster Amazon come sorgente.
Questo tutorial contiene le sezioni seguenti:
Configura un MSK cluster Amazon
Per questo tutorial, è necessario un MSK cluster Amazon che consenta l'accesso al testo in chiaro. Se non hai già configurato un MSK cluster Amazon, segui il MSK tutorial Getting Started Using Amazon per creare un AmazonVPC, un MSK cluster Amazon, un argomento e un'istanza EC2 client Amazon.
Seguendo il tutorial, completa le seguenti operazioni:
Nel passaggio 3: crea un MSK cluster Amazon, nel passaggio 4, modifica il
ClientBroker
valore daTLS
aPLAINTEXT
.
Aggiungi un NAT gateway al tuo VPC
Se hai creato un MSK cluster Amazon seguendo il MSK tutorial Getting Started Using Amazon o se il tuo Amazon esistente VPC non dispone già di un NAT gateway per le sue sottoreti private, devi aggiungere un NAT gateway al tuo Amazon. VPC Il diagramma seguente illustra l'architettura generale.
Per creare un NAT gateway per AmazonVPC, procedi come segue:
Apri la VPC console Amazon all'indirizzo https://console.aws.amazon.com/vpc/
. Scegli NATGateways dalla barra di navigazione a sinistra.
Nella pagina NATGateways, scegli NATCreate Gateway.
Nella pagina Create NAT Gateway, fornite i seguenti valori:
Nome: opzionale ZeppelinGateway
Sottorete AWS KafkaTutorialSubnet1 ID di allocazione IP elastico Scegli un IP elastico disponibile. Se non è IPs disponibile alcun Elastic IP, scegli Allocate Elastic IP, quindi scegli l'IP Elasic creato dalla console. Scegli Create Gateway. NAT
Nella barra di navigazione a sinistra, seleziona Tabelle di routing.
Seleziona Crea tabella di routing.
Nella pagina Crea tabella di routing, fornisci le seguenti informazioni:
Tag nome:
ZeppelinRouteTable
VPC: Scegli il tuo VPC (ad esempio AWS KafkaTutorialVPC).
Scegli Create (Crea) .
Nell'elenco delle tabelle dei percorsi, scegli ZeppelinRouteTable. Seleziona la scheda Route, seleziona Modifica route.
Nella scheda Modifica route scegli Aggiungi route.
Per Destinazione, inserisci
0.0.0.0/0
. Per Target, scegli NATGateway, ZeppelinGateway. Seleziona Salva route. Scegli Chiudi.Nella pagina Route Tables, con l'ZeppelinRouteTableopzione selezionata, scegli la scheda Associazioni di sottoreti. Scegli Modifica associazioni sottorete.
Nella pagina Modifica associazioni di sottoreti, scegli AWS KafkaTutorialSubnet2 e AWS KafkaTutorialSubnet 3. Seleziona Salva.
Crea una AWS Glue connessione e una tabella
Il tuo notebook Studio utilizza un AWS Gluedatabase per i metadati sulla tua fonte di MSK dati Amazon. In questa sezione, crei una AWS Glue connessione che descrive come accedere al tuo MSK cluster Amazon e una AWS Glue tabella che descrive come presentare i dati della tua origine dati a client come il tuo notebook Studio.
Creazione di una connessione
Accedi AWS Management Console e apri la AWS Glue console all'indirizzo https://console.aws.amazon.com/glue/
. Se non disponi già di un AWS Glue database, scegli Database dalla barra di navigazione a sinistra. Scegli Aggiungi database. Nella finestra Aggiungi database, inserisci
default
per Nome database. Scegli Create (Crea) .Scegli Connessioni dalla barra di navigazione a sinistra. Scegli Aggiungi connessione.
Nella finestra Aggiungi connessione, fornisci i seguenti valori:
Per Nome connessione, inserisci
ZeppelinConnection
.Per Tipo di connessione, scegli Kafka.
Per il server bootstrap Kafka URLs, fornisci la stringa del broker bootstrap per il tuo cluster. Puoi scaricare i broker bootstrap dalla MSK console o inserendo il seguente comando: CLI
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn
ClusterArn
Deseleziona la casella di controllo Richiedi connessione SSL.
Scegli Next (Successivo).
Nella VPCpagina, fornisci i seguenti valori:
Per VPC, scegli il nome del tuo VPC (ad AWS KafkaTutorialVPCes.)
Per Subnet, scegli AWS KafkaTutorialSubnet2.
Per Gruppi di sicurezza, scegli tutti i gruppi disponibili.
Scegli Next (Successivo).
Nella pagina Proprietà di connessione/Accesso alla connessione, scegli Finisci.
Creazione di una tabella
Nota
È possibile creare manualmente la tabella come descritto nei passaggi seguenti oppure utilizzare il codice di creazione della tabella del connettore per Managed Service for Apache Flink nel notebook all'interno di Apache Zeppelin per creare la tabella tramite un'istruzione. DDL È quindi possibile effettuare il check-in AWS Glue per assicurarsi che la tabella sia stata creata correttamente.
Nella barra di navigazione a sinistra, seleziona Tabelle. Nella pagina Tabelle, scegli Aggiungi tabelle > Aggiungi tabella manualmente.
Nella pagina Imposta le proprietà della tabella, inserisci
stock
per Nome tabella. Assicurati di selezionare il database creato in precedenza. Scegli Next (Successivo).Nella pagina Aggiungi un datastore, scegli Kafka. Per il nome dell'argomento, inserisci il nome dell'argomento (ad es. AWS KafkaTutorialTopic). Per Connessione, scegli ZeppelinConnection.
Nella pagina Classificazione, scegli JSON. Scegli Next (Successivo).
Nella pagina Definisci uno schema, scegli Aggiungi colonna per aggiungere una colonna. Aggiungi colonne con le seguenti proprietà:
Nome colonna Tipo di dati ticker
string
price
double
Scegli Next (Successivo).
Nella pagina successiva, verifica le impostazioni e scegli Fine.
-
Scegli la tabella appena creata dall'elenco delle tabelle.
-
Scegliete Modifica tabella e aggiungete le seguenti proprietà:
-
chiave:
managed-flink.proctime
, valore:proctime
-
chiave:
flink.properties.group.id
, valore:test-consumer-group
-
chiave:
flink.properties.auto.offset.reset
, valore:latest
-
chiave:
classification
, valore:json
Senza queste coppie chiave/valore, il notebook Flink genera un errore.
-
-
Scegli Applica.
Crea un notebook Studio con Amazon MSK
Ora che hai creato le risorse utilizzate dall'applicazione, puoi creare il notebook Studio.
Puoi creare la tua applicazione utilizzando il AWS Management Console o il AWS CLI.
Nota
Puoi anche creare un notebook Studio dalla MSK console Amazon scegliendo un cluster esistente, quindi selezionando Elabora dati in tempo reale.
Crea un notebook Studio utilizzando il AWS Management Console
Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Studio. Scegli Crea notebook Studio.
Nota
Per creare un notebook Studio dalle console Amazon MSK o Kinesis Data Streams, seleziona il cluster MSK Amazon di input o il flusso di dati Kinesis, quindi scegli Elabora dati in tempo reale.
Nella pagina Crea notebook Studio, immetti le seguenti informazioni:
-
Inserisci
MyNotebook
per Nome notebook Studio. Scegli l'impostazione predefinita per il database AWS Glue.
Scegli Crea notebook Studio.
-
Nella MyNotebookpagina, scegli la scheda Configurazione. Nella sezione Reti, scegli Modifica.
Nella MyNotebook pagina Modifica rete per, scegli la VPCconfigurazione basata sul MSK cluster Amazon. Scegli il tuo MSK cluster Amazon per Amazon MSK Cluster. Scegli Save changes (Salva modifiche).
Nella MyNotebookpagina, scegli Esegui. Attendi che lo stato mostri In esecuzione.
Crea un taccuino Studio utilizzando il AWS CLI
Per creare il tuo taccuino Studio utilizzando il AWS CLI, procedi come segue:
Assicurati di disporre delle informazioni riportate di seguito. Questi valori sono necessari per creare l'applicazione.
ID dell'account.
L'ID della sottorete IDs e del gruppo di sicurezza per Amazon VPC che contiene il tuo MSK cluster Amazon.
Crea un file denominato
create.json
con i seguenti contenuti. Sostituisci i valori segnaposto con le tue informazioni.{ "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::
AccountID
:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1
", "SubnetID 2
", "SubnetID 3
" ], "SecurityGroupIds": [ "VPC Security Group ID
" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID
:database/default" } } } } }Per creare l'applicazione, esegui il comando riportato di seguito:
aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
Una volta completata l'esecuzione del comando, dovresti visualizzare un output simile al seguente, che mostra i dettagli per il nuovo notebook Studio:
{ "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
Per avviare l'applicazione, esegui il comando riportato di seguito. Sostituisci il valore di esempio con il tuo ID account.
aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:
012345678901
:application/MyNotebook\
Invia dati al tuo MSK cluster Amazon
In questa sezione, esegui uno script Python nel tuo EC2 client Amazon per inviare dati alla tua fonte MSK dati Amazon.
Connect al tuo EC2 client Amazon.
Esegui i seguenti comandi per installare Python versione 3, Pip e il pacchetto Kafka per Python e conferma le operazioni:
sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
Configuralo AWS CLI sul tuo computer client inserendo il seguente comando:
aws configure
Fornisci le credenziali del tuo account e
us-east-1
perregion
.Crea un file denominato
stock.py
con i seguenti contenuti. Sostituisci il valore di esempio con la stringa Bootstrap Brokers del tuo MSK cluster Amazon e aggiorna il nome dell'argomento se l'argomento non è: AWS KafkaTutorialTopicfrom kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "
<<Bootstrap Broker List>>
" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())Esegui lo script con il comando seguente:
$ python3 stock.py
Lascia lo script in esecuzione mentre completi la sezione seguente.
Test del notebook Studio
In questa sezione, usi il tuo notebook Studio per interrogare i dati dal tuo MSK cluster Amazon.
Nella pagina Applicazioni del servizio gestito per Apache Flink, scegli la scheda Notebook Studio. Scegli. MyNotebook
Nella MyNotebookpagina, scegli Apri in Apache Zeppelin.
L'interfaccia di Apache Zeppelin viene aperta in una nuova scheda.
Nella pagina Ti diamo il benvenuto su Zeppelin!, scegli Nuova nota Zeppelin.
Nella pagina Nota Zeppelin, inserisci la seguente query in una nuova nota:
%flink.ssql(type=update) select * from stock
Seleziona l'icona dell'esecuzione.
L'applicazione visualizza i dati del MSK cluster Amazon.
Per aprire la dashboard di Apache Flink per la tua applicazione e visualizzare gli aspetti operativi, scegli. FLINKJOB Per ulteriori informazioni sul pannello di controllo di Flink, consulta Pannello di controllo di Apache Flink nella Guida per gli sviluppatori del servizio gestito per Apache Flink.
Per altri esempi di query Flink Streaming, consulta SQL Queries