Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Crea ed esegui un servizio gestito per l'applicazione Apache Flink
In questo passaggio, crei un servizio gestito per l'applicazione Apache Flink con flussi di dati Kinesis come origine e sink.
Questa sezione contiene le fasi seguenti:
- Crea risorse dipendenti
- Configurazione dell'ambiente di sviluppo locale
- Scarica ed esamina il codice Java per lo streaming di Apache Flink
- Scrivi record di esempio nel flusso di input
- Esegui l'applicazione localmente
- Osserva i dati di input e output nei flussi Kinesis
- Arresta l'esecuzione locale dell'applicazione
- Compila e impacchetta il codice dell'applicazione
- Carica il JAR file di codice dell'applicazione
- Crea e configura l'applicazione Managed Service for Apache Flink
- Approfondimenti
Crea risorse dipendenti
Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, è necessario creare le seguenti risorse dipendenti:
-
Due flussi di dati Kinesis per input e output
-
Un bucket Amazon S3 per archiviare il codice dell'applicazione
Nota
Questo tutorial presuppone che l'applicazione venga distribuita nella regione us-east-1 Stati Uniti orientali (Virginia settentrionale). Se utilizzi un'altra regione, adatta tutti i passaggi di conseguenza.
Crea due flussi di dati Amazon Kinesis
Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, crea due flussi di dati Kinesis (ExampleInputStream
e ExampleOutputStream
). L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.
Puoi creare questi stream utilizzando la console Amazon Kinesis o il AWS CLI seguente comando. Per istruzioni sulla console, consulta Creazione e aggiornamento dei flussi di dati nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis. Per creare gli stream utilizzando AWS CLI, usa i seguenti comandi, adattandoli alla regione utilizzata per la tua applicazione.
Per creare i flussi di dati (AWS CLI)
-
Per creare il primo stream (
ExampleInputStream
), usa il seguente comando Amazon Kinesiscreate-stream
AWS CLI :$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
-
Per creare il secondo stream utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome dello stream in
ExampleOutputStream
:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
Crea un bucket Amazon S3 per il codice dell'applicazione
Puoi creare un bucket Amazon S3 utilizzando la relativa console. Per informazioni su come creare un bucket Amazon S3 utilizzando la console, consulta Creating a bucket nella Amazon S3 User Guide. Assegna un nome al bucket Amazon S3 utilizzando un nome univoco globale, ad esempio aggiungendo il tuo nome di accesso.
Nota
Assicurati di creare il bucket nella regione che usi per questo tutorial (us-east-1).
Altre risorse
Quando crei la tua applicazione, Managed Service for Apache Flink crea automaticamente CloudWatch le seguenti risorse Amazon se non esistono già:
-
Un gruppo di log denominato
/AWS/KinesisAnalytics-java/<my-application>
-
Un flusso di log denominato
kinesis-analytics-log-stream
Configurazione dell'ambiente di sviluppo locale
Per lo sviluppo e il debug, puoi eseguire l'applicazione Apache Flink sul tuo computer direttamente dal tuo computer preferito. IDE Tutte le dipendenze di Apache Flink vengono gestite come normali dipendenze Java utilizzando Apache Maven.
Nota
Sulla tua macchina di sviluppo, devi avere Java JDK 11, Maven e Git installati. Ti consigliamo di utilizzare un ambiente di sviluppo come Eclipse Java Neon
Autentica la tua sessione AWS
L'applicazione utilizza i flussi di dati Kinesis per pubblicare i dati. Quando si esegue localmente, è necessario disporre di una sessione AWS autenticata valida con autorizzazioni di scrittura nel flusso di dati Kinesis. Usa i seguenti passaggi per autenticare la tua sessione:
-
Se non hai configurato il profilo AWS CLI e un profilo denominato con credenziali valide, consulta. Configura il AWS Command Line Interface (AWS CLI)
-
Verifica che AWS CLI sia configurato correttamente e che gli utenti dispongano delle autorizzazioni per scrivere nel flusso di dati Kinesis pubblicando il seguente record di test:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Se hai IDE un plugin con cui integrarti AWS, puoi utilizzarlo per passare le credenziali all'applicazione in esecuzione su. IDE Per ulteriori informazioni, vedere AWS Toolkit for IDEA IntelliJ AWS e Toolkit for
Eclipse.
Scarica ed esamina il codice Java per lo streaming di Apache Flink
Il codice dell'applicazione Java per questo esempio è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:
-
Clona il repository remoto con il comando seguente:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Passa alla directory
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted
.
Esamina i componenti dell'applicazione
L'applicazione è interamente implementata nella com.amazonaws.services.msf.BasicStreamingJob
classe. Il main()
metodo definisce il flusso di dati per elaborare i dati di streaming ed eseguirli.
Nota
Per un'esperienza di sviluppo ottimizzata, l'applicazione è progettata per essere eseguita senza modifiche al codice sia su Amazon Managed Service per Apache Flink che localmente, per lo sviluppo in azienda. IDE
-
Per leggere la configurazione di runtime in modo che funzioni durante l'esecuzione in Amazon Managed Service for Apache Flink e nel tuoIDE, l'applicazione rileva automaticamente se è in esecuzione autonoma localmente in. IDE In tal caso, l'applicazione carica la configurazione di runtime in modo diverso:
-
Quando l'applicazione rileva che è in esecuzione in modalità autonoma sul tuo computerIDE, forma il
application_properties.json
file incluso nella cartella delle risorse del progetto. Segue il contenuto del file. -
Quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink, il comportamento predefinito carica la configurazione dell'applicazione dalle proprietà di runtime che definirai nell'applicazione Amazon Managed Service for Apache Flink. Per informazioni, consulta Crea e configura l'applicazione Managed Service for Apache Flink.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
Il
main()
metodo definisce il flusso di dati dell'applicazione e lo esegue.-
Inizializza gli ambienti di streaming predefiniti. In questo esempio, mostriamo come creare sia la tabella
StreamExecutionEnvironment
da usare con cheStreamTableEnvironment
quella da usare conSQL. DataSteam API API I due oggetti di ambiente sono due riferimenti separati allo stesso ambiente di runtime, da utilizzare in modo diversoAPIs.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
Caricate i parametri di configurazione dell'applicazione. Questo li caricherà automaticamente dalla posizione corretta, a seconda di dove è in esecuzione l'applicazione:
Map<String, Properties> applicationParameters = loadApplicationProperties(env);
-
L'applicazione definisce una fonte utilizzando il connettore Kinesis Consumer
per leggere i dati dal flusso di input. La configurazione del flusso di input è definita in PropertyGroupId
=InputStream0
. Il nome e la regione del flusso si trovano nelle proprietà denominatestream.name
aws.region
rispettivamente. Per semplicità, questa fonte legge i record come una stringa.private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
-
L'applicazione definisce quindi un sink utilizzando il connettore Kinesis Streams Sink
per inviare i dati al flusso di output. Il nome e la regione del flusso di output sono definiti in PropertyGroupId
=OutputStream0
, in modo simile al flusso di input. Il sink è collegato direttamente all'unità internaDataStream
che riceve i dati dalla fonte. In un'applicazione reale, si verifica una trasformazione tra sorgente e sink.private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
-
Infine, esegui il flusso di dati che hai appena definito. Questa deve essere l'ultima istruzione del
main()
metodo, dopo aver definito tutti gli operatori richiesti dal flusso di dati:env.execute("Flink streaming Java API skeleton");
-
Utilizzate il file pom.xml
Il file pom.xml definisce tutte le dipendenze richieste dall'applicazione e configura il plugin Maven Shade per creare il fat-jar che contiene tutte le dipendenze richieste da Flink.
-
provided
Alcune dipendenze hanno un ambito. Queste dipendenze sono disponibili automaticamente quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink. Sono necessarie per compilare l'applicazione o per eseguirla localmente nel tuo. IDE Per ulteriori informazioni, consulta Esegui l'applicazione localmente. Assicurati di utilizzare la stessa versione di Flink del runtime che utilizzerai in Amazon Managed Service for Apache Flink.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
È necessario aggiungere ulteriori dipendenze Apache Flink al pom con l'ambito predefinito, come il connettore Kinesis
utilizzato da questa applicazione. Per ulteriori informazioni, consulta Usa i connettori Apache Flink con Managed Service for Apache Flink. È inoltre possibile aggiungere eventuali dipendenze Java aggiuntive richieste dall'applicazione. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Il plugin Maven Java Compiler assicura che il codice sia compilato con Java 11, la JDK versione attualmente supportata da Apache Flink.
-
Il plugin Maven Shade impacchetta il fat-jar, escludendo alcune librerie fornite dal runtime. Inoltre specifica due trasformatori: e.
ServicesResourceTransformer
ManifestResourceTransformer
Quest'ultimo configura la classe contenente ilmain
metodo per avviare l'applicazione. Se rinomini la classe principale, non dimenticare di aggiornare questo trasformatore. -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
Scrivi record di esempio nel flusso di input
In questa sezione, invierai record di esempio allo stream per l'elaborazione dell'applicazione. Sono disponibili due opzioni per generare dati di esempio, utilizzando uno script Python o il Kinesis
Genera dati di esempio usando uno script Python
Puoi usare uno script Python per inviare record di esempio allo stream.
Nota
Per eseguire questo script Python, devi usare Python 3.x e avere installata la libreria for AWS SDKPython
Per iniziare a inviare i dati di test al flusso di input Kinesis:
-
Scarica lo script
stock.py
Python del generatore di dati dal repository Data generator GitHub. -
Esegui lo script
stock.py
:$ python stock.py
Mantieni lo script in esecuzione mentre completi il resto del tutorial. Ora puoi eseguire la tua applicazione Apache Flink.
Generazione di dati di esempio utilizzando Kinesis Data Generator
In alternativa all'utilizzo dello script Python, puoi usare Kinesis Data Generator
Per configurare ed eseguire Kinesis Data Generator:
-
Segui le istruzioni nella documentazione di Kinesis Data Generator
per configurare l'accesso allo strumento. Eseguirai un AWS CloudFormation modello che imposta un utente e una password. -
Accedi a Kinesis Data Generator tramite il file URL generato dal CloudFormation modello. Puoi trovarlo URL nella scheda Output dopo aver completato il CloudFormation modello.
-
Configura il generatore di dati:
-
Regione: Seleziona la regione che stai utilizzando per questo tutorial: us-east-1
-
Stream/flusso di consegna: seleziona il flusso di input che l'applicazione utilizzerà:
ExampleInputStream
-
Record al secondo: 100
-
Modello di registrazione: copia e incolla il seguente modello:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Prova il modello: scegli Modello di test e verifica che il record generato sia simile al seguente:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Avvia il generatore di dati: scegli Seleziona Invia dati.
Kinesis Data Generator sta ora inviando dati a. ExampleInputStream
Esegui l'applicazione localmente
Puoi eseguire ed eseguire il debug della tua applicazione Flink localmente nel tuo. IDE
Nota
Prima di continuare, verificate che i flussi di input e output siano disponibili. Per informazioni, consulta Crea due flussi di dati Amazon Kinesis. Inoltre, verifica di disporre dell'autorizzazione per leggere e scrivere da entrambi gli stream. Per informazioni, consulta Autentica la tua sessione AWS.
La configurazione dell'ambiente di sviluppo locale richiede Java 11JDK, Apache Maven e e IDE per lo sviluppo di Java. Verifica di soddisfare i prerequisiti richiesti. Per informazioni, consulta Soddisfa i prerequisiti per completare gli esercizi.
Importa il progetto Java nel tuo IDE
Per iniziare a lavorare sull'applicazione in usoIDE, è necessario importarla come progetto Java.
Il repository che avete clonato contiene diversi esempi. Ogni esempio è un progetto separato. Per questo tutorial, importa il contenuto della ./java/GettingStarted
sottodirectory nella tuaIDE.
Inserisci il codice come progetto Java esistente usando Maven.
Nota
Il processo esatto per importare un nuovo progetto Java varia a seconda del tipo di progetto IDE utilizzato.
Controllate la configurazione locale dell'applicazione
Quando viene eseguita localmente, l'applicazione utilizza la configurazione contenuta nel application_properties.json
file nella cartella delle risorse del progetto sotto./src/main/resources
. È possibile modificare questo file per utilizzare diversi nomi o regioni di stream Kinesis.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Configura la configurazione di IDE esecuzione
Puoi eseguire ed eseguire il debug dell'applicazione Flink IDE direttamente dal tuo eseguendo la classe principalecom.amazonaws.services.msf.BasicStreamingJob
, come faresti con qualsiasi applicazione Java. Prima di eseguire l'applicazione, è necessario configurare la configurazione Esegui. La configurazione dipende dalla IDE versione in uso. Ad esempio, vedete le configurazioni di esecuzione/debug nella
-
Aggiungi le
provided
dipendenze al classpath. Ciò è necessario per assicurarsi che le dipendenze conprovided
scope vengano passate all'applicazione durante l'esecuzione locale. Senza questa configurazione, l'applicazione visualizza immediatamente unclass not found
errore. -
Passa AWS le credenziali per accedere agli stream Kinesis all'applicazione. Il modo più veloce è usare AWS Toolkit for IntelliJ IDEA
. Utilizzando questo IDE plugin nella configurazione Run, è possibile selezionare un profilo specifico AWS . AWS l'autenticazione avviene utilizzando questo profilo. Non è necessario passare direttamente AWS le credenziali. -
Verificare che l'applicazione venga IDE eseguita utilizzando JDK11.
Esegui l'applicazione nel tuo IDE
Dopo aver impostato la configurazione Run perBasicStreamingJob
, potete eseguirla o eseguire il debug come una normale applicazione Java.
Nota
Non è possibile eseguire il fat-jar generato da Maven direttamente dalla riga di java -jar
...
comando. Questo jar non contiene le dipendenze principali di Flink necessarie per eseguire l'applicazione in modo autonomo.
Quando l'applicazione viene avviata correttamente, registra alcune informazioni sul minicluster autonomo e sull'inizializzazione dei connettori. Seguono una serie di WARN registri INFO e alcuni che Flink normalmente emette all'avvio dell'applicazione.
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
Una volta completata l'inizializzazione, l'applicazione non emette ulteriori voci di registro. Durante il flusso di dati, non viene emesso alcun registro.
Per verificare se l'applicazione sta elaborando correttamente i dati, puoi controllare i flussi Kinesis di input e output, come descritto nella sezione seguente.
Nota
Il comportamento normale di un'applicazione Flink è quello di non emettere registri relativi al flusso di dati. L'emissione di registri su ogni record può essere utile per il debug, ma può comportare un notevole sovraccarico durante l'esecuzione in produzione.
Osserva i dati di input e output nei flussi Kinesis
Puoi osservare i record inviati al flusso di input da (Python di esempio che genera) o Kinesis Data Generator (link) utilizzando Data Viewer nella console Amazon Kinesis.
Per osservare i record
Apri la console Kinesis in /kinesis. https://console.aws.amazon.com
-
Verifica che la regione sia la stessa in cui stai eseguendo questo tutorial, che per impostazione predefinita è us-east-1 US East (Virginia settentrionale). Cambia la regione se non corrisponde.
-
Scegli Data Streams.
-
Seleziona lo stream che desideri osservare, oppure
ExampleInputStream
ExampleOutputStream.
-
Scegli la scheda Visualizzatore dati.
-
Scegli uno Shard, mantieni Ultimo come posizione iniziale, quindi scegli Ottieni record. Potresti visualizzare l'errore «Nessun record trovato per questa richiesta». In tal caso, scegli Riprova a recuperare i record. Vengono visualizzati i record più recenti pubblicati nello stream.
-
Scegliete il valore nella colonna Dati per esaminare il contenuto del record nel JSON formato.
Arresta l'esecuzione locale dell'applicazione
Arresta l'applicazione in esecuzione suIDE. IDEDi solito fornisce un'opzione di «stop». La posizione e il metodo esatti dipendono da quello che IDE stai utilizzando.
Compila e impacchetta il codice dell'applicazione
In questa sezione, si utilizza Apache Maven per compilare il codice Java e impacchettarlo in un file. JAR Puoi compilare e impacchettare il tuo codice usando lo strumento da riga di comando Maven o il tuo. IDE
Per compilare e impacchettare usando la riga di comando Maven:
Passa alla directory contenente il GettingStarted progetto Java ed esegui il seguente comando:
$ mvn package
Per compilare e impacchettare usando il tuoIDE:
Esegui mvn package
dalla tua integrazione con IDE Maven.
In entrambi i casi, viene creato il seguente JAR file:. target/amazon-msf-java-stream-app-1.0.jar
Nota
L'esecuzione di un «progetto di compilazione» dal tuo IDE potrebbe non creare il JAR file.
Carica il JAR file di codice dell'applicazione
In questa sezione, carichi il JAR file creato nella sezione precedente nel bucket Amazon Simple Storage Service (Amazon S3) creato all'inizio di questo tutorial. Se non hai completato questo passaggio, consulta (link).
Per caricare il JAR file di codice dell'applicazione
Apri la console Amazon S3 all'indirizzo. https://console.aws.amazon.com/s3/
-
Scegli il bucket che hai creato in precedenza per il codice dell'applicazione.
-
Scegli Carica.
-
Scegliere Add files (Aggiungi file).
-
Vai al JAR file generato nel passaggio precedente:
target/amazon-msf-java-stream-app-1.0.jar
. -
Scegli Carica senza modificare altre impostazioni.
avvertimento
Assicurati di selezionare il JAR file corretto in<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar
.
La target
directory contiene anche altri JAR file che non è necessario caricare.
Crea e configura l'applicazione Managed Service for Apache Flink
È possibile creare ed eseguire un'applicazione del servizio gestito per Apache Flink utilizzando la console o la AWS CLI. Per questo tutorial, utilizzerai la console.
Nota
Quando crei l'applicazione utilizzando la console, le tue risorse AWS Identity and Access Management (IAM) e Amazon CloudWatch Logs vengono create automaticamente. Quando crei l'applicazione utilizzando AWS CLI, crei queste risorse separatamente.
Argomenti
Creazione dell'applicazione
Per creare l'applicazione
Apri la console Managed Service for Apache Flink all'indirizzo /flink https://console.aws.amazon.com
-
Verifica che sia selezionata la regione corretta: us-east-1 Stati Uniti orientali (Virginia settentrionale)
-
Apri il menu a destra e scegli Applicazioni Apache Flink, quindi Crea applicazione di streaming. In alternativa, scegli Crea applicazione di streaming nel contenitore Get started della pagina iniziale.
-
Nella pagina Crea applicazione di streaming:
-
Scegliete un metodo per configurare l'applicazione di elaborazione dello stream: scegliete Crea da zero.
-
Configurazione Apache Flink, versione Application Flink: scegli Apache Flink 1.19.
-
-
Configura la tua applicazione
-
Nome dell'applicazione: invio
MyApplication
. -
Descrizione: immettere
My java test app
. -
Accesso alle risorse dell'applicazione: scegli Crea/aggiorna IAM il ruolo
kinesis-analytics-MyApplication-us-east-1
con le politiche richieste.
-
-
Configura il tuo modello per le impostazioni dell'applicazione
-
Modelli: scegli Sviluppo.
-
-
Scegli Crea applicazione di streaming nella parte inferiore della pagina.
Nota
Quando crei un servizio gestito per l'applicazione Apache Flink utilizzando la console, hai la possibilità di creare un IAM ruolo e una policy per la tua applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste IAM risorse vengono denominate utilizzando il nome e la regione dell'applicazione come segue:
-
Policy:
kinesis-analytics-service-
MyApplication
-us-east-1
-
Ruolo:
kinesisanalytics-
MyApplication
-us-east-1
Amazon Managed Service for Apache Flink era precedentemente noto come Kinesis Data Analytics. Il nome delle risorse che vengono create automaticamente ha il prefisso per garantire la compatibilità con le versioni precedenti. kinesis-analytics-
Modifica la politica IAM
Modifica la IAM policy per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.
Per modificare la politica
Apri la IAM console all'indirizzo https://console.aws.amazon.com/iam/
. -
Seleziona Policy. Scegli la policy
kinesis-analytics-service-MyApplication-us-east-1
creata dalla console nella sezione precedente. -
Scegli Modifica, quindi scegli la JSONscheda.
-
Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (
012345678901
) con l'ID del tuo account.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Scegli Avanti nella parte inferiore della pagina, quindi scegli Salva modifiche.
Configura l'applicazione
Modifica la configurazione dell'applicazione per impostare l'elemento del codice dell'applicazione.
Per modificare la configurazione
-
Nella MyApplicationpagina, scegli Configura.
-
Nella sezione Posizione del codice dell'applicazione:
-
Per il bucket Amazon S3, seleziona il bucket creato in precedenza per il codice dell'applicazione. Scegli Sfoglia e seleziona il bucket corretto, quindi seleziona Scegli. Non fare clic sul nome del bucket.
-
Per Percorso dell'oggetto Amazon S3, inserisci
amazon-msf-java-stream-app-1.0.jar
-
-
Per le autorizzazioni di accesso, scegli Crea/aggiorna il IAM ruolo
kinesis-analytics-MyApplication-us-east-1
con le politiche richieste. -
Nella sezione Proprietà di runtime, aggiungi le seguenti proprietà.
-
Scegliete Aggiungi nuovo elemento e aggiungete ciascuno dei seguenti parametri:
ID gruppo Chiave Valore InputStream0
stream.name
ExampleInputStream
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
-
Non modificate nessuna delle altre sezioni.
-
Scegli Save changes (Salva modifiche).
Nota
Quando scegli di abilitare la CloudWatch registrazione di Amazon, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:
-
Gruppo di log:
/aws/kinesis-analytics/MyApplication
-
Flusso di log:
kinesis-analytics-log-stream
Esecuzione dell'applicazione.
L'applicazione è ora configurata e pronta per l'esecuzione.
Per eseguire l'applicazione
-
Sulla console per Amazon Managed Service for Apache Flink, scegli La mia applicazione e scegli Esegui.
-
Nella pagina successiva, nella pagina di configurazione del ripristino dell'applicazione, scegli Esegui con l'ultima istantanea, quindi scegli Esegui.
Lo stato nell'applicazione descrive in dettaglio le transizioni da
Ready
Starting
e poi aRunning
quando l'applicazione è stata avviata.
Quando l'applicazione è nello Running
stato, ora puoi aprire la dashboard di Flink.
Per aprire il pannello di controllo
-
Scegli Apri la dashboard di Apache Flink. La dashboard si apre in una nuova pagina.
-
Nell'elenco dei lavori in esecuzione, scegli il singolo lavoro che puoi vedere.
Nota
Se hai impostato le proprietà di Runtime o hai modificato le IAM politiche in modo errato, lo stato dell'applicazione potrebbe cambiare
Running
, ma la dashboard di Flink mostra che il lavoro viene riavviato continuamente. Si tratta di uno scenario di errore comune se l'applicazione non è configurata correttamente o non dispone delle autorizzazioni per accedere alle risorse esterne.Quando ciò accade, controlla la scheda Eccezioni nella dashboard di Flink per vedere la causa del problema.
Osserva le metriche dell'applicazione in esecuzione
Nella MyApplicationpagina, nella sezione Amazon CloudWatch metrics, puoi vedere alcune delle metriche fondamentali dell'applicazione in esecuzione.
Per visualizzare le metriche
-
Accanto al pulsante Aggiorna, seleziona 10 secondi dall'elenco a discesa.
-
Quando l'applicazione è in esecuzione ed è integra, puoi vedere la metrica di uptime aumentare continuamente.
-
La metrica fullrestarts deve essere zero. Se è in aumento, la configurazione potrebbe presentare dei problemi. Per esaminare il problema, consulta la scheda Eccezioni nella dashboard di Flink.
-
La metrica del numero di checkpoint non riusciti deve essere pari a zero in un'applicazione integra.
Nota
Questa dashboard mostra un set fisso di metriche con una granularità di 5 minuti. Puoi creare una dashboard applicativa personalizzata con qualsiasi metrica nella dashboard. CloudWatch
Osserva i dati di output nei flussi Kinesis
Assicurati di continuare a pubblicare i dati sull'input, usando lo script Python o il Kinesis Data Generator.
È ora possibile osservare l'output dell'applicazione in esecuzione su Managed Service for Apache Flink utilizzando il Data Viewer in https://console.aws.amazon.com/kinesis/
Per visualizzare l'output
Apri la console Kinesis in /kinesis. https://console.aws.amazon.com
-
Verifica che la regione sia la stessa che stai usando per eseguire questo tutorial. Per impostazione predefinita, è US-East-1US East (Virginia settentrionale). Se necessario, modificare la regione.
-
Scegli Data Streams.
-
Seleziona lo stream che desideri osservare. Ai fini di questo tutorial, utilizza
ExampleOutputStream
. -
Scegli la scheda Data viewer.
-
Seleziona uno Shard, mantieni Ultimo come posizione iniziale, quindi scegli Ottieni record. Potresti visualizzare l'errore «nessun record trovato per questa richiesta». In tal caso, scegli Riprova a recuperare i record. Vengono visualizzati i record più recenti pubblicati nello stream.
-
Seleziona il valore nella colonna Dati per esaminare il contenuto del record nel JSON formato.
Arresta l'applicazione
Per interrompere l'applicazione, vai alla pagina della console dell'applicazione Managed Service for Apache Flink denominata. MyApplication
Per interrompere l'applicazione
-
Dall'elenco a discesa Azione, scegli Stop.
-
Lo stato nell'applicazione descrive in dettaglio le transizioni da
Running
e quindi aReady
quando l'applicazione viene completamente interrotta.Stopping
Nota
Non dimenticare di interrompere anche l'invio di dati al flusso di input dallo script Python o dal Kinesis Data Generator.