

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Usa Apache Beam con Managed Service per le applicazioni Apache Flink
<a name="how-creating-apps-beam"></a>

**Nota**  
**Non esiste Apache Flink Runner compatibile per Flink 1.20. Per ulteriori informazioni, consulta la [compatibilità delle versioni di Flink](https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility) nella documentazione di Apache Beam.** >

È possibile utilizzare il framework [Apache Beam](https://beam.apache.org/) con l'applicazione del servizio gestito per Apache Flink per elaborare i dati di streaming. Le applicazioni del servizio gestito per Apache Flink che utilizzano Apache Beam utilizzano il [runner Apache Flink](https://beam.apache.org/documentation/runners/flink/) per eseguire le pipeline Beam.

Per un tutorial sull'utilizzo di Apache Beam in un'applicazione del servizio gestito per Apache Flink, consulta [Usa CloudFormationCreazione di un'applicazione utilizzando Apache Beam](examples-beam.md).

**Topics**
+ [Limitazioni di Apache Flink runner con Managed Service for Apache Flink](#how-creating-apps-beam-using)
+ [Funzionalità di Apache Beam con Managed Service per Apache Flink](#how-creating-apps-beam-capabilities)
+ [Crea un'applicazione utilizzando Apache Beam](examples-beam.md)

## Limitazioni di Apache Flink runner con Managed Service for Apache Flink
<a name="how-creating-apps-beam-using"></a>

Tieni presente quanto segue sull'utilizzo del runner Apache Flink con il servizio gestito per Apache Flink:
+ I parametri di Apache Beam non sono visualizzabili nella console del servizio gestito per Apache Flink.
+ **Apache Beam è supportato solo nelle applicazioni del servizio gestito per Apache Flink che utilizzano Apache Flink versione 1.8 e successive. Apache Beam non è supportato nelle applicazioni del servizio gestito per Apache Flink che utilizzano Apache Flink versione 1.6.**

## Funzionalità di Apache Beam con Managed Service per Apache Flink
<a name="how-creating-apps-beam-capabilities"></a>

Il servizio gestito per Apache Flink supporta le stesse funzionalità di Apache Beam supportate dal runner Apache Flink. Per informazioni sulle funzionalità supportate dal runner Apache Flink, consulta la [Matrice di compatibilità Beam](https://beam.apache.org/documentation/runners/capability-matrix/). 

È consigliabile testare l'applicazione Apache Flink nel servizio gestito per Apache Flink per verificare che tutte le funzionalità di cui l'applicazione ha bisogno siano supportate.

# Crea un'applicazione utilizzando Apache Beam
<a name="examples-beam"></a>

In questo esercizio, viene creata un'applicazione del servizio gestito per Apache Flink che trasforma i dati utilizzando [Apache Beam](https://beam.apache.org/). Apache Beam è un modello di programmazione per l'elaborazione di dati di streaming. Per informazioni sull'utilizzo di Apache Beam con il servizio gestito per Apache Flink, consulta [Usa Apache Beam con Managed Service per le applicazioni Apache Flink](how-creating-apps-beam.md).

**Nota**  
Per impostare i prerequisiti richiesti per questo esercizio, completa innanzitutto l'esercizio [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md).

**Topics**
+ [Crea risorse dipendenti](#examples-beam-resources)
+ [Scrivi record di esempio nel flusso di input](#examples-beam-write)
+ [Scarica ed esamina il codice dell'applicazione](#examples-beam-download)
+ [Compila il codice dell'applicazione](#examples-beam-compile)
+ [Carica il codice Java di streaming Apache Flink](#examples-beam-upload)
+ [Crea ed esegui l'applicazione Managed Service for Apache Flink](#examples-beam-create-run)
+ [Pulisci le risorse AWS](#examples-beam-cleanup)
+ [Fasi successive](#examples-beam-nextsteps)

## Crea risorse dipendenti
<a name="examples-beam-resources"></a>

Prima di creare un'applicazione del servizio gestito per Apache Flink per questo esercizio, è necessario creare le seguenti risorse dipendenti: 
+ Due flussi di dati Kinesis (`ExampleInputStream` e `ExampleOutputStream`)
+ Un bucket Amazon S3 per archiviare il codice dell'applicazione (`ka-app-code-<username>`) 

Puoi creare i flussi Kinesis e un bucket S3 utilizzando la console. Per istruzioni sulla creazione di queste risorse, consulta i seguenti argomenti:
+ [Creazione e aggiornamento dei flussi di dati](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) nella *Guida per gli sviluppatori del flusso di dati Amazon Kinesis*. Assegna un nome ai flussi di dati **ExampleInputStream** e **ExampleOutputStream**.
+ [Come si crea un bucket S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) nella *Guida per l'utente di Amazon Simple Storage Service*. Assegna al bucket Amazon S3 un nome globalmente univoco aggiungendo il tuo nome di accesso, ad esempio **ka-app-code-*<username>***.

## Scrivi record di esempio nel flusso di input
<a name="examples-beam-write"></a>

In questa sezione, viene utilizzato uno script Python per scrivere stringhe casuali nel flusso per l'applicazione da elaborare.

**Nota**  
Questa sezione richiede [AWS SDK per Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Crea un file denominato `ping.py` con i seguenti contenuti:

   ```
   import json
   import boto3
   import random
   
   kinesis = boto3.client('kinesis')
   
   while True:
           data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat'])
           print(data)
           kinesis.put_record(
                   StreamName="ExampleInputStream",
                   Data=data,
                   PartitionKey="partitionkey")
   ```

1. Esegui lo script `ping.py`: 

   ```
   $ python ping.py
   ```

   Mantieni lo script in esecuzione mentre completi il resto del tutorial.

## Scarica ed esamina il codice dell'applicazione
<a name="examples-beam-download"></a>

Il codice dell'applicazione Java per questo esempio è disponibile da GitHub. Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito.

1. Installa il client Git se non lo hai già fatto. Per ulteriori informazioni, consulta [Installazione di Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). 

1. Clona il repository remoto con il comando seguente:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. Passa alla directory `amazon-kinesis-data-analytics-java-examples/Beam`.

Il codice dell'applicazione si trova nei file `BasicBeamStreamingJob.java`. Tieni presente quanto segue riguardo al codice dell'applicazione:
+ L'applicazione utilizza Apache Beam [ParDo](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html)per elaborare i record in entrata richiamando una funzione di trasformazione personalizzata chiamata. `PingPongFn`

  Il codice per richiamare la funzione `PingPongFn` è il seguente:

  ```
  .apply("Pong transform",
      ParDo.of(new PingPongFn())
  ```
+ Le applicazioni del servizio gestito per Apache Flink che utilizzano Apache Beam richiedono i seguenti componenti. Se questi componenti e versioni non vengono inclusi in `pom.xml`, l'applicazione carica le versioni errate dalle dipendenze dell'ambiente e, poiché le versioni non corrispondono, si blocca in fase di runtime.

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```
+ La funzione di trasformazione `PingPongFn` passa i dati di input nel flusso di output, a meno che i dati di input non siano **ping**, nel qual caso nel flusso di output viene emessa la stringa **pong\$1n**. 

  Il codice della funzione di trasformazione è il seguente:

  ```
      private static class PingPongFn extends DoFn<KinesisRecord, byte[]> {
      private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class);
      
      @ProcessElement
      public void processElement(ProcessContext c) {
          String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
          if (content.trim().equalsIgnoreCase("ping")) {
              LOG.info("Ponged!");
              c.output("pong\n".getBytes(StandardCharsets.UTF_8));
          } else {
              LOG.info("No action for: " + content);
              c.output(c.element().getDataAsBytes());
          }
      }
  }
  ```

## Compila il codice dell'applicazione
<a name="examples-beam-compile"></a>

Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

1. Installa Java e Maven se non lo hai già fatto. Per ulteriori informazioni, consulta [Completa i prerequisiti richiesti](getting-started.md#setting-up-prerequisites) nel tutorial [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md).

1. Compila l'applicazione con il seguente comando: 

   ```
   mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
   ```
**Nota**  
Il codice di origine fornito si basa sulle librerie di Java 11. 

La compilazione dell'applicazione crea il file JAR dell'applicazione (`target/basic-beam-app-1.0.jar`).

## Carica il codice Java di streaming Apache Flink
<a name="examples-beam-upload"></a>

In questa sezione, il codice dell'applicazione viene caricato nel bucket Amazon S3 creato nella sezione [Crea risorse dipendenti](#examples-beam-resources).

1. **Nella console Amazon S3, scegli il *<username>* bucket **ka-app-code-** e scegli Carica.**

1. Nella fase **Seleziona file**, scegli **Aggiungi file**. Individua il file `basic-beam-app-1.0.jar` creato nella fase precedente. 

1. Non è necessario modificare alcuna delle impostazioni dell'oggetto, quindi scegli **Carica**.

Il codice dell'applicazione è ora archiviato in un bucket Amazon S3 accessibile dall'applicazione.

## Crea ed esegui l'applicazione Managed Service for Apache Flink
<a name="examples-beam-create-run"></a>

Segui questi passaggi per creare, configurare, aggiornare ed eseguire l'applicazione utilizzando la console.

### Creazione dell'applicazione
<a name="examples-beam-create"></a>

1. Accedi a e apri Console di gestione AWS la console Amazon MSF all'indirizzo https://console.aws.amazon.com /flink.

1. Nella dashboard del servizio gestito per Apache Flink, scegli **Crea un'applicazione di analisi**.

1. Nella pagina **Servizio gestito per Apache Flink: crea applicazione**, fornisci i dettagli dell'applicazione nel modo seguente:
   + Per **Nome applicazione**, inserisci **MyApplication**.
   + Per **Runtime**, scegli **Apache Flink**.
**Nota**  
Apache Beam non è attualmente compatibile con Apache Flink versione 1.19 o successiva.
   + Seleziona **Apache Flink** versione 1.15 dal menu a discesa della versione.

1. Per **Autorizzazioni di accesso**, scegli **Crea/aggiorna `kinesis-analytics-MyApplication-us-west-2`** per il ruolo IAM.

1. Scegli **Crea applicazione**.

**Nota**  
Quando crei un'applicazione del servizio gestito per Apache Flink tramite la console, hai la possibilità di avere un ruolo e una policy IAM creati per l'applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste risorse IAM sono denominate utilizzando il nome dell'applicazione e la Regione come segue:  
Policy: `kinesis-analytics-service-MyApplication-us-west-2`
Ruolo: `kinesis-analytics-MyApplication-us-west-2`

### Modifica la policy IAM
<a name="get-started-exercise-7-console-iam"></a>

Modifica la policy IAM per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Seleziona **Policy**. Scegli la policy **`kinesis-analytics-service-MyApplication-us-west-2`** creata dalla console nella sezione precedente. 

1. Nella pagina **Riepilogo**, scegli **Modifica policy**. Scegli la scheda **JSON**.

1. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (*012345678901*) con l'ID del tuo account.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "logs:DescribeLogGroups",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*",
                   "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar"
               ]
           },
           {
               "Sid": "DescribeLogStreams",
               "Effect": "Allow",
               "Action": "logs:DescribeLogStreams",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
           },
           {
               "Sid": "PutLogEvents",
               "Effect": "Allow",
               "Action": "logs:PutLogEvents",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

### Configura l'applicazione
<a name="examples-beam-configure"></a>

1. Nella **MyApplication**pagina, scegli **Configura**.

1. Nella pagina **Configura applicazione**, fornisci la **Posizione del codice**:
   + Per **Bucket Amazon S3**, inserisci **ka-app-code-*<username>***.
   + Per **Percorso dell'oggetto Amazon S3**, inserisci **basic-beam-app-1.0.jar**

1. In **Accedi alle risorse dell'applicazione**, per **Autorizzazioni di accesso**, scegli **Crea/aggiorna `kinesis-analytics-MyApplication-us-west-2` per il ruolo IAM**.

1. Inserisci i seguenti dati:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/examples-beam.html)

1. In **Monitoraggio**, accertati che il **Monitoraggio del livello dei parametri** sia impostato su **Applicazione**.

1. Per la **CloudWatch registrazione**, seleziona la casella di controllo **Abilita**.

1. Scegliere **Aggiorna**.

**Nota**  
Quando scegli di abilitare la CloudWatch registrazione, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:   
Gruppo di log: `/aws/kinesis-analytics/MyApplication`
Flusso di log: `kinesis-analytics-log-stream`
Questo flusso di log viene utilizzato per monitorare l'applicazione. Non si tratta dello stesso flusso di log utilizzato dall'applicazione per inviare i risultati.

### Esecuzione dell'applicazione.
<a name="examples-beam-run"></a>

Il grafico del processo Flink può essere visualizzato eseguendo l'applicazione, aprendo il pannello di controllo di Apache Flink e scegliendo il processo Flink desiderato.

Puoi controllare le metriche del servizio gestito per Apache Flink sulla CloudWatch console per verificare che l'applicazione funzioni. 

## Pulisci le risorse AWS
<a name="examples-beam-cleanup"></a>

Questa sezione include le procedure per ripulire AWS le risorse create nel tutorial di Tumbling Window.

**Topics**
+ [Eliminare l'applicazione Managed Service for Apache Flink](#examples-beam-cleanup-app)
+ [Eliminare i flussi di dati Kinesis](#examples-beam-cleanup-stream)
+ [Elimina l'oggetto e il bucket Amazon S3](#examples-beam-cleanup-s3)
+ [Elimina le tue risorse IAM](#examples-beam-cleanup-iam)
+ [CloudWatch Elimina le tue risorse](#examples-beam-cleanup-cw)

### Eliminare l'applicazione Managed Service for Apache Flink
<a name="examples-beam-cleanup-app"></a>

1. Accedi a e apri Console di gestione AWS la console Amazon MSF all'indirizzo https://console.aws.amazon.com /flink.

1. nel pannello Managed Service for Apache Flink, scegli. **MyApplication**

1. Nella pagina dell'applicazione, scegli **Elimina** e quindi conferma l'eliminazione.

### Eliminare i flussi di dati Kinesis
<a name="examples-beam-cleanup-stream"></a>

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Nel pannello Kinesis Data Streams, scegli. **ExampleInputStream**

1. Nella **ExampleInputStream**pagina, scegli **Elimina Kinesis Stream** e conferma l'eliminazione.

1. Nella pagina **Kinesis Streams**, scegli, scegli **Azioni **ExampleOutputStream****, scegli **Elimina**, quindi conferma l'eliminazione.

### Elimina l'oggetto e il bucket Amazon S3
<a name="examples-beam-cleanup-s3"></a>

1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Scegli il ***<username>*secchio ka-app-code -**.

1. Per confermare l'eliminazione, scegli **Elimina**, quindi inserisci il nome del bucket.

### Elimina le tue risorse IAM
<a name="examples-beam-cleanup-iam"></a>

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nella barra di navigazione, scegli **Policy**.

1. Nel controllo filtro, inserisci **kinesis**.

1. Scegli la politica **kinesis-analytics-service- MyApplication -us-west-2**.

1. Seleziona **Operazioni di policy** e quindi **Elimina**.

1. Nella barra di navigazione, scegli **Ruoli**.

1. Scegli il ruolo **kinesis-analytics- MyApplication** -us-west-2.

1. Quindi scegli **Elimina ruolo** e conferma l'eliminazione.

### CloudWatch Elimina le tue risorse
<a name="examples-beam-cleanup-cw"></a>

1. Apri la CloudWatch console all'indirizzo [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Nella barra di navigazione, scegli **Log**.

1. Scegli il gruppo**/aws/kinesis-analytics/MyApplication**log.

1. Quindi scegli **Elimina gruppo di log** e conferma l'eliminazione.

## Fasi successive
<a name="examples-beam-nextsteps"></a>

Ora che hai creato ed eseguito un'applicazione di base del servizio gestito per Apache Flink che trasforma i dati utilizzando Apache Beam, consulta la seguente applicazione come esempio di una soluzione del servizio gestito per Apache Flink più avanzata.
+ ** [ Workshop in streaming su Beam nel servizio gestito per Apache Flink](https://streaming-analytics.workshop.aws/beam-on-kda/)**: in questo workshop, viene esplorato un esempio completo che combina aspetti di batch e streaming in un'unica pipeline Apache Beam uniforme. 