

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Inizia a usare Amazon Managed Service per Apache Flink for Python
<a name="gs-python"></a>

Questa sezione presenta i concetti fondamentali di un servizio gestito per Apache Flink utilizzando Python e l'API Table. Descrive le opzioni disponibili per la creazione e il test delle applicazioni. Fornisce inoltre istruzioni per l'installazione degli strumenti necessari per completare i tutorial di questa guida e creare la tua prima applicazione. 

**Topics**
+ [

## Esamina i componenti di un'applicazione Managed Service for Apache Flink
](#gs-python-table-components)
+ [

## Soddisfa i prerequisiti
](#gs-python-prerequisites)
+ [

# Creare ed eseguire un servizio gestito per l'applicazione Apache Flink for Python
](gs-python-createapp.md)
+ [

# Pulisci le risorse AWS
](gs-python-cleanup.md)

## Esamina i componenti di un'applicazione Managed Service for Apache Flink
<a name="gs-python-table-components"></a>

**Nota**  
Amazon Managed Service per Apache Flink supporta tutti gli [Apache](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/overview/#flinks-apis) Flink. APIs A seconda dell'API scelta, la struttura dell'applicazione è leggermente diversa. Un approccio popolare nello sviluppo di un'applicazione Apache Flink in Python consiste nel definire il flusso dell'applicazione utilizzando SQL incorporato nel codice Python. Questo è l'approccio che seguiamo nel seguente tutorial Gettgin Started.

Per elaborare i dati, l'applicazione Managed Service for Apache Flink utilizza uno script Python per definire il flusso di dati che elabora l'input e produce l'output utilizzando il runtime Apache Flink. 

Una tipica applicazione Managed Service for Apache Flink ha i seguenti componenti:
+ **Proprietà di runtime:** è possibile utilizzare le *proprietà di runtime* per configurare l'applicazione senza ricompilare il codice dell'applicazione. 
+ **Fonti:** *l'applicazione utilizza dati da una o più fonti.* Una fonte utilizza un [connettore](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) per leggere i dati da un sistema esterno, ad esempio un flusso di dati Kinesis o un argomento di Amazon MSK. Puoi anche utilizzare connettori speciali per generare dati dall'interno dell'applicazione. Quando si utilizza SQL, l'applicazione definisce le fonti come *tabelle di origine*. 
+ **Trasformazioni:** l'applicazione elabora i dati utilizzando una o più *trasformazioni* in grado di filtrare, arricchire o aggregare i dati. Quando si utilizza SQL, l'applicazione definisce le trasformazioni come query SQL. 
+ **Sinks:** *l'applicazione invia dati a fonti esterne tramite sink.* Un sink utilizza un [connettore](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) per inviare dati a un sistema esterno come un flusso di dati Kinesis, un argomento Amazon MSK, un bucket Amazon S3 o un database relazionale. Puoi anche usare un connettore speciale per stampare l'output per scopi di sviluppo. Quando si utilizza SQL, l'applicazione definisce i sink come *tabelle sink* in cui inserire i risultati. Per ulteriori informazioni, consulta [Scrivi dati utilizzando i sinks in Managed Service for Apache Flink](how-sinks.md).

La tua applicazione Python potrebbe richiedere anche dipendenze esterne, come librerie Python aggiuntive o qualsiasi connettore Flink utilizzato dall'applicazione. Quando impacchettate l'applicazione, dovete includere tutte le dipendenze richieste dall'applicazione. Questo tutorial dimostra come includere le dipendenze dei connettori e come impacchettare l'applicazione per la distribuzione su Amazon Managed Service for Apache Flink.

## Soddisfa i prerequisiti
<a name="gs-python-prerequisites"></a>

Per completare questo tutorial, devi avere quanto segue:
+ **Python 3.11** [https://docs.conda.io/en/latest/](https://docs.conda.io/en/latest/)
+  [Client Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git): installa il client Git se non l'hai già fatto.
+ [Java Development Kit (JDK) versione 11](https://www.oracle.com/java/technologies/downloads/#java11): installa un Java JDK 11 e imposta la variabile di `JAVA_HOME` ambiente in modo che punti alla posizione di installazione. Se non disponi di un JDK 11, puoi utilizzare uno qualsiasi JDK standard di [Amazon Corretto](https://docs.aws.amazon.com/corretto)nostra scelta. 
  + Per verificare che il JDK sia installato correttamente, esegui il seguente comando. L'output sarà diverso se utilizzi un JDK diverso da Amazon Corretto 11. Assicurati che la versione sia 11.x.

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/): installa Apache Maven se non l'hai già fatto. [Per ulteriori informazioni, consulta Installazione di Apache Maven.](https://maven.apache.org/install.html)
  + Per testare l'installazione di Apache Maven, usa il seguente comando:

    ```
    $ mvn -version
    ```

**Nota**  
Sebbene l'applicazione sia scritta in Python, Apache Flink viene eseguito nella Java Virtual Machine (JVM). Distribuisce la maggior parte delle dipendenze, come il connettore Kinesis, come file JAR. [Per gestire queste dipendenze e impacchettare l'applicazione in un file ZIP, utilizzate Apache Maven.](https://maven.apache.org/) Questo tutorial spiega come farlo. 

**avvertimento**  
Ti consigliamo di usare Python 3.11 per lo sviluppo locale. Questa è la stessa versione di Python utilizzata da Amazon Managed Service per Apache Flink con il runtime Flink 1.19.   
L'installazione della libreria Python Flink 1.19 su Python 3.12 potrebbe non riuscire.  
Se hai un'altra versione di Python installata di default sul tuo computer, ti consigliamo di creare un ambiente autonomo come usare VirtualEnv Python 3.11.

**IDE per lo sviluppo locale**

Ti consigliamo di utilizzare un ambiente di sviluppo come [PyCharm](https://www.jetbrains.com/pycharm/)[Visual Studio Code](https://code.visualstudio.com/) per sviluppare e compilare l'applicazione.

Quindi, completa i primi due passaggi di[Inizia a usare Amazon Managed Service per Apache Flink (DataStream API)](getting-started.md):
+ [Configura un AWS account e crea un utente amministratore](setting-up.md)
+ [Configura il () AWS Command Line Interface AWS CLI](setup-awscli.md)

Per iniziare, consulta [Creazione di un'applicazione](gs-python-createapp.md).

# Creare ed eseguire un servizio gestito per l'applicazione Apache Flink for Python
<a name="gs-python-createapp"></a>

In questa sezione, crei un'applicazione Managed Service for Apache Flink per l'applicazione Python con un flusso Kinesis come sorgente e sink.

**Topics**
+ [

## Crea risorse dipendenti
](#gs-python-resources)
+ [

## Configurazione dell'ambiente di sviluppo locale
](#gs-python-set-up)
+ [

## Scarica ed esamina il codice Python per lo streaming di Apache Flink
](#gs-python-download)
+ [

## Gestisci le dipendenze JAR
](#gs-python-jar-dependencies)
+ [

## Scrivi record di esempio nel flusso di input
](#gs-python-sample-records)
+ [

## Esegui l'applicazione localmente
](#gs-python-run-locally)
+ [

## Osserva i dati di input e output nei flussi Kinesis
](#gs-python-observe-input-output)
+ [

## Interrompi l'esecuzione locale dell'applicazione
](#gs-python-stop)
+ [

## Package del codice dell'applicazione
](#gs-python-package-code)
+ [

## Carica il pacchetto dell'applicazione in un bucket Amazon S3
](#gs-python-upload-bucket)
+ [

## Crea e configura l'applicazione Managed Service for Apache Flink
](#gs-python-7)
+ [

## Approfondimenti
](#gs-python-next-step-4)

## Crea risorse dipendenti
<a name="gs-python-resources"></a>

Prima di creare un servizio gestito per Apache Flink per questo esercizio, devi creare le risorse dipendenti seguenti: 
+ Due flussi Kinesis per l'input e l'output.
+ Un bucket Amazon S3 per archiviare il codice dell'applicazione.

**Nota**  
Questo tutorial presuppone che l'applicazione venga distribuita nella regione us-east-1. Se si utilizza un'altra regione, è necessario adattare tutti i passaggi di conseguenza.

### Crea due stream Kinesis
<a name="gs-python-resources-streams"></a>

Prima di creare un'applicazione Managed Service for Apache Flink per questo esercizio, crea due flussi di dati Kinesis (`ExampleInputStream`e`ExampleOutputStream`) nella stessa regione che utilizzerai per distribuire l'applicazione (us-east-1 in questo esempio). L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.

Puoi creare questi flussi utilizzando la console Amazon Kinesis o il comando AWS CLI seguente. Per istruzioni sulla console, consulta [Creazione e aggiornamento dei flussi di dati](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) nella *Guida per gli sviluppatori del flusso di dati Amazon Kinesis*. 

**Per creare i flussi di dati (AWS CLI)**

1. Per creare il primo stream (`ExampleInputStream`), usa il seguente comando Amazon Kinesis `create-stream` AWS CLI .

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1
   ```

1. Per creare il secondo flusso utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome del flusso in `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1
   ```

### Crea un bucket Amazon S3
<a name="gs-python-resources-s3"></a>

Puoi creare un bucket Amazon S3 utilizzando la relativa console. Per istruzioni per la creazione di questa risorsa, consulta gli argomenti riportati di seguito:
+ [Come si crea un bucket S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) nella *Guida per l'utente di Amazon Simple Storage Service*. Assegna al bucket Amazon S3 un nome univoco a livello globale, ad esempio aggiungendo il tuo nome di accesso.
**Nota**  
Assicurati di creare il bucket S3 nella regione che usi per questo tutorial (us-east-1).

### Altre risorse
<a name="gs-python-resources-cw"></a>

Quando crei la tua applicazione, Managed Service for Apache Flink crea le seguenti CloudWatch risorse Amazon se non esistono già:
+ Un gruppo di log chiamato `/AWS/KinesisAnalytics-java/<my-application>`.
+ Un flusso di log denominato `kinesis-analytics-log-stream`.

## Configurazione dell'ambiente di sviluppo locale
<a name="gs-python-set-up"></a>

Per lo sviluppo e il debug, puoi eseguire l'applicazione Python Flink sul tuo computer. Puoi avviare l'applicazione dalla riga di comando con `python main.py` o in un IDE Python a tua scelta. 

**Nota**  
Sulla macchina di sviluppo, devi avere Python 3.10 o 3.11, Java 11, Apache Maven e Git installati. [Ti consigliamo di usare un IDE come Visual Studio Code. [PyCharm](https://www.jetbrains.com/pycharm/)](https://code.visualstudio.com/) Per verificare che tutti i prerequisiti siano soddisfatti, consulta [Soddisfa i prerequisiti per completare gli esercizi](gs-python.md#gs-python-prerequisites) prima di procedere. 

### Installa la libreria PyFlink
<a name="gs-python-install-pyflink"></a>

Per sviluppare l'applicazione ed eseguirla localmente, è necessario installare la libreria Flink Python.

1. Crea un ambiente Python autonomo VirtualEnv usando Conda o qualsiasi strumento Python simile. 

1. Installa la PyFlink libreria in quell'ambiente. Usa la stessa versione di runtime di Apache Flink che utilizzerai in Amazon Managed Service per Apache Flink. Attualmente, il runtime consigliato è 1.19.1. 

   ```
   $ pip install apache-flink==1.19.1
   ```

1. Assicurati che l'ambiente sia attivo quando esegui l'applicazione. Se esegui l'applicazione nell'IDE, assicurati che l'IDE utilizzi l'ambiente come runtime. Il processo dipende dall'IDE che state utilizzando. 
**Nota**  
Devi solo installare la PyFlink libreria. **Non** è necessario installare un cluster Apache Flink sul computer. 

### Autentica la tua sessione AWS
<a name="gs-python-authenticate"></a>

L'applicazione utilizza i flussi di dati Kinesis per pubblicare i dati. Quando si esegue localmente, è necessario disporre di una sessione AWS autenticata valida con autorizzazioni di scrittura nel flusso di dati Kinesis. Usa i seguenti passaggi per autenticare la tua sessione:

1. Se non hai configurato il profilo AWS CLI e un profilo denominato con credenziali valide, consulta. [Configura il () AWS Command Line Interface AWS CLI](setup-awscli.md)

1. Verifica che AWS CLI sia configurato correttamente e che gli utenti dispongano delle autorizzazioni per scrivere nel flusso di dati Kinesis pubblicando il seguente record di test:

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. Se il tuo IDE ha un plugin con cui integrarti AWS, puoi utilizzarlo per passare le credenziali all'applicazione in esecuzione nell'IDE. Per ulteriori informazioni, vedere [AWS Toolkit for PyCharm,AWS Toolkit for](https://aws.amazon.com/pycharm/) [Visual Studio Code [AWS e Toolkit for](https://aws.amazon.com/intellij/)](https://aws.amazon.com/visualstudiocode/) IntelliJ IDEA.

## Scarica ed esamina il codice Python per lo streaming di Apache Flink
<a name="gs-python-download"></a>

Il codice dell'applicazione Python per questo esempio è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

1. Clona il repository remoto con il comando seguente:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Passa alla directory `./python/GettingStarted`.

### Esamina i componenti dell'applicazione
<a name="gs-python-review"></a>

Il codice dell'applicazione si trova in`main.py`. Utilizziamo SQL incorporato in Python per definire il flusso dell'applicazione. 

**Nota**  
Per un'esperienza di sviluppo ottimizzata, l'applicazione è progettata per funzionare senza modifiche al codice sia su Amazon Managed Service per Apache Flink che localmente, per lo sviluppo sulla tua macchina. L'applicazione utilizza la variabile di ambiente `IS_LOCAL = true` per rilevare quando è in esecuzione localmente. È necessario impostare la variabile `IS_LOCAL = true` di ambiente sulla shell o nella configurazione di esecuzione dell'IDE.
+ L'applicazione configura l'ambiente di esecuzione e legge la configurazione di runtime. Per funzionare sia su Amazon Managed Service for Apache Flink che localmente, l'applicazione controlla la `IS_LOCAL` variabile. 
  + Di seguito è riportato il comportamento predefinito quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink:

    1. Carica le dipendenze impacchettate con l'applicazione. Per ulteriori informazioni, vedere (link)

    1. Carica la configurazione dalle proprietà di runtime che definisci nell'applicazione Amazon Managed Service for Apache Flink. Per ulteriori informazioni, consulta (link)
  + Quando l'applicazione rileva `IS_LOCAL = true` quando l'applicazione viene eseguita localmente:

    1. Carica le dipendenze esterne dal progetto.

    1. Carica la configurazione dal `application_properties.json` file incluso nel progetto. 

       ```
       ...
       APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
       ...
       is_local = (
           True if os.environ.get("IS_LOCAL") else False
       )
       ...
       if is_local:
           APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"
           CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
           table_env.get_config().get_configuration().set_string(
               "pipeline.jars",
               "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar",
           )
       ```
+ L'applicazione definisce una tabella di origine con un'`CREATE TABLE`istruzione, utilizzando il [Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) Connector. Questa tabella legge i dati dal flusso Kinesis di input. L'applicazione prende il nome dello stream, la regione e la posizione iniziale dalla configurazione di runtime. 

  ```
  table_env.execute_sql(f"""
          CREATE TABLE prices (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{input_stream_name}',
                  'aws.region' = '{input_stream_region}',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """)
  ```
+ L'applicazione definisce anche una tabella sink utilizzando il [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) in questo esempio. Questo racconto invia i dati allo stream Kinesis in uscita.

  ```
  table_env.execute_sql(f"""
              CREATE TABLE output (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3)
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{output_stream_name}',
                  'aws.region' = '{output_stream_region}',
                  'sink.partitioner-field-delimiter' = ';',
                  'sink.batch.max-size' = '100',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                )""")
  ```
+ Infine, l'applicazione esegue un codice SQL `INSERT INTO...` derivante dalla tabella sorgente. In un'applicazione più complessa, è probabile che siano necessari passaggi aggiuntivi per trasformare i dati prima di scriverli nel sink. 

  ```
  table_result = table_env.execute_sql("""INSERT INTO output 
          SELECT ticker, price, event_time FROM prices""")
  ```
+ È necessario aggiungere un altro passaggio alla fine della `main()` funzione per eseguire l'applicazione localmente:

  ```
  if is_local:
      table_result.wait()
  ```

  Senza questa istruzione, l'applicazione termina immediatamente quando viene eseguita localmente. Non devi eseguire questa istruzione quando esegui l'applicazione in Amazon Managed Service for Apache Flink.

## Gestisci le dipendenze JAR
<a name="gs-python-jar-dependencies"></a>

Un' PyFlink applicazione richiede in genere uno o più connettori. L'applicazione in questo tutorial utilizza il [Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) Connector. Poiché Apache Flink viene eseguito in Java JVM, i connettori vengono distribuiti come file JAR, indipendentemente dal fatto che l'applicazione venga implementata in Python. È necessario impacchettare queste dipendenze con l'applicazione quando la si distribuisce su Amazon Managed Service for Apache Flink. 

In questo esempio, mostriamo come usare Apache Maven per recuperare le dipendenze e impacchettare l'applicazione da eseguire su Managed Service for Apache Flink.

**Nota**  
Esistono modi alternativi per recuperare e impacchettare le dipendenze. Questo esempio dimostra un metodo che funziona correttamente con uno o più connettori. Consente inoltre di eseguire l'applicazione localmente, per lo sviluppo e su Managed Service for Apache Flink senza modifiche al codice. 

### Usa il file pom.xml
<a name="gs-python-jar-pom"></a>

Apache Maven utilizza il `pom.xml` file per controllare le dipendenze e il pacchetto delle applicazioni. 

Tutte le dipendenze JAR sono specificate nel file del `pom.xml` blocco. `<dependencies>...</dependencies>` 

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>4.3.0-1.19</version>
        </dependency>
    </dependencies>
    ...
```

Per trovare l'elemento e la versione del connettore corretti da utilizzare, consulta. [Usa i connettori Apache Flink con Managed Service for Apache Flink](how-flink-connectors.md) Assicurati di fare riferimento alla versione di Apache Flink che stai utilizzando. Per questo esempio, utilizziamo il connettore Kinesis. Per Apache Flink 1.19, la versione del connettore è. `4.3.0-1.19`

**Nota**  
Se si utilizza Apache Flink 1.19, non è stata rilasciata alcuna versione del connettore specifica per questa versione. Utilizzate i connettori rilasciati per la versione 1.18.

### Dipendenze da scaricare e impacchettare
<a name="gs-python-dependencies-download"></a>

Usa Maven per scaricare le dipendenze definite nel `pom.xml` file e impacchettarle per l'applicazione Python Flink. 

1. Passa alla directory che contiene il progetto Python Getting Started chiamato. `python/GettingStarted`

1. Esegui il comando seguente:

```
$ mvn package
```

Maven crea un nuovo file chiamato. `./target/pyflink-dependencies.jar` Quando sviluppate localmente sulla vostra macchina, l'applicazione Python cerca questo file. 

**Nota**  
Se dimentichi di eseguire questo comando, quando tenti di eseguire l'applicazione, l'applicazione fallirà e restituirà l'errore: **Could not find any factory for identifier «**kinesis».

## Scrivi record di esempio nel flusso di input
<a name="gs-python-sample-records"></a>

In questa sezione, invierai record di esempio allo stream per l'elaborazione dell'applicazione. Sono disponibili due opzioni per generare dati di esempio, utilizzando uno script Python o il [Kinesis](https://github.com/awslabs/amazon-kinesis-data-generator) Data Generator.

### Genera dati di esempio usando uno script Python
<a name="gs-python-sample-data"></a>

Puoi usare uno script Python per inviare record di esempio allo stream.

**Nota**  
Per eseguire questo script Python, devi usare Python 3.x e avere installata la libreria SDK [AWS for Python](https://aws.amazon.com/developer/language/python/) (Boto).

**Per iniziare a inviare i dati di test al flusso di input Kinesis:**

1. Scarica lo script `stock.py` Python del generatore di [dati dal repository Data generator GitHub ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator).

1. Esegui lo script `stock.py`:

   ```
   $ python stock.py
   ```

Mantieni lo script in esecuzione mentre completi il resto del tutorial. Ora puoi eseguire la tua applicazione Apache Flink.

### Generazione di dati di esempio utilizzando Kinesis Data Generator
<a name="gs-python-sample-kinesis"></a>

In alternativa all'utilizzo dello script Python, puoi usare [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator), disponibile anche in una [versione ospitata](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html), per inviare dati di esempio casuali allo stream. Kinesis Data Generator viene eseguito nel browser e non è necessario installare nulla sul computer. 

**Per configurare ed eseguire Kinesis Data Generator:**

1. Segui le istruzioni nella [documentazione di Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) per configurare l'accesso allo strumento. Eseguirai un CloudFormation modello che imposta un utente e una password. 

1. Accedi a Kinesis Data Generator tramite l'URL generato dal CloudFormation modello. Puoi trovare l'URL nella scheda **Output** dopo aver completato il CloudFormation modello. 

1. Configura il generatore di dati:
   + **Regione:** Seleziona la regione che stai utilizzando per questo tutorial: us-east-1
   + **Stream/flusso di consegna:** seleziona il flusso di input che l'applicazione utilizzerà: `ExampleInputStream`
   + **Record al secondo**: 100
   + **Modello di registrazione:** copia e incolla il seguente modello:

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. Prova il modello: scegli **Modello di test** e verifica che il record generato sia simile al seguente:

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. Avvia il generatore di dati: scegli **Seleziona Invia dati**.

Kinesis Data Generator sta ora inviando dati a. `ExampleInputStream` 

## Esegui l'applicazione localmente
<a name="gs-python-run-locally"></a>

Puoi testare l'applicazione localmente, eseguendola dalla riga di comando con `python main.py` o dal tuo IDE.

Per eseguire l'applicazione localmente, è necessario che sia installata la versione corretta della PyFlink libreria, come descritto nella sezione precedente. Per ulteriori informazioni, vedere (link)

**Nota**  
Prima di continuare, verifica che i flussi di input e output siano disponibili. Per informazioni, consulta [Crea due flussi di dati Amazon Kinesis](get-started-exercise.md#get-started-exercise-1). Inoltre, verifica di disporre dell'autorizzazione per leggere e scrivere da entrambi gli stream. Per informazioni, consulta [Autentica la tua sessione AWS](get-started-exercise.md#get-started-exercise-2-5). 

### Importa il progetto Python nel tuo IDE
<a name="gs-python-import"></a>

Per iniziare a lavorare sull'applicazione nel tuo IDE, devi importarla come progetto Python. 

Il repository che hai clonato contiene diversi esempi. Ogni esempio è un progetto separato. Per questo tutorial, importa il contenuto della `./python/GettingStarted` sottodirectory nel tuo IDE. 

Importa il codice come progetto Python esistente.

**Nota**  
Il processo esatto per importare un nuovo progetto Python varia a seconda dell'IDE in uso.

### Controllate la configurazione locale dell'applicazione
<a name="gs-python-check-configuration"></a>

Quando viene eseguita localmente, l'applicazione utilizza la configurazione contenuta nel `application_properties.json` file nella cartella delle risorse del progetto sotto`./src/main/resources`. È possibile modificare questo file per utilizzare diversi nomi o regioni di stream Kinesis.

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### Esegui la tua applicazione Python localmente
<a name="gs-python-run-locally"></a>

È possibile eseguire l'applicazione localmente, dalla riga di comando come un normale script Python o dall'IDE. 

**Per eseguire l'applicazione dalla riga di comando**

1. Assicurati che l'ambiente Python standalone come Conda VirtualEnv o dove hai installato la libreria Python Flink sia attualmente attivo. 

1. Assicurati di aver eseguito `mvn package` almeno una volta. 

1. Imposta la variabile di ambiente `IS_LOCAL = true`:

   ```
   $ export IS_LOCAL=true
   ```

1. Esegui l'applicazione come un normale script Python.

   ```
   $python main.py
   ```

**Per eseguire l'applicazione dall'interno dell'IDE**

1. Configura il tuo IDE per eseguire lo `main.py` script con la seguente configurazione:

   1. Usa l'ambiente Python autonomo come Conda VirtualEnv o dove hai installato la libreria. PyFlink 

   1. Usa le AWS credenziali per accedere ai flussi di dati Kinesis di input e output. 

   1. Imposta `IS_LOCAL = true`.

1. Il processo esatto per impostare la configurazione di esecuzione dipende dall'IDE in uso e varia. 

1. Dopo aver configurato l'IDE, esegui lo script Python e usa gli strumenti forniti dall'IDE mentre l'applicazione è in esecuzione. 

### Ispezionate i log delle applicazioni localmente
<a name="gs-python-run-IDE"></a>

Quando viene eseguita localmente, l'applicazione non mostra alcun registro nella console, a parte alcune righe stampate e visualizzate all'avvio dell'applicazione. PyFlink scrive i log in un file nella directory in cui è installata la libreria Python Flink. L'applicazione stampa la posizione dei log all'avvio. È inoltre possibile eseguire il comando seguente per trovare i registri:

```
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
```

1. Elenca i file nella directory di registrazione. Di solito si trova un solo `.log` file.

1. Coda il file mentre l'applicazione è in esecuzione:`tail -f <log-path>/<log-file>.log`.

## Osserva i dati di input e output nei flussi Kinesis
<a name="gs-python-observe-input-output"></a>

Puoi osservare i record inviati al flusso di input da (Python di esempio che genera) o Kinesis Data Generator (link) utilizzando **Data Viewer** nella console Amazon Kinesis. 

**Per osservare i record:**  [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)  Verifica che la regione sia la stessa in cui stai eseguendo questo tutorial, che per impostazione predefinita è us-east-1 US East (Virginia settentrionale). Cambia la regione se non corrisponde.    Scegli **Data Streams.**    Seleziona lo stream che desideri osservare, oppure `ExampleInputStream` `ExampleOutputStream.`   Scegli la scheda **Visualizzatore dati**.    Scegli uno **Shard**, mantieni **Ultimo** come **posizione iniziale**, quindi scegli **Ottieni record**. Potresti visualizzare l'errore «Nessun record trovato per questa richiesta». In tal caso, scegli **Riprova a recuperare i record**. Vengono visualizzati i record più recenti pubblicati nello stream.    Scegli il valore nella colonna Dati per esaminare il contenuto del record in formato JSON.   

## Interrompi l'esecuzione locale dell'applicazione
<a name="gs-python-stop"></a>

Arresta l'esecuzione dell'applicazione nel tuo IDE. L'IDE di solito fornisce un'opzione di «stop». La posizione e il metodo esatti dipendono dall'IDE. 

## Package del codice dell'applicazione
<a name="gs-python-package-code"></a>

In questa sezione, si utilizza Apache Maven per impacchettare il codice dell'applicazione e tutte le dipendenze richieste in un file.zip. 

Esegui nuovamente il comando del pacchetto Maven:

```
$ mvn package
```

Questo comando genera il file. `target/managed-flink-pyflink-getting-started-1.0.0.zip`

## Carica il pacchetto dell'applicazione in un bucket Amazon S3
<a name="gs-python-upload-bucket"></a>

In questa sezione, carichi il file.zip creato nella sezione precedente nel bucket Amazon Simple Storage Service (Amazon S3) creato all'inizio di questo tutorial. Se non hai completato questo passaggio, consulta (link).

**Per caricare il file JAR del codice dell'applicazione**

1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Scegli il bucket che hai creato in precedenza per il codice dell'applicazione.

1. Scegli **Carica**.

1. Scegliere **Add files (Aggiungi file)**.

1. Passa al file.zip generato nel passaggio precedente:. `target/managed-flink-pyflink-getting-started-1.0.0.zip` 

1. Scegli **Carica** senza modificare altre impostazioni.

## Crea e configura l'applicazione Managed Service for Apache Flink
<a name="gs-python-7"></a>

È possibile creare e configurare un'applicazione Managed Service for Apache Flink utilizzando la console o il. AWS CLI Per questo tutorial, useremo la console. 

### Creazione dell’applicazione
<a name="gs-python-7-console-create"></a>

1. Accedi a e apri Console di gestione AWS la console Amazon MSF all'indirizzo https://console.aws.amazon.com /flink.

1. Verifica che sia selezionata la regione corretta: Stati Uniti orientali (Virginia settentrionale) us-east-1.

1. **Apri il menu a destra e scegli Applicazioni **Apache Flink, quindi Crea applicazione** di streaming.** In alternativa, scegli **Crea applicazione di streaming** dalla sezione Guida **introduttiva** della pagina iniziale. 

1. Nella pagina **Crea applicazioni di streaming**:
   + Per **Scegli un metodo per configurare l'applicazione di elaborazione dello stream**, scegli **Crea da zero**.
   + Per la **configurazione di Apache Flink, versione Application Flink**, scegli **Apache** Flink 1.19.
   + **Per la** configurazione dell'applicazione:
     + Per **Nome applicazione**, immetti **MyApplication**.
     + Per **Descrizione**, inserisci **My Python test app**.
     + In **Accesso alle risorse dell'applicazione**, scegli **Crea/aggiorna il ruolo IAM kinesis-analytics-MyApplication-us -east-1 con le politiche richieste.**
   + Per le impostazioni **di Template for applications**:
     + Per **Modelli**, scegli **Sviluppo**.
   + Scegli **Crea applicazione di streaming**.

**Nota**  
Quando crei un'applicazione del servizio gestito per Apache Flink tramite la console, hai la possibilità di avere un ruolo e una policy IAM creati per l'applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste risorse IAM sono denominate utilizzando il nome dell'applicazione e la Regione come segue:  
Policy: `kinesis-analytics-service-MyApplication-us-west-2`
Ruolo: `kinesisanalytics-MyApplication-us-west-2`
*Amazon Managed Service for Apache Flink era precedentemente noto come Kinesis Data Analytics.* Il nome delle risorse generate automaticamente ha il prefisso per garantire la compatibilità con le versioni precedenti. `kinesis-analytics`

### Modifica la policy IAM
<a name="gs-python-7-console-iam"></a>

Modifica la policy IAM per aggiungere le autorizzazioni per accedere al bucket Amazon S3.

**Modifica della policy IAM per aggiungere le autorizzazioni per i bucket S3**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Seleziona **Policy**. Scegli la policy **`kinesis-analytics-service-MyApplication-us-east-1`** creata dalla console nella sezione precedente. 

1. Scegli **Modifica**, quindi scegli la scheda **JSON**.

1. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (*012345678901*) con l'ID del tuo account.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1. Scegli **Avanti** e quindi seleziona **Salva modifiche**.

### Configura l'applicazione
<a name="gs-python-7-console-configure"></a>

Modifica la configurazione dell'applicazione per impostare l'elemento del codice dell'applicazione. 

**Per configurare l'applicazione**

1. **Nella **MyApplication**pagina, scegli Configura.**

1. Nella sezione **Posizione del codice dell'applicazione**:
   + Per il bucket **Amazon S3, seleziona il bucket** creato in precedenza per il codice dell'applicazione. **Scegli **Sfoglia** e seleziona il bucket corretto, quindi scegli Scegli.** Non selezionare il nome del bucket. 
   + Per **Percorso dell'oggetto Amazon S3**, inserisci **managed-flink-pyflink-getting-started-1.0.0.zip**

1. Per le **autorizzazioni di accesso**, scegli **Crea/aggiorna il ruolo IAM `kinesis-analytics-MyApplication-us-east-1` con le politiche richieste.**

1. Passa alle **proprietà di Runtime** e mantieni i valori predefiniti per tutte le altre impostazioni.

1. **Scegliete Aggiungi nuovo elemento** e aggiungete ciascuno dei seguenti parametri:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/gs-python-createapp.html)

1. Non modificate nessuna delle altre sezioni e scegliete **Salva modifiche**.

**Nota**  
Quando scegli di abilitare la CloudWatch registrazione di Amazon, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:   
Gruppo di log: `/aws/kinesis-analytics/MyApplication`
Flusso di log: `kinesis-analytics-log-stream`

### Esecuzione dell'applicazione.
<a name="gs-python-7-console-run"></a>

L'applicazione è ora configurata e pronta per l'esecuzione.

**Per eseguire l'applicazione**

1. **Sulla console per Amazon Managed Service for Apache Flink, scegli **La mia applicazione** e scegli Esegui.**

1. **Nella pagina successiva, nella pagina di configurazione del ripristino dell'applicazione, scegli **Esegui con l'ultima istantanea**, quindi scegli Esegui.** 

   Lo **stato** nell'**applicazione descrive in dettaglio** le transizioni da `Ready` `Starting` e poi a `Running` quando l'applicazione è stata avviata.

Quando l'applicazione è nello `Running` stato, ora puoi aprire la dashboard di Flink. 

**Per aprire il pannello di controllo**

1. Scegli **Apri la dashboard di Apache Flink**. La dashboard si apre in una nuova pagina.

1. Nell'elenco **dei lavori in esecuzione**, scegli il singolo lavoro che puoi vedere. 
**Nota**  
Se imposti le proprietà di Runtime o modifichi le policy IAM in modo errato, lo stato dell'applicazione potrebbe trasformarsi in`Running`, ma la dashboard di Flink mostra che il job viene riavviato continuamente. Si tratta di uno scenario di errore comune se l'applicazione non è configurata correttamente o non dispone delle autorizzazioni per accedere alle risorse esterne.   
Quando ciò accade, controlla la scheda **Eccezioni** nella dashboard di Flink per vedere la causa del problema.

### Osserva le metriche dell'applicazione in esecuzione
<a name="gs-python-observe-metrics"></a>

Nella **MyApplication**pagina, nella sezione **Amazon CloudWatch metrics**, puoi vedere alcune delle metriche fondamentali dell'applicazione in esecuzione. 

**Per visualizzare le metriche**

1. Accanto al pulsante **Aggiorna**, seleziona **10 secondi** dall'elenco a discesa.

1. Quando l'applicazione è in esecuzione ed è integra, puoi vedere la metrica di **uptime aumentare continuamente**.

1. La metrica **fullrestarts deve** essere zero. Se è in aumento, la configurazione potrebbe presentare problemi. Per esaminare il problema, consulta la scheda **Eccezioni** nella dashboard di Flink.

1. La metrica **del numero di checkpoint non riusciti** deve essere pari a zero in un'applicazione integra. 
**Nota**  
Questa dashboard mostra un set fisso di metriche con una granularità di 5 minuti. Puoi creare un pannello di controllo dell'applicazione personalizzato con qualsiasi metrica presente nella dashboard. CloudWatch 

### Osserva i dati di output nei flussi Kinesis
<a name="gs-python-observe-output"></a>

Assicurati di continuare a pubblicare i dati sull'input, usando lo script Python o il Kinesis Data Generator. 

È ora possibile osservare l'output dell'applicazione in esecuzione su Managed Service for Apache Flink utilizzando il Data Viewer in [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/), analogamente a quanto già fatto in precedenza. 

**Per visualizzare l'output**

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Verifica che la regione sia la stessa che stai usando per eseguire questo tutorial. Per impostazione predefinita, è US-East-1US East (Virginia settentrionale). Se necessario, modificare la regione.

1. Scegli **Data Streams.** 

1. Seleziona lo stream che desideri osservare. Ai fini di questo tutorial, utilizza `ExampleOutputStream`. 

1.  Scegli la scheda **Data viewer**. 

1. Seleziona uno **Shard**, mantieni **Ultimo** come **posizione iniziale**, quindi scegli **Ottieni record**. Potresti visualizzare l'errore «nessun record trovato per questa richiesta». In tal caso, scegli **Riprova a recuperare i record**. Vengono visualizzati i record più recenti pubblicati nello stream.

1. Seleziona il valore nella colonna Dati per esaminare il contenuto del record in formato JSON.

### Arresta l'applicazione
<a name="gs-python-7-console-stop"></a>

Per interrompere l'applicazione, vai alla pagina della console dell'applicazione Managed Service for Apache Flink denominata. `MyApplication`

**Per interrompere l'applicazione**

1. **Dall'elenco a discesa **Azione**, scegli Stop.**

1. Lo **stato** nell'**applicazione descrive in dettaglio** le transizioni da `Running` e quindi a `Ready` quando l'applicazione viene completamente interrotta. `Stopping` 
**Nota**  
Non dimenticare di interrompere anche l'invio di dati al flusso di input dallo script Python o dal Kinesis Data Generator.

## Approfondimenti
<a name="gs-python-next-step-4"></a>

[Pulisci le risorse AWS](gs-python-cleanup.md)

# Pulisci le risorse AWS
<a name="gs-python-cleanup"></a>

Questa sezione include le procedure per la pulizia AWS delle risorse create nel tutorial Getting Started (Python).

**Topics**
+ [

## Eliminare l'applicazione Managed Service for Apache Flink
](#gs-python-cleanup-app)
+ [

## Eliminare i flussi di dati Kinesis
](#gs-python-cleanup-msk)
+ [

## Elimina oggetti e bucket Amazon S3
](#gs-python-cleanup-s3)
+ [

## Elimina le tue risorse IAM
](#gs-python-cleanup-iam)
+ [

## CloudWatch Elimina le tue risorse
](#gs-python-cleanup-cw)

## Eliminare l'applicazione Managed Service for Apache Flink
<a name="gs-python-cleanup-app"></a>

Per eliminare l'applicazione, utilizza la procedura seguente.

**Per eliminare l'applicazione**

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Nel pannello Managed Service for Apache Flink, scegli. **MyApplication**

1. Dall'elenco a discesa **Azioni**, scegliete **Elimina** e quindi confermate l'eliminazione.

## Eliminare i flussi di dati Kinesis
<a name="gs-python-cleanup-msk"></a>

1. Accedi a e apri Console di gestione AWS la console Amazon MSF all'indirizzo https://console.aws.amazon.com /flink.

1. **Scegli Flussi di dati.**

1. Seleziona i due stream che hai creato e. `ExampleInputStream` `ExampleOutputStream` 

1. Dall'elenco a discesa **Azioni**, scegli **Elimina**, quindi conferma l'eliminazione.

## Elimina oggetti e bucket Amazon S3
<a name="gs-python-cleanup-s3"></a>

Per eliminare gli oggetti e il bucket S3, utilizza la procedura seguente.

**Per eliminare l'oggetto dal bucket S3**

1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Seleziona il bucket S3 che hai creato per l'elemento dell'applicazione.

1. Seleziona l'elemento dell'applicazione che hai caricato, denominato. `amazon-msf-java-stream-app-1.0.jar`

1. Scegliete **Elimina** e confermate l'eliminazione.

**Per eliminare il bucket S3**

1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Seleziona il bucket che hai creato per gli artefatti.

1. Scegliete **Elimina e confermate l'eliminazione**.
**Nota**  
Il bucket S3 deve essere vuoto per eliminarlo.

## Elimina le tue risorse IAM
<a name="gs-python-cleanup-iam"></a>

Per eliminare le risorse IAM, segui la procedura riportata di seguito.

**Per eliminare le risorse IAM**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nella barra di navigazione, scegli **Policy**.

1. Nel controllo filtro, inserisci **kinesis**.

1. Scegli la politica **kinesis-analytics-service- MyApplication -us-east-1**.

1. Seleziona **Operazioni di policy** e quindi **Elimina**.

1. Nella barra di navigazione, scegli **Ruoli**.

1. Scegli il ruolo **kinesis-analytics- MyApplication** -us-east-1.

1. Quindi scegli **Elimina ruolo** e conferma l'eliminazione.

## CloudWatch Elimina le tue risorse
<a name="gs-python-cleanup-cw"></a>

Utilizzate la seguente procedura per eliminare le CloudWatch risorse.

**Per eliminare le tue CloudWatch risorse**

1. Apri la CloudWatch console all'indirizzo [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Nella barra di navigazione, scegli **Log**.

1. Scegli il gruppo**/aws/kinesis-analytics/MyApplication**log.

1. Quindi scegli **Elimina gruppo di log** e conferma l'eliminazione.