Crea ed esegui un servizio gestito per l'applicazione Apache Flink - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Crea ed esegui un servizio gestito per l'applicazione Apache Flink

In questo passaggio, crei un servizio gestito per l'applicazione Apache Flink con flussi di dati Kinesis come origine e sink.

Crea risorse dipendenti

Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, è necessario creare le seguenti risorse dipendenti:

  • Due flussi di dati Kinesis per input e output

  • Un bucket Amazon S3 per archiviare il codice dell'applicazione

    Nota

    Questo tutorial presuppone che l'applicazione venga distribuita nella regione us-east-1 Stati Uniti orientali (Virginia settentrionale). Se utilizzi un'altra regione, adatta tutti i passaggi di conseguenza.

Crea due flussi di dati Amazon Kinesis

Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, crea due flussi di dati Kinesis (ExampleInputStream e ExampleOutputStream). L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.

Puoi creare questi stream utilizzando la console Amazon Kinesis o il AWS CLI seguente comando. Per istruzioni sulla console, consulta Creazione e aggiornamento dei flussi di dati nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis. Per creare gli stream utilizzando AWS CLI, usa i seguenti comandi, adattandoli alla regione utilizzata per la tua applicazione.

Per creare i flussi di dati (AWS CLI)
  1. Per creare il primo stream (ExampleInputStream), usa il seguente comando Amazon Kinesis create-stream AWS CLI :

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. Per creare il secondo stream utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome dello stream inExampleOutputStream:

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

Crea un bucket Amazon S3 per il codice dell'applicazione

Puoi creare un bucket Amazon S3 utilizzando la relativa console. Per informazioni su come creare un bucket Amazon S3 utilizzando la console, consulta Creating a bucket nella Amazon S3 User Guide. Assegna un nome al bucket Amazon S3 utilizzando un nome univoco globale, ad esempio aggiungendo il tuo nome di accesso.

Nota

Assicurati di creare il bucket nella regione che usi per questo tutorial (us-east-1).

Altre risorse

Quando crei la tua applicazione, Managed Service for Apache Flink crea automaticamente CloudWatch le seguenti risorse Amazon se non esistono già:

  • Un gruppo di log denominato /AWS/KinesisAnalytics-java/<my-application>

  • Un flusso di log denominato kinesis-analytics-log-stream

Configurazione dell'ambiente di sviluppo locale

Per lo sviluppo e il debug, puoi eseguire l'applicazione Apache Flink sul tuo computer direttamente dal tuo computer preferito. IDE Tutte le dipendenze di Apache Flink vengono gestite come normali dipendenze Java utilizzando Apache Maven.

Nota

Sulla tua macchina di sviluppo, devi avere Java JDK 11, Maven e Git installati. Ti consigliamo di utilizzare un ambiente di sviluppo come Eclipse Java Neon o IntelliJ. IDEA Per verificare di soddisfare tutti i prerequisiti, vedere. Soddisfa i prerequisiti per completare gli esercizi Non è necessario installare un cluster Apache Flink sul computer.

Autentica la tua sessione AWS

L'applicazione utilizza i flussi di dati Kinesis per pubblicare i dati. Quando si esegue localmente, è necessario disporre di una sessione AWS autenticata valida con autorizzazioni di scrittura nel flusso di dati Kinesis. Usa i seguenti passaggi per autenticare la tua sessione:

  1. Se non hai configurato il profilo AWS CLI e un profilo denominato con credenziali valide, consulta. Configura il AWS Command Line Interface (AWS CLI)

  2. Verifica che AWS CLI sia configurato correttamente e che gli utenti dispongano delle autorizzazioni per scrivere nel flusso di dati Kinesis pubblicando il seguente record di test:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Se hai IDE un plugin con cui integrarti AWS, puoi utilizzarlo per passare le credenziali all'applicazione in esecuzione su. IDE Per ulteriori informazioni, vedere AWS Toolkit for IDEA IntelliJ AWS e Toolkit for Eclipse.

Scarica ed esamina il codice Java per lo streaming di Apache Flink

Il codice dell'applicazione Java per questo esempio è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

  1. Clona il repository remoto con il comando seguente:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Passa alla directory amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted.

Esamina i componenti dell'applicazione

L'applicazione è interamente implementata nella com.amazonaws.services.msf.BasicStreamingJob classe. Il main() metodo definisce il flusso di dati per elaborare i dati di streaming ed eseguirli.

Nota

Per un'esperienza di sviluppo ottimizzata, l'applicazione è progettata per essere eseguita senza modifiche al codice sia su Amazon Managed Service per Apache Flink che localmente, per lo sviluppo in azienda. IDE

  • Per leggere la configurazione di runtime in modo che funzioni durante l'esecuzione in Amazon Managed Service for Apache Flink e nel tuoIDE, l'applicazione rileva automaticamente se è in esecuzione autonoma localmente in. IDE In tal caso, l'applicazione carica la configurazione di runtime in modo diverso:

    1. Quando l'applicazione rileva che è in esecuzione in modalità autonoma sul tuo computerIDE, forma il application_properties.json file incluso nella cartella delle risorse del progetto. Segue il contenuto del file.

    2. Quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink, il comportamento predefinito carica la configurazione dell'applicazione dalle proprietà di runtime che definirai nell'applicazione Amazon Managed Service for Apache Flink. Per informazioni, consulta Crea e configura l'applicazione Managed Service for Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • Il main() metodo definisce il flusso di dati dell'applicazione e lo esegue.

    • Inizializza gli ambienti di streaming predefiniti. In questo esempio, mostriamo come creare sia la tabella StreamExecutionEnvironment da usare con che StreamTableEnvironment quella da usare conSQL. DataSteam API API I due oggetti di ambiente sono due riferimenti separati allo stesso ambiente di runtime, da utilizzare in modo diversoAPIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • Caricate i parametri di configurazione dell'applicazione. Questo li caricherà automaticamente dalla posizione corretta, a seconda di dove è in esecuzione l'applicazione:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • L'applicazione definisce una fonte utilizzando il connettore Kinesis Consumer per leggere i dati dal flusso di input. La configurazione del flusso di input è definita in PropertyGroupId =InputStream0. Il nome e la regione del flusso si trovano nelle proprietà denominate stream.name aws.region rispettivamente. Per semplicità, questa fonte legge i record come una stringa.

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • L'applicazione definisce quindi un sink utilizzando il connettore Kinesis Streams Sink per inviare i dati al flusso di output. Il nome e la regione del flusso di output sono definiti in PropertyGroupId =OutputStream0, in modo simile al flusso di input. Il sink è collegato direttamente all'unità interna DataStream che riceve i dati dalla fonte. In un'applicazione reale, si verifica una trasformazione tra sorgente e sink.

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • Infine, esegui il flusso di dati che hai appena definito. Questa deve essere l'ultima istruzione del main() metodo, dopo aver definito tutti gli operatori richiesti dal flusso di dati:

      env.execute("Flink streaming Java API skeleton");

Utilizzate il file pom.xml

Il file pom.xml definisce tutte le dipendenze richieste dall'applicazione e configura il plugin Maven Shade per creare il fat-jar che contiene tutte le dipendenze richieste da Flink.

  • providedAlcune dipendenze hanno un ambito. Queste dipendenze sono disponibili automaticamente quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink. Sono necessarie per compilare l'applicazione o per eseguirla localmente nel tuo. IDE Per ulteriori informazioni, consulta Esegui l'applicazione localmente. Assicurati di utilizzare la stessa versione di Flink del runtime che utilizzerai in Amazon Managed Service for Apache Flink.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • È necessario aggiungere ulteriori dipendenze Apache Flink al pom con l'ambito predefinito, come il connettore Kinesis utilizzato da questa applicazione. Per ulteriori informazioni, consulta Usa i connettori Apache Flink. È inoltre possibile aggiungere eventuali dipendenze Java aggiuntive richieste dall'applicazione.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • Il plugin Maven Java Compiler assicura che il codice sia compilato con Java 11, la JDK versione attualmente supportata da Apache Flink.

  • Il plugin Maven Shade impacchetta il fat-jar, escludendo alcune librerie fornite dal runtime. Inoltre specifica due trasformatori: e. ServicesResourceTransformer ManifestResourceTransformer Quest'ultimo configura la classe contenente il main metodo per avviare l'applicazione. Se rinomini la classe principale, non dimenticare di aggiornare questo trasformatore.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Scrivi record di esempio nel flusso di input

In questa sezione, invierai record di esempio allo stream per l'elaborazione dell'applicazione. Sono disponibili due opzioni per generare dati di esempio, utilizzando uno script Python o il Kinesis Data Generator.

Genera dati di esempio usando uno script Python

Puoi usare uno script Python per inviare record di esempio allo stream.

Nota

Per eseguire questo script Python, devi usare Python 3.x e avere installata la libreria for AWS SDKPython (Boto).

Per iniziare a inviare i dati di test al flusso di input Kinesis:

  1. Scarica lo script stock.py Python del generatore di dati dal repository Data generator GitHub .

  2. Esegui lo script stock.py:

    $ python stock.py

Mantieni lo script in esecuzione mentre completi il resto del tutorial. Ora puoi eseguire la tua applicazione Apache Flink.

Generazione di dati di esempio utilizzando Kinesis Data Generator

In alternativa all'utilizzo dello script Python, puoi usare Kinesis Data Generator, disponibile anche in una versione ospitata, per inviare dati di esempio casuali allo stream. Kinesis Data Generator viene eseguito nel browser e non è necessario installare nulla sul computer.

Per configurare ed eseguire Kinesis Data Generator:

  1. Segui le istruzioni nella documentazione di Kinesis Data Generator per configurare l'accesso allo strumento. Eseguirai un AWS CloudFormation modello che imposta un utente e una password.

  2. Accedi a Kinesis Data Generator tramite il file URL generato dal CloudFormation modello. Puoi trovarlo URL nella scheda Output dopo aver completato il CloudFormation modello.

  3. Configura il generatore di dati:

    • Regione: Seleziona la regione che stai utilizzando per questo tutorial: us-east-1

    • Stream/flusso di consegna: seleziona il flusso di input che l'applicazione utilizzerà: ExampleInputStream

    • Record al secondo: 100

    • Modello di registrazione: copia e incolla il seguente modello:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Prova il modello: scegli Modello di test e verifica che il record generato sia simile al seguente:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Avvia il generatore di dati: scegli Seleziona Invia dati.

Kinesis Data Generator sta ora inviando dati a. ExampleInputStream

Esegui l'applicazione localmente

Puoi eseguire ed eseguire il debug della tua applicazione Flink localmente nel tuo. IDE

Nota

Prima di continuare, verificate che i flussi di input e output siano disponibili. Per informazioni, consulta Crea due flussi di dati Amazon Kinesis. Inoltre, verifica di disporre dell'autorizzazione per leggere e scrivere da entrambi gli stream. Per informazioni, consulta Autentica la tua sessione AWS.

La configurazione dell'ambiente di sviluppo locale richiede Java 11JDK, Apache Maven e e IDE per lo sviluppo di Java. Verifica di soddisfare i prerequisiti richiesti. Per informazioni, consulta Soddisfa i prerequisiti per completare gli esercizi.

Importa il progetto Java nel tuo IDE

Per iniziare a lavorare sull'applicazione in usoIDE, è necessario importarla come progetto Java.

Il repository che avete clonato contiene diversi esempi. Ogni esempio è un progetto separato. Per questo tutorial, importa il contenuto della ./java/GettingStarted sottodirectory nella tuaIDE.

Inserisci il codice come progetto Java esistente usando Maven.

Nota

Il processo esatto per importare un nuovo progetto Java varia a seconda del tipo di progetto IDE utilizzato.

Controllate la configurazione locale dell'applicazione

Quando viene eseguita localmente, l'applicazione utilizza la configurazione contenuta nel application_properties.json file nella cartella delle risorse del progetto sotto./src/main/resources. È possibile modificare questo file per utilizzare diversi nomi o regioni di stream Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Configura la configurazione di IDE esecuzione

Puoi eseguire ed eseguire il debug dell'applicazione Flink IDE direttamente dal tuo eseguendo la classe principalecom.amazonaws.services.msf.BasicStreamingJob, come faresti con qualsiasi applicazione Java. Prima di eseguire l'applicazione, è necessario configurare la configurazione Esegui. La configurazione dipende dalla IDE versione in uso. Ad esempio, vedete le configurazioni di esecuzione/debug nella documentazione di IntelliJ. IDEA In particolare, è necessario configurare quanto segue:

  1. Aggiungi le provided dipendenze al classpath. Ciò è necessario per assicurarsi che le dipendenze con provided scope vengano passate all'applicazione durante l'esecuzione locale. Senza questa configurazione, l'applicazione visualizza immediatamente un class not found errore.

  2. Passa AWS le credenziali per accedere agli stream Kinesis all'applicazione. Il modo più veloce è usare AWS Toolkit for IntelliJ IDEA. Utilizzando questo IDE plugin nella configurazione Run, è possibile selezionare un profilo specifico AWS . AWS l'autenticazione avviene utilizzando questo profilo. Non è necessario passare direttamente AWS le credenziali.

  3. Verificare che l'applicazione venga IDE eseguita utilizzando JDK11.

Esegui l'applicazione nel tuo IDE

Dopo aver impostato la configurazione Run perBasicStreamingJob, potete eseguirla o eseguire il debug come una normale applicazione Java.

Nota

Non è possibile eseguire il fat-jar generato da Maven direttamente dalla riga di java -jar ... comando. Questo jar non contiene le dipendenze principali di Flink necessarie per eseguire l'applicazione in modo autonomo.

Quando l'applicazione viene avviata correttamente, registra alcune informazioni sul minicluster autonomo e sull'inizializzazione dei connettori. Seguono una serie di WARN registri INFO e alcuni che Flink normalmente emette all'avvio dell'applicazione.

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

Una volta completata l'inizializzazione, l'applicazione non emette ulteriori voci di registro. Durante il flusso di dati, non viene emesso alcun registro.

Per verificare se l'applicazione sta elaborando correttamente i dati, puoi controllare i flussi Kinesis di input e output, come descritto nella sezione seguente.

Nota

Il comportamento normale di un'applicazione Flink è quello di non emettere registri relativi al flusso di dati. L'emissione di registri su ogni record può essere utile per il debug, ma può comportare un notevole sovraccarico durante l'esecuzione in produzione.

Osserva i dati di input e output nei flussi Kinesis

Puoi osservare i record inviati al flusso di input da (Python di esempio che genera) o Kinesis Data Generator (link) utilizzando Data Viewer nella console Amazon Kinesis.

Per osservare i record
  1. Apri la console Kinesis in /kinesis. https://console.aws.amazon.com

  2. Verifica che la regione sia la stessa in cui stai eseguendo questo tutorial, che per impostazione predefinita è us-east-1 US East (Virginia settentrionale). Cambia la regione se non corrisponde.

  3. Scegli Data Streams.

  4. Seleziona lo stream che desideri osservare, oppure ExampleInputStream ExampleOutputStream.

  5. Scegli la scheda Visualizzatore dati.

  6. Scegli uno Shard, mantieni Ultimo come posizione iniziale, quindi scegli Ottieni record. Potresti visualizzare l'errore «Nessun record trovato per questa richiesta». In tal caso, scegli Riprova a recuperare i record. Vengono visualizzati i record più recenti pubblicati nello stream.

  7. Scegliete il valore nella colonna Dati per esaminare il contenuto del record nel JSON formato.

Arresta l'esecuzione locale dell'applicazione

Arresta l'applicazione in esecuzione suIDE. IDEDi solito fornisce un'opzione di «stop». La posizione e il metodo esatti dipendono da quello che IDE stai utilizzando.

Compila e impacchetta il codice dell'applicazione

In questa sezione, si utilizza Apache Maven per compilare il codice Java e impacchettarlo in un file. JAR Puoi compilare e impacchettare il tuo codice usando lo strumento da riga di comando Maven o il tuo. IDE

Per compilare e impacchettare usando la riga di comando Maven:

Passa alla directory contenente il GettingStarted progetto Java ed esegui il seguente comando:

$ mvn package

Per compilare e impacchettare usando il tuoIDE:

Esegui mvn package dalla tua integrazione con IDE Maven.

In entrambi i casi, viene creato il seguente JAR file:. target/amazon-msf-java-stream-app-1.0.jar

Nota

L'esecuzione di un «progetto di compilazione» dal tuo IDE potrebbe non creare il JAR file.

Carica il JAR file di codice dell'applicazione

In questa sezione, carichi il JAR file creato nella sezione precedente nel bucket Amazon Simple Storage Service (Amazon S3) creato all'inizio di questo tutorial. Se non hai completato questo passaggio, consulta (link).

Per caricare il JAR file di codice dell'applicazione
  1. Apri la console Amazon S3 all'indirizzo. https://console.aws.amazon.com/s3/

  2. Scegli il bucket che hai creato in precedenza per il codice dell'applicazione.

  3. Scegli Carica.

  4. Scegliere Add files (Aggiungi file).

  5. Vai al JAR file generato nel passaggio precedente:target/amazon-msf-java-stream-app-1.0.jar.

  6. Scegli Carica senza modificare altre impostazioni.

avvertimento

Assicurati di selezionare il JAR file corretto in<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar.

La target directory contiene anche altri JAR file che non è necessario caricare.

Crea e configura l'applicazione Managed Service for Apache Flink

È possibile creare ed eseguire un'applicazione del servizio gestito per Apache Flink utilizzando la console o la AWS CLI. Per questo tutorial, utilizzerai la console.

Nota

Quando crei l'applicazione utilizzando la console, le tue risorse AWS Identity and Access Management (IAM) e Amazon CloudWatch Logs vengono create automaticamente. Quando crei l'applicazione utilizzando AWS CLI, crei queste risorse separatamente.

Creazione dell'applicazione

Per creare l'applicazione
  1. Apri la console Managed Service for Apache Flink all'indirizzo /flink https://console.aws.amazon.com

  2. Verifica che sia selezionata la regione corretta: us-east-1 Stati Uniti orientali (Virginia settentrionale)

  3. Apri il menu a destra e scegli Applicazioni Apache Flink, quindi Crea applicazione di streaming. In alternativa, scegli Crea applicazione di streaming nel contenitore Get started della pagina iniziale.

  4. Nella pagina Crea applicazione di streaming:

    • Scegliete un metodo per configurare l'applicazione di elaborazione dello stream: scegliete Crea da zero.

    • Configurazione Apache Flink, versione Application Flink: scegli Apache Flink 1.19.

  5. Configura la tua applicazione

    • Nome dell'applicazione: invioMyApplication.

    • Descrizione: immettereMy java test app.

    • Accesso alle risorse dell'applicazione: scegli Crea/aggiorna IAM il ruolo kinesis-analytics-MyApplication-us-east-1 con le politiche richieste.

  6. Configura il tuo modello per le impostazioni dell'applicazione

    • Modelli: scegli Sviluppo.

  7. Scegli Crea applicazione di streaming nella parte inferiore della pagina.

Nota

Quando crei un servizio gestito per l'applicazione Apache Flink utilizzando la console, hai la possibilità di creare un IAM ruolo e una policy per la tua applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste IAM risorse vengono denominate utilizzando il nome e la regione dell'applicazione come segue:

  • Policy: kinesis-analytics-service-MyApplication-us-east-1

  • Ruolo: kinesisanalytics-MyApplication-us-east-1

Amazon Managed Service for Apache Flink era precedentemente noto come Kinesis Data Analytics. Il nome delle risorse che vengono create automaticamente ha il prefisso per garantire la compatibilità con le versioni precedenti. kinesis-analytics-

Modifica la politica IAM

Modifica la IAM policy per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.

Per modificare la politica
  1. Apri la IAM console all'indirizzo https://console.aws.amazon.com/iam/.

  2. Seleziona Policy. Scegli la policy kinesis-analytics-service-MyApplication-us-east-1 creata dalla console nella sezione precedente.

  3. Scegli Modifica, quindi scegli la JSONscheda.

  4. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (012345678901) con l'ID del tuo account.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Scegli Avanti nella parte inferiore della pagina, quindi scegli Salva modifiche.

Configura l'applicazione

Modifica la configurazione dell'applicazione per impostare l'elemento del codice dell'applicazione.

Per modificare la configurazione
  1. Nella MyApplicationpagina, scegli Configura.

  2. Nella sezione Posizione del codice dell'applicazione:

    • Per il bucket Amazon S3, seleziona il bucket creato in precedenza per il codice dell'applicazione. Scegli Sfoglia e seleziona il bucket corretto, quindi seleziona Scegli. Non fare clic sul nome del bucket.

    • Per Percorso dell'oggetto Amazon S3, inserisci amazon-msf-java-stream-app-1.0.jar

  3. Per le autorizzazioni di accesso, scegli Crea/aggiorna il IAM ruolo kinesis-analytics-MyApplication-us-east-1 con le politiche richieste.

  4. Nella sezione Proprietà di runtime, aggiungi le seguenti proprietà.

  5. Scegliete Aggiungi nuovo elemento e aggiungete ciascuno dei seguenti parametri:

    ID gruppo Chiave Valore
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. Non modificate nessuna delle altre sezioni.

  7. Scegli Save changes (Salva modifiche).

Nota

Quando scegli di abilitare la CloudWatch registrazione di Amazon, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:

  • Gruppo di log: /aws/kinesis-analytics/MyApplication

  • Flusso di log: kinesis-analytics-log-stream

Esecuzione dell'applicazione.

L'applicazione è ora configurata e pronta per l'esecuzione.

Per eseguire l'applicazione
  1. Sulla console per Amazon Managed Service for Apache Flink, scegli La mia applicazione e scegli Esegui.

  2. Nella pagina successiva, nella pagina di configurazione del ripristino dell'applicazione, scegli Esegui con l'ultima istantanea, quindi scegli Esegui.

    Lo stato nell'applicazione descrive in dettaglio le transizioni da Ready Starting e poi a Running quando l'applicazione è stata avviata.

Quando l'applicazione è nello Running stato, ora puoi aprire la dashboard di Flink.

Per aprire il pannello di controllo
  1. Scegli Apri la dashboard di Apache Flink. La dashboard si apre in una nuova pagina.

  2. Nell'elenco dei lavori in esecuzione, scegli il singolo lavoro che puoi vedere.

    Nota

    Se hai impostato le proprietà di Runtime o hai modificato le IAM politiche in modo errato, lo stato dell'applicazione potrebbe cambiareRunning, ma la dashboard di Flink mostra che il lavoro viene riavviato continuamente. Si tratta di uno scenario di errore comune se l'applicazione non è configurata correttamente o non dispone delle autorizzazioni per accedere alle risorse esterne.

    Quando ciò accade, controlla la scheda Eccezioni nella dashboard di Flink per vedere la causa del problema.

Osserva le metriche dell'applicazione in esecuzione

Nella MyApplicationpagina, nella sezione Amazon CloudWatch metrics, puoi vedere alcune delle metriche fondamentali dell'applicazione in esecuzione.

Per visualizzare le metriche
  1. Accanto al pulsante Aggiorna, seleziona 10 secondi dall'elenco a discesa.

  2. Quando l'applicazione è in esecuzione ed è integra, puoi vedere la metrica di uptime aumentare continuamente.

  3. La metrica fullrestarts deve essere zero. Se è in aumento, la configurazione potrebbe presentare dei problemi. Per esaminare il problema, consulta la scheda Eccezioni nella dashboard di Flink.

  4. La metrica del numero di checkpoint non riusciti deve essere pari a zero in un'applicazione integra.

    Nota

    Questa dashboard mostra un set fisso di metriche con una granularità di 5 minuti. Puoi creare una dashboard applicativa personalizzata con qualsiasi metrica nella dashboard. CloudWatch

Osserva i dati di output nei flussi Kinesis

Assicurati di continuare a pubblicare i dati sull'input, usando lo script Python o il Kinesis Data Generator.

È ora possibile osservare l'output dell'applicazione in esecuzione su Managed Service for Apache Flink utilizzando il Data Viewer in https://console.aws.amazon.com/kinesis/, analogamente a quanto già fatto in precedenza.

Per visualizzare l'output
  1. Apri la console Kinesis in /kinesis. https://console.aws.amazon.com

  2. Verifica che la regione sia la stessa che stai usando per eseguire questo tutorial. Per impostazione predefinita, è US-East-1US East (Virginia settentrionale). Se necessario, modificare la regione.

  3. Scegli Data Streams.

  4. Seleziona lo stream che desideri osservare. Ai fini di questo tutorial, utilizza ExampleOutputStream.

  5. Scegli la scheda Data viewer.

  6. Seleziona uno Shard, mantieni Ultimo come posizione iniziale, quindi scegli Ottieni record. Potresti visualizzare l'errore «nessun record trovato per questa richiesta». In tal caso, scegli Riprova a recuperare i record. Vengono visualizzati i record più recenti pubblicati nello stream.

  7. Seleziona il valore nella colonna Dati per esaminare il contenuto del record nel JSON formato.

Arresta l'applicazione

Per interrompere l'applicazione, vai alla pagina della console dell'applicazione Managed Service for Apache Flink denominata. MyApplication

Per interrompere l'applicazione
  1. Dall'elenco a discesa Azione, scegli Stop.

  2. Lo stato nell'applicazione descrive in dettaglio le transizioni da Running e quindi a Ready quando l'applicazione viene completamente interrotta. Stopping

    Nota

    Non dimenticare di interrompere anche l'invio di dati al flusso di input dallo script Python o dal Kinesis Data Generator.

Approfondimenti

Pulisci le risorse AWS