Guida introduttiva (Scala) - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Guida introduttiva (Scala)

Nota

A partire dalla versione 1.15, Flink è gratuito per Scala. Le applicazioni possono ora utilizzare Java API da qualsiasi versione di Scala. Flink utilizza ancora Scala internamente in alcuni componenti chiave, ma non espone Scala nel classloader del codice utente. Per questo motivo, devi aggiungere le dipendenze di Scala nei tuoi -archivi. JAR

Per ulteriori informazioni sulle modifiche a Scala in Flink 1.15, consulta Scala non più disponibile nella versione 1.15.

In questo esercizio, creerai un'applicazione Managed Service for Apache Flink per Scala con un flusso Kinesis come sorgente e sink.

Crea risorse dipendenti

Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, è necessario creare le seguenti risorse dipendenti:

  • Due flussi Kinesis per l'input e l'output.

  • Un bucket Amazon S3 per archiviare il codice dell'applicazione (ka-app-code-<username>)

Puoi creare i flussi Kinesis e un bucket S3 utilizzando la console. Per istruzioni sulla creazione di queste risorse, consulta i seguenti argomenti:

  • Creazione e aggiornamento dei flussi di dati nella Guida per gli sviluppatori del flusso di dati Amazon Kinesis. Assegna un nome ai flussi di dati ExampleInputStream e ExampleOutputStream.

    Per creare i flussi di dati (AWS CLI)

    • Per creare il primo stream (ExampleInputStream), usa il seguente comando Amazon Kinesis AWS CLI create-stream.

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • Per creare il secondo flusso utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome del flusso in ExampleOutputStream.

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • Consulta Come si crea un bucket S3? nella Guida per l'utente di Amazon Simple Storage Service. Assegna al bucket Amazon S3 un nome univoco globale aggiungendo il tuo nome di accesso, ad esempio ka-app-code-<username>.

Altre risorse

Quando crei la tua applicazione, Managed Service for Apache Flink crea le seguenti CloudWatch risorse Amazon se non esistono già:

  • Un gruppo di log denominato /AWS/KinesisAnalytics-java/MyApplication

  • Un flusso di log denominato kinesis-analytics-log-stream

Scrivi record di esempio nel flusso di input

In questa sezione, viene utilizzato uno script Python per scrivere record di esempio nel flusso per l'applicazione da elaborare.

Nota

Questa sezione richiede AWS SDK for Python (Boto).

Nota

Lo script Python in questa sezione utilizza la AWS CLI. È necessario configurare AWS CLI per utilizzare le credenziali dell'account e la regione predefinita. Per configurare la tua AWS CLI, inserisci quanto segue:

aws configure
  1. Crea un file denominato stock.py con i seguenti contenuti:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Esegui lo script stock.py:

    $ python stock.py

    Mantieni lo script in esecuzione mentre completi il resto del tutorial.

Scarica ed esamina il codice dell'applicazione

Il codice dell'applicazione Python per questo esempio è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito.

  1. Installa il client Git se non lo hai già fatto. Per ulteriori informazioni, consulta Installazione di Git.

  2. Clona il repository remoto con il comando seguente:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Passa alla directory amazon-kinesis-data-analytics-java-examples/scala/GettingStarted.

Tieni presente quanto segue riguardo al codice dell'applicazione:

  • Un file build.sbt contiene le informazioni sulla configurazione e le dipendenze dell'applicazione, incluse le librerie del servizio gestito per Apache Flink.

  • Il file BasicStreamingJob.scala contiene il metodo principale che definisce la funzionalità dell'applicazione.

  • L'applicazione utilizza un'origine Kinesis per leggere dal flusso di origine. Il seguente snippet crea l'origine Kinesis:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    L'applicazione utilizza anche un sink Kinesis per scrivere nel flusso dei risultati. Il seguente snippet crea il sink Kinesis:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • L'applicazione crea connettori source e sink per accedere a risorse esterne utilizzando un StreamExecutionEnvironment oggetto.

  • L'applicazione crea connettori di origine e sink utilizzando proprietà dinamiche. Questi metodi leggono le proprietà dell'applicazione di runtime per configurare il connettori. Per ulteriori informazioni sulle proprietà di runtime, consulta Proprietà di runtime.

Per creare e compilare il codice dell'applicazione

In questa sezione, viene compilato e caricato il codice dell'applicazione nel bucket Amazon S3 creato nella sezione Crea risorse dipendenti.

Compilazione del codice dell'applicazione

In questa sezione, si utilizza lo strumento di SBTcompilazione per creare il codice Scala per l'applicazione. Per l'installazioneSBT, vedi Installare sbt con cs setup. È inoltre necessario installare il Java Development Kit (JDK). Consulta Prerequisiti per il completamento degli esercizi.

  1. Per utilizzare il codice dell'applicazione, lo si compila e lo si impacchetta in un JAR file. Puoi compilare e impacchettare il tuo codice con: SBT

    sbt assembly
  2. Se l'applicazione viene compilata correttamente, viene creato il seguente file:

    target/scala-3.2.0/getting-started-scala-1.0.jar
Caricamento del codice Scala di streaming di Apache Flink

In questa sezione, viene creato un bucket Amazon S3 e caricato il codice dell'applicazione.

  1. Apri la console Amazon S3 all'indirizzo. https://console.aws.amazon.com/s3/

  2. Seleziona Crea bucket.

  3. Immetti ka-app-code-<username> nel campo Nome bucket. Aggiungi un suffisso al nome del bucket, ad esempio il nome utente, per renderlo globalmente univoco. Scegli Next (Successivo).

  4. Nella fase Configura opzioni, non modificare le impostazioni e scegli Successivo.

  5. Nella fase Imposta autorizzazioni, non modificare le impostazioni e scegli Successivo.

  6. Seleziona Crea bucket.

  7. Scegli il bucket ka-app-code-<username>, quindi scegli Carica.

  8. Nella fase Seleziona file, scegli Aggiungi file. Individua il file getting-started-scala-1.0.jar creato nella fase precedente.

  9. Non è necessario modificare alcuna delle impostazioni dell'oggetto, quindi scegli Carica.

Il codice dell'applicazione è ora archiviato in un bucket Amazon S3 accessibile dall'applicazione.