

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Usa un notebook Studio con Managed Service for Apache Flink
<a name="how-notebook"></a>

I notebook Studio per il servizio gestito per Apache Flink consentono di interrogare in modo interattivo i flussi di dati in tempo reale e di creare ed eseguire facilmente applicazioni di elaborazione dei flussi utilizzando SQL, Python e Scala standard. Con pochi clic nella console di AWS gestione, puoi avviare un notebook serverless per interrogare i flussi di dati e ottenere risultati in pochi secondi. 

Un notebook è un ambiente di sviluppo basato sul Web. Con i notebook, si ottiene un’esperienza di sviluppo semplice e interattiva combinata con le funzionalità avanzate fornite da Apache Flink. I notebook Studio utilizzano notebook basati su [Apache Zeppelin](https://zeppelin.apache.org/) e utilizzano [Apache Flink](https://flink.apache.org/) come motore di elaborazione del flusso. I notebook Studio combinano perfettamente queste tecnologie per rendere le analisi avanzate sui flussi di dati accessibili a sviluppatori con tutte le competenze. 

Apache Zeppelin fornisce ai notebook Studio una suite completa di strumenti di analisi, tra cui:
+ Visualizzazione dei dati
+ Esportazione di dati nei file
+ Controllo del formato di output per un'analisi più semplice

Per una guida all'utilizzo del servizio gestito per Apache Flink e Apache Zeppelin, consulta [Tutorial: crea un notebook Studio in Managed Service per Apache Flink](example-notebook.md). Per ulteriori informazioni su Apache Zeppelin, consulta la [documentazione su Apache Zeppelin](http://zeppelin.apache.org).

 [Con un notebook, puoi modellare le query utilizzando l'[API Apache Flink Table e SQL in SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/), Python o Scala o l'API in Scala. DataStream ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) Con pochi clic, puoi quindi promuovere il notebook Studio a un'applicazione di elaborazione di flussi del servizio gestito per Apache Flink in esecuzione continua e non interattiva per i carichi di lavoro di produzione.

**Topics**
+ [Usa la versione corretta di Studio Notebook Runtime](studio-notebook-versions.md)
+ [Crea un taccuino da Studio](how-zeppelin-creating.md)
+ [

# Esegui un'analisi interattiva dei dati in streaming
](how-zeppelin-interactive.md)
+ [

# Implementa come applicazione con uno stato durevole
](how-notebook-durable.md)
+ [autorizzazioni IAM](how-zeppelin-iam.md)
+ [Usa connettori e dipendenze](how-zeppelin-connectors.md)
+ [Funzioni definite dall'utente](how-zeppelin-udf.md)
+ [

# Abilitazione del checkpointing
](how-zeppelin-checkpoint.md)
+ [

# Aggiorna Studio Runtime
](upgrading-studio-runtime.md)
+ [

# Lavora con AWS Glue
](how-zeppelin-glue.md)
+ [Esempi e tutorial per notebook Studio in Managed Service for Apache Flink](how-zeppelin-examples.md)
+ [

# Risolvi i problemi relativi ai notebook Studio per Managed Service for Apache Flink
](how-zeppelin-troubleshooting.md)
+ [

# Crea policy IAM personalizzate per i notebook Managed Service for Apache Flink Studio
](how-zeppelin-appendix-iam.md)

# Usa la versione corretta di Studio Notebook Runtime
<a name="studio-notebook-versions"></a>

Con Amazon Managed Service per Apache Flink Studio, puoi interrogare i flussi di dati in tempo reale e creare ed eseguire applicazioni di elaborazione dei flussi utilizzando SQL, Python e Scala standard in un notebook interattivo. [I notebook Studio sono basati su Apache [Zeppelin e utilizzano Apache](https://zeppelin.apache.org/) Flink come motore di elaborazione dello stream.](https://flink.apache.org/) 

**Nota**  
**Deprecheremo Studio Runtime con la versione 1.11 di Apache Flink il 5 novembre 2024.** A partire da questa data, non sarà possibile eseguire nuovi notebook o creare nuove applicazioni utilizzando questa versione. Si consiglia di eseguire l'aggiornamento al runtime più recente (Apache Flink 1.15 e Apache Zeppelin 0.10) prima di tale data. Per indicazioni su come aggiornare il notebook, consulta. [Aggiorna Studio Runtime](upgrading-studio-runtime.md)


**Studio Runtime**  

| Versione Apache Flink | Versione Apache Zeppelin | Versione di Python |  | 
| --- | --- | --- | --- | 
| 1.15 | 0.1 | 3.8 | Consigliato | 
| 1.13 | 0.9 | 3.8 | Supportata fino al 16 ottobre 2024 | 
| 1.11 | 0.9 | 3.7 | Data di scadenza il 24 febbraio 2025 | 

# Crea un taccuino Studio
<a name="how-zeppelin-creating"></a>

Un notebook Studio contiene query o programmi scritti in SQL, Python o Scala che vengono eseguiti su dati di streaming e restituiscono risultati analitici. Puoi creare la tua applicazione utilizzando la console o la CLI e fornire query per l'analisi dei dati dalla tua origine dati.

L'applicazione ha i seguenti componenti:
+ Un'origine dati, ad esempio un cluster Amazon MSK, un flusso di dati Kinesis o un bucket Amazon S3.
+ Un AWS Glue database. Questo database contiene tabelle in cui sono archiviati gli schemi e gli endpoint di origine e destinazione dei dati. Per ulteriori informazioni, consulta [Lavora con AWS Glue](how-zeppelin-glue.md).
+ Il tuo codice dell'applicazione. Il codice implementa la tua query o il tuo programma di analisi.
+ Le impostazioni dell'applicazione e le proprietà di runtime. Per informazioni sulle impostazioni dell'applicazione e le proprietà di runtime, consulta i seguenti argomenti nella [Guida per gli sviluppatori di applicazioni Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html):
  + **Parallelismo e dimensionamento delle applicazioni:** l'impostazione Parallelismo dell'applicazione serve per controllare il numero di query che l'applicazione può eseguire contemporaneamente. Le query possono inoltre trarre vantaggio da un aumento del parallelismo se hanno più percorsi di esecuzione, ad esempio nelle seguenti circostanze:
    + Durante l'elaborazione di più partizioni di un flusso di dati Kinesis
    + Durante il partizionamento dei dati utilizzando l'operatore `KeyBy`.
    + Quando si utilizzano più operatori finestra

    Per ulteriori informazioni sul dimensionamento dell'applicazione, consulta [Dimensionamento delle applicazioni nel servizio gestito per Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html).
  + **Registrazione e monitoraggio:** per informazioni sulla registrazione e il monitoraggio delle applicazioni, consulta [Registrazione e monitoraggio nel servizio gestito da Amazon per Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html).
  + La tua applicazione utilizza checkpoint e savepoint per la tolleranza agli errori. I checkpoint e i savepoint non sono abilitati per impostazione predefinita per i notebook Studio.

Puoi creare il tuo taccuino Studio utilizzando Console di gestione AWS o AWS CLI. 

Quando crei l'applicazione dalla console, hai a disposizione le seguenti opzioni:
+ Nella console Amazon MSK, scegli un cluster, quindi scegli **Elabora dati in tempo reale**.
+ Nella console del flusso di dati Kinesis, scegli un flusso di dati, quindi nella scheda **Applicazioni** scegli **Elabora dati in tempo reale**.
+ Nella console del servizio gestito per Apache Flink, scegli la scheda **Studio**, quindi scegli **Crea notebook Studio**.

# Esegui un'analisi interattiva dei dati in streaming
<a name="how-zeppelin-interactive"></a>

Utilizza un notebook serverless basato su Apache Zeppelin per interagire con i tuoi dati di streaming. Il notebook può contenere più note e ogni nota può contenere uno o più paragrafi in cui scrivere il codice.

L'esempio seguente di query SQL mostra come recuperare dati da un'origine dati:

```
%flink.ssql(type=update)
select * from stock;
```

Per altri esempi di query SQL di Flink Streaming, consulta [Esempi e tutorial per notebook Studio in Managed Service for Apache Flink](how-zeppelin-examples.md) quanto segue e [Query](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/) nella documentazione di Apache Flink.

È possibile utilizzare le query SQL di Flink nel notebook Studio per interrogare i dati di streaming. Puoi anche usare Python (Table API) e Scala (Table and Datastream APIs) per scrivere programmi per interrogare i tuoi dati di streaming in modo interattivo. Puoi visualizzare i risultati delle query o dei programmi, aggiornarli in pochi secondi ed eseguirli nuovamente per visualizzare i risultati aggiornati.

## Interpreti Flink
<a name="how-zeppelin-interactive-interpreters"></a>

Puoi specificare la lingua utilizzata dal servizio gestito per Apache Flink per eseguire l'applicazione utilizzando un *interprete*. Con il servizio gestito da Amazon per Apache Flink puoi utilizzare i seguenti interpreti:


| Nome | Classe | Description | 
| --- |--- |--- |
| %flink | FlinkInterpreter | Crea ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironmente fornisce un ambiente Scala | 
| %flink.pyflink | PyFlinkInterpreter | Fornisce un ambiente python | 
| %flink.ipyflink | IPyFlinkInterpreter | Fornisce un ambiente ipython | 
| %flink.ssql | FlinkStreamSqlInterpreter | Fornisce un ambiente stream sql | 
| %flink.bsql | FlinkBatchSqlInterpreter | Fornisce un ambiente sql in batch | 

Per ulteriori informazioni sugli interpreti Flink, consulta [Interprete Flink per Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html).

Se si utilizzano `%flink.pyflink` o `%flink.ipyflink` come interpreti, è necessario utilizzare il `ZeppelinContext` per visualizzare i risultati all'interno del notebook.

Per esempi più PyFlink specifici, consulta [Interroga i flussi di dati in modo interattivo utilizzando Managed Service per Apache Flink Studio](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/) e Python.

## Variabili dell'ambiente tabellare Apache Flink
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin fornisce l'accesso alle risorse dell'ambiente tabellare utilizzando variabili di ambiente. 

Puoi accedere alle risorse dell'ambiente tabellare Scala con le seguenti variabili:


| Variabile | Risorsa | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | StreamTableEnvironment for blink planner | 

Puoi accedere alle risorse dell'ambiente tabellare Python con le seguenti variabili:


| Variabile | Risorsa | 
| --- |--- |
| s\$1env | StreamExecutionEnvironment | 
| st\$1env | StreamTableEnvironment for blink planner | 

Per ulteriori informazioni sull'utilizzo degli ambienti tabellari, consulta [Concetti e API comuni](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/) nella documentazione di Apache Flink. 

# Implementa come applicazione con uno stato durevole
<a name="how-notebook-durable"></a>

Puoi creare il codice ed esportarlo in Amazon S3. Puoi promuovere il codice che hai scritto nella nota in un'applicazione di elaborazione di flussi in esecuzione continua. Esistono due modalità per eseguire un'applicazione Apache Flink nel servizio gestito per Apache Flink: con un notebook Studio, hai la possibilità di sviluppare il codice in modo interattivo, visualizzarne i risultati in tempo reale e visualizzarlo all'interno della nota. Dopo aver implementato una nota per l'esecuzione in modalità streaming, il servizio gestito per Apache Flink crea un'applicazione che viene eseguita continuamente, legge i dati dalle origini, scrive nelle destinazioni, mantiene lo stato dell'applicazione a lungo termine e viene automaticamente dimensionato in base al throughput dei flussi di origine. 

**Nota**  
Il bucket S3 in cui si esporta il codice dell'applicazione deve trovarsi nella stessa regione del notebook Studio.

È possibile implementare una nota dal notebook Studio solo se soddisfa i seguenti criteri:
+ I paragrafi devono essere ordinati in sequenza. Quando distribuite l'applicazione, tutti i paragrafi all'interno di una nota verranno eseguiti in sequenza (left-to-right, top-to-bottom) così come appaiono nella nota. È possibile controllare questo ordine selezionando **Esegui tutti i paragrafi** nella nota.
+ Il tuo codice è una combinazione di Python ed SQL o di Scala ed SQL. Al momento non supportiamo Python e Scala insieme per. deploy-as-application
+ La nota dovrebbe avere solo i seguenti interpreti: `%flink`, `%flink.ssql`, `%flink.pyflink`, `%flink.ipyflink`, `%md`.
+ L'uso dell'oggetto `z` [del contesto Zeppelin](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html) non è supportato. I metodi che non restituiscono nulla si limiteranno a registrare un avviso. Altri metodi genereranno eccezioni in Python o non riusciranno a compilare in Scala.
+ Una nota deve generare un singolo processo Apache Flink. 
+ Le note con i [moduli dinamici](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html) non sono supportate per l'implementazione come applicazione.
+ I paragrafi %md ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) verranno ignorati durante l'implementazione come applicazione, poiché si prevede che contengano documentazione leggibile dall'uomo che non è adatta all'esecuzione come parte dell'applicazione risultante.
+ I paragrafi disabilitati per l'esecuzione in Zeppelin verranno ignorati durante l'implementazione come applicazione. Anche se un paragrafo disabilitato utilizza un interprete incompatibile, ad esempio `%flink.ipyflink` in una nota con `%flink` `and %flink.ssql` interpreti, verrà ignorato durante l'utilizzo della nota come applicazione e non genererà alcun errore.
+ Affinché la distribuzione dell'applicazione abbia successo, deve essere presente almeno un paragrafo con il codice sorgente (Flink SQL PyFlink o Flink Scala) abilitato per l'esecuzione.
+ L'impostazione del parallelismo nella direttiva dell'interprete all'interno di un paragrafo (ad esempio `%flink.ssql(parallelism=32)`) verrà ignorata nelle applicazioni implementate da una nota. È invece possibile aggiornare l'applicazione distribuita tramite l' AWS API AWS Command Line Interface o per modificare le Console di gestione AWS impostazioni della and/or ParallelismPer KPU Parallelism in base al livello di parallelismo richiesto dall'applicazione, oppure è possibile abilitare la scalabilità automatica per l'applicazione distribuita.
+ Se stai implementando come applicazione con stato durevole, il VPC deve avere accesso a Internet. Se il VPC non dispone di accesso a Internet, consulta [Implementa come applicazione con stato durevole in un VPC senza accesso a Internet](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet). 

## Criteri Scala/Python
<a name="how-notebook-durable-scala"></a>
+ Nel tuo codice Scala o Python, usa il [pianificatore Blink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure) (`senv`, `stenv` per Scala; `s_env`, per `st_env` Python) e non il vecchio pianificatore "Flink" (`stenv_2` per Scala, `st_env_2` per Python). Il progetto Apache Flink consiglia l'utilizzo del pianificatore Blink per i casi d'uso in produzione, e questo è il pianificatore predefinito in Zeppelin e Flink.
+ I tuoi paragrafi in Python non devono utilizzare [invocazioni/assegnazioni di shell](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment) utilizzando [comandi IPython magici](https://ipython.readthedocs.io/en/stable/interactive/magics.html) come `!` `%timeit` o `%conda` nelle note pensate per essere distribuite come applicazioni.
+ Non è possibile utilizzare le classi di casi Scala come parametri di funzioni passate a operatori di flusso di dati di ordine superiore come `map` e `filter`. Per informazioni sulle classi di casi di Scala, consulta [CLASSI DI CASI](https://docs.scala-lang.org/overviews/scala-book/case-classes.html) nella documentazione di Scala.

## Criteri SQL
<a name="how-notebook-durable-sql"></a>
+ Le istruzioni SELECT semplici non sono consentite, in quanto non esiste nulla di equivalente alla sezione di output di un paragrafo in cui è possibile fornire i dati.
+ In ogni paragrafo, le istruzioni DDL (`USE`, `CREATE`, `ALTER`, `DROP`, `SET`, `RESET`) devono precedere le istruzioni DML (`INSERT`). Questo perché le istruzioni DML contenute in un paragrafo devono essere inviate insieme come un unico processo Flink.
+ Dovrebbe esserci al massimo un paragrafo contenente istruzioni DML. Questo perché, per questa deploy-as-application funzionalità, supportiamo solo l'invio di un singolo lavoro a Flink.

Per ulteriori informazioni e un esempio, consulta [Traduzione, redazione e analisi dei dati di streaming utilizzando le funzioni SQL con il servizio gestito da Amazon per Apache Flink, Amazon Translate e Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesisanalytics-MyApplicatioamazon-translate-and-amazon-comprehend/).

# Verifica le autorizzazioni IAM per i notebook Studio
<a name="how-zeppelin-iam"></a>

Il servizio gestito per Apache Flink crea automaticamente un ruolo IAM quando crei un notebook Studio tramite la Console di gestione AWS. Inoltre associa a quel ruolo una policy che consente i seguenti accessi:


****  

| Servizio | Accesso  | 
| --- | --- | 
| CloudWatch Registri | List | 
| Amazon EC2 | List | 
| AWS Glue | Lettura/scrittura | 
| Servizio gestito per Apache Flink | Lettura | 
| Servizio gestito per Apache Flink V2 | Lettura | 
| Simple Storage Service (Amazon S3) | Lettura/scrittura | 

# Usa connettori e dipendenze
<a name="how-zeppelin-connectors"></a>

I connettori consentono di leggere e scrivere dati utilizzando tecnologie diverse. Il servizio gestito per Apache Flink include tre connettori predefiniti nel notebook Studio. Puoi inoltre utilizzare connettori personalizzati. Per ulteriori informazioni sui connettori, consulta [Connettori tabella ed SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/) nella documentazione di Apache Flink.

## Connettori predefiniti
<a name="zeppelin-default-connectors"></a>

Se utilizzi il Console di gestione AWS per creare il tuo notebook Studio, Managed Service for Apache Flink include i seguenti connettori personalizzati per impostazione predefinita:`flink-sql-connector-kinesis`, e. `flink-connector-kafka_2.12` `aws-msk-iam-auth` Per creare un notebook Studio tramite la console senza questi connettori personalizzati, scegli l'opzione **Crea con impostazioni personalizzate**. Quindi, quando arrivi alla pagina **Configurazioni**, deseleziona le caselle di controllo accanto ai due connettori.

Se utilizzi l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API per creare il tuo notebook Studio, i `flink-connector-kafka` connettori `flink-sql-connector-flink` e non sono inclusi per impostazione predefinita. Per aggiungerli, specificali come `MavenReference` nel tipo di dati `CustomArtifactsConfiguration`, come mostrato negli esempi seguenti.

Il connettore `aws-msk-iam-auth` è il connettore da utilizzare con Amazon MSK che include la funzionalità di autenticazione automatica con IAM. 

**Nota**  
Le versioni dei connettori mostrate nell'esempio seguente sono le uniche supportate.

```
For the Kinesis connector:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"

   }      
}]

For authenticating with AWS MSK through AWS IAM:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }      
}]
            
For the Apache Kafka connector:  

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"

   }      
}]
```

Per aggiungere questi connettori a un notebook esistente, utilizza l'operazione [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API e specificali come `MavenReference` tipo di `CustomArtifactsConfigurationUpdate` dati.

**Nota**  
È possibile impostare `failOnError` su true per il connettore `flink-sql-connector-kinesis` nell'API della tabella.

## Aggiungi dipendenze e connettori personalizzati
<a name="zeppelin-custom-connectors"></a>

Per utilizzare il Console di gestione AWS per aggiungere una dipendenza o un connettore personalizzato al tuo notebook Studio, procedi nel seguente modo:

1. Carica il file del connettore personalizzato in Amazon S3.

1. In Console di gestione AWS, scegli l'opzione di creazione **personalizzata per creare** il tuo notebook Studio.

1. Segui il flusso di lavoro per la creazione del notebook Studio fino alla fase **Configurazioni**.

1. Nella sezione **Connettori personalizzati**, scegli **Aggiungi connettore personalizzato**.

1. Specifica la posizione Amazon S3 della dipendenza o del connettore personalizzato.

1. Scegli **Save changes** (Salva modifiche).

Per aggiungere un JAR di dipendenza o un connettore personalizzato quando crei un nuovo notebook Studio utilizzando l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API, specifica la posizione Amazon S3 del JAR di dipendenza o del connettore personalizzato nel `CustomArtifactsConfiguration` tipo di dati. Per aggiungere una dipendenza o un connettore personalizzato a un notebook Studio esistente, richiama l'operazione [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API e specifica la posizione Amazon S3 del JAR della dipendenza o del connettore personalizzato nel tipo di dati. `CustomArtifactsConfigurationUpdate`

**Nota**  
Quando includi una dipendenza o un connettore personalizzato, devi inserire anche tutte le relative dipendenze transitive non incluse al suo interno.

# Implementa funzioni definite dall'utente
<a name="how-zeppelin-udf"></a>

Le funzioni definite dall'utente (UDFs) sono punti di estensione che consentono di richiamare la logica utilizzata di frequente o la logica personalizzata che non può essere espressa diversamente nelle query. Puoi usare Python o un linguaggio JVM come Java o Scala per implementare i paragrafi all'interno del tuo UDFs notebook Studio. Puoi anche aggiungere al tuo notebook Studio file JAR esterni che contengono file JAR UDFs implementati in un linguaggio JVM. 

Quando implementate le classi astratte di JARs quel registro `UserDefinedFunction` (o le vostre classi astratte), utilizzate l'ambito fornito in Apache Maven, le dichiarazioni di `compileOnly` dipendenza in Gradle, l'ambito fornito in SBT o una direttiva equivalente nella configurazione di build del progetto UDF. Ciò consente la compilazione del codice sorgente UDF con Flink, ma le classi API Flink APIs non sono a loro volta incluse negli artefatti di compilazione. Fai riferimento a questo [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) tratto dal jar UDF di esempio, che rispetta tale prerequisito su un progetto Maven. 

**Nota**  
Per un esempio di configurazione, consulta [Traduzione, redazione e analisi dei dati di streaming utilizzando le funzioni SQL con il servizio gestito da Amazon per Apache Flink, Amazon Translate e Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/) sul *blog Machine Learning AWS *.

Per utilizzare la console per aggiungere file JAR UDF al notebook Studio, segui questi passaggi:

1. Carica il file JAR UDF su Amazon S3.

1. In Console di gestione AWS, scegli l'opzione di **creazione personalizzata per creare il tuo taccuino** Studio.

1. Segui il flusso di lavoro per la creazione del notebook Studio fino alla fase **Configurazioni**.

1. Nella sezione **Funzioni definite dall'utente**, scegli **Aggiungi una funzione definita dall'utente**.

1. Specifica la posizione Amazon S3 del file JAR o del file ZIP che contiene l'implementazione dell'UDF.

1. Scegli **Save changes** (Salva modifiche).

Per aggiungere un JAR UDF quando crei un nuovo notebook Studio utilizzando l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API, specifica la posizione JAR nel tipo di `CustomArtifactConfiguration` dati. Per aggiungere un file JAR UDF a un notebook Studio esistente, richiamate l'operazione [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API e specificate la posizione JAR nel `CustomArtifactsConfigurationUpdate` tipo di dati. In alternativa, è possibile utilizzare il Console di gestione AWS per aggiungere file JAR UDF al notebook Studio.

## Considerazioni sulle funzioni definite dall'utente
<a name="how-zeppelin-udf-considerations"></a>
+ Il servizio gestito per Apache Flink Studio utilizza la [terminologia di Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html), in cui un notebook è un'istanza Zeppelin che può contenere più note. Ogni nota può quindi contenere più paragrafi. Con il servizio gestito per Apache Flink Studio, il processo di interpretazione è condiviso tra tutte le note del taccuino. Quindi, se si esegue una registrazione esplicita della [createTemporarySystemfunzione utilizzando Function](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-) in una nota, è possibile fare riferimento alla stessa così com'è in un'altra nota dello stesso taccuino. 

  L'operazione *Implementa come applicazione* funziona tuttavia su una *singola* nota e non su tutte le note del notebook. Quando si esegue l'implementazione come applicazione, per generare l'applicazione vengono utilizzati solo i contenuti della nota attiva. Qualsiasi registrazione esplicita di funzioni eseguita in altri notebook non fa parte delle dipendenze dell'applicazione generate. Inoltre, mentre l'opzione Implementa come applicazione è in esecuzione, si verifica una registrazione implicita della funzione convertendo il nome della classe principale del JAR in una stringa minuscola.

   Ad esempio, se `TextAnalyticsUDF` è la classe principale per il JAR UDF, una registrazione implicita darà come risultato il nome della funzione `textanalyticsudf`. Quindi, se la registrazione esplicita di una funzione nella nota 1 di Studio si verifica come segue, tutte le altre note in quel notebook (ad esempio la nota 2) possono fare riferimento alla funzione in base al nome `myNewFuncNameForClass` grazie all'interprete condiviso:

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   Tuttavia, durante l'operazione di implementazione come applicazione sulla nota 2, questa registrazione esplicita *non verrà inclusa* nelle dipendenze e quindi l'applicazione implementata non funzionerà come previsto. A causa della registrazione implicita, per impostazione predefinita tutti i riferimenti a questa funzione dovrebbero essere con `textanalyticsudf` e non con `myNewFuncNameForClass`.

   Se è necessaria una registrazione personalizzata del nome della funzione, si prevede che la nota 2 stessa contenga un altro paragrafo per eseguire un'altra registrazione esplicita come segue: 

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ Se il codice JAR UDF include Flink SDKs, configurate il progetto Java in modo che il codice sorgente UDF possa essere compilato con Flink SDKs, ma le classi Flink SDK non siano esse stesse incluse nell'elemento di build, ad esempio il JAR. 

  È possibile utilizzare l'ambito `provided` in Apache Maven, dichiarazioni di dipendenza `compileOnly` in Gradle, l'ambito `provided` in SBT o una direttiva equivalente nella configurazione di build del progetto UDF. Puoi fare riferimento a questo [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) tratto dal jar UDF di esempio, che rispetta tale prerequisito su un progetto Maven. Per un step-by-step tutorial completo, consulta questo articolo [Traduci, correggi e analizza i dati di streaming utilizzando le funzioni SQL con Amazon Managed Service for Apache Flink, Amazon Translate e Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/).

# Abilitazione del checkpointing
<a name="how-zeppelin-checkpoint"></a>

È possibile abilitare la creazione di checkpoint utilizzando le impostazioni dell'ambiente. Per informazioni sulla creazione di checkpoint, consulta [Tolleranza agli errori](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html) nella [Guida per gli sviluppatori del servizio gestito per Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Imposta l'intervallo di checkpoint
<a name="how-zeppelin-checkpoint-interval"></a>

Il seguente esempio di codice Scala imposta l'intervallo di checkpoint dell'applicazione su un minuto:

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

Il seguente esempio di codice Python imposta l'intervallo di checkpoint dell'applicazione su un minuto:

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## Imposta il tipo di checkpoint
<a name="how-zeppelin-checkpoint-type"></a>

Il seguente esempio di codice Scala imposta la modalità di checkpoint dell'applicazione su `EXACTLY_ONCE` (impostazione predefinita):

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

Il seguente esempio di codice Python imposta la modalità di checkpoint dell'applicazione su `EXACTLY_ONCE` (impostazione predefinita):

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# Aggiorna Studio Runtime
<a name="upgrading-studio-runtime"></a>

Questa sezione contiene informazioni su come aggiornare il notebook Studio Runtime. Ti consigliamo di eseguire sempre l'aggiornamento all'ultima versione supportata di Studio Runtime.

## Aggiorna il tuo notebook a un nuovo Studio Runtime
<a name="upgrading-notebook"></a>

A seconda di come utilizzi Studio, i passaggi per aggiornare il tuo Runtime sono diversi. Seleziona l'opzione più adatta al tuo caso d'uso.

### Query SQL o codice Python senza dipendenze esterne
<a name="notebook-no-dependencies"></a>

Se si utilizza SQL o Python senza dipendenze esterne, utilizzare il seguente processo di aggiornamento del Runtime. Ti consigliamo di eseguire l'aggiornamento alla versione più recente di Runtime. Il processo di aggiornamento è lo stesso, indipendentemente dalla versione di Runtime da cui si esegue l'aggiornamento. 

1. Crea un nuovo notebook Studio utilizzando la versione più recente del Runtime.

1. Copia e incolla il codice di ogni nota dal vecchio taccuino al nuovo taccuino.

1. Nel nuovo notebook, modifica il codice per renderlo compatibile con qualsiasi funzionalità di Apache Flink modificata rispetto alla versione precedente.
   + Esegui il nuovo notebook. Apri il taccuino ed eseguilo nota per nota, in sequenza, e verifica se funziona.
   + Apporta le modifiche necessarie al codice.
   + Arresta il nuovo notebook.

1. Se hai distribuito il vecchio notebook come applicazione:
   + Implementa il nuovo notebook come nuova applicazione separata.
   + Arresta la vecchia applicazione.
   + Esegui la nuova applicazione senza istantanea.

1. Arresta il vecchio notebook se è in esecuzione. Avvia il nuovo notebook, se necessario, per un uso interattivo.

**Flusso di processo per l'aggiornamento senza dipendenze esterne**

![\[Il diagramma seguente rappresenta il flusso di lavoro consigliato per aggiornare il notebook senza dipendenze esterne.\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### Query SQL o codice Python con dipendenze esterne
<a name="notebook-dependencies"></a>

Segui questo processo se utilizzi SQL o Python e utilizzi dipendenze esterne come connettori o artefatti personalizzati, come funzioni definite dall'utente implementate in Python o Java. Ti consigliamo di eseguire l'aggiornamento alla versione più recente del Runtime. Il processo è lo stesso, indipendentemente dalla versione di Runtime da cui si esegue l'aggiornamento.

1. Crea un nuovo notebook Studio utilizzando la versione più recente del Runtime.

1. Copia e incolla il codice di ogni nota dal vecchio taccuino al nuovo taccuino.

1. Aggiorna le dipendenze esterne e gli artefatti personalizzati.
   + Cerca nuovi connettori compatibili con la versione Apache Flink del nuovo Runtime. Fate riferimento a [Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/) nella documentazione di Apache Flink per trovare i connettori corretti per la versione Flink.
   + Aggiorna il codice delle funzioni definite dall'utente in modo che corrisponda alle modifiche nell'API Apache Flink e a qualsiasi dipendenza Python o JAR utilizzata dalle funzioni definite dall'utente. Reimpacchetta il tuo artefatto personalizzato aggiornato.
   + Aggiungi questi nuovi connettori e artefatti al nuovo notebook.

1. Nel nuovo notebook, modifica il codice per renderlo compatibile con qualsiasi funzionalità di Apache Flink modificata rispetto alla versione precedente.
   + Esegui il nuovo notebook. Apri il taccuino ed eseguilo nota per nota, in sequenza, e verifica se funziona.
   + Apporta le modifiche necessarie al codice.
   + Arresta il nuovo notebook.

1. Se hai distribuito il vecchio notebook come applicazione:
   + Implementa il nuovo notebook come nuova applicazione separata.
   + Arresta la vecchia applicazione.
   + Esegui la nuova applicazione senza istantanea.

1. Arresta il vecchio notebook se è in esecuzione. Avvia il nuovo notebook, se necessario, per un uso interattivo.

**Flusso di processo per l'aggiornamento con dipendenze esterne**

![\[Il diagramma seguente rappresenta il flusso di lavoro consigliato per aggiornare il notebook con dipendenze esterne.\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# Lavora con AWS Glue
<a name="how-zeppelin-glue"></a>

Il notebook Studio archivia e riceve informazioni sulle fonti di dati e sui relativi sink. AWS Glue Quando si crea il notebook Studio, si specifica il AWS Glue database che contiene le informazioni di connessione. Quando si accede alle fonti di dati e ai sink, si specificano le AWS Glue tabelle contenute nel database. AWS Glue Le tabelle forniscono l'accesso alle AWS Glue connessioni che definiscono le posizioni, gli schemi e i parametri delle fonti e delle destinazioni dei dati.

I notebook Studio utilizzano le proprietà delle tabelle per archiviare dati specifici dell'applicazione. Per ulteriori informazioni, consulta [Proprietà tabella](how-zeppelin-glue-properties.md).

Per un esempio di come configurare una AWS Glue connessione, un database e una tabella da utilizzare con i notebook Studio, consulta [Creare un database AWS Glue](example-notebook.md#example-notebook-glue) il tutorial. [Tutorial: crea un notebook Studio in Managed Service per Apache Flink](example-notebook.md)

# Proprietà tabella
<a name="how-zeppelin-glue-properties"></a>

Oltre ai campi dati, le AWS Glue tabelle forniscono altre informazioni al notebook Studio utilizzando le proprietà delle tabelle. Managed Service for Apache Flink utilizza le seguenti proprietà della AWS Glue tabella:
+ [Definisci i valori temporali di Apache Flink](#how-zeppelin-glue-timestamp): queste proprietà definiscono il modo in cui il servizio gestito per Apache Flink genera i valori del tempo di elaborazione dei dati interni di Apache Flink.
+ [Usa il connettore Flink e le proprietà di formato](#how-zeppelin-glue-connector): queste proprietà forniscono informazioni sui flussi di dati.

Per aggiungere una proprietà a una AWS Glue tabella, procedi come segue:

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Dall'elenco di tabelle, seleziona la tabella che l'applicazione utilizza per memorizzare le informazioni sulla connessione dati. Scegli **Operazione** > **Modifica dettagli tabella**.

1. In **Proprietà tabella**, inserisci **managed-flink.proctime** per **chiave** e **user\$1action\$1time** per **valore**.

## Definisci i valori temporali di Apache Flink
<a name="how-zeppelin-glue-timestamp"></a>

Apache Flink fornisce valori temporali che descrivono quando si sono verificati eventi di elaborazione del flusso, come [Tempo di elaborazione](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) e [Ora evento](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time). Per includere questi valori nell'output dell'applicazione, definisci le proprietà nella AWS Glue tabella che indicano al runtime di Managed Service for Apache Flink di emettere questi valori nei campi specificati. 

Le chiavi e i valori utilizzati nelle proprietà della tabella sono i seguenti:


| Tipo timestamp | Chiave | Valore | 
| --- |--- |--- |
| [Tempo di elaborazione](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | managed-flink.proctime | Il nome della colonna che AWS Glue verrà utilizzato per esporre il valore. Questo nome di colonna non corrisponde a una colonna di tabella esistente. | 
| [Ora dell'evento](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | managed-flink.rowtime | Il nome della colonna che AWS Glue verrà utilizzato per esporre il valore. Questo nome di colonna corrisponde a una colonna di tabella esistente. | 
| managed-flink.watermark. *column\$1name*.millisecondi | L'intervallo della filigrana in millisecondi | 

## Usa il connettore Flink e le proprietà di formato
<a name="how-zeppelin-glue-connector"></a>

Fornisci informazioni sulle origini dati ai connettori Flink dell'applicazione utilizzando le proprietà della tabella AWS Glue . Di seguito sono riportati alcuni esempi delle proprietà utilizzate per i connettori dal servizio gestito per Apache Flink:


| Tipo di connettore | Chiave | Valore | 
| --- |--- |--- |
| [Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | format | Il formato utilizzato per deserializzare e serializzare i messaggi di Kafka, ad esempio o. json csv | 
| scan.startup.mode | La modalità di avvio per il consumatore Kafka, ad esempio o. earliest-offset timestamp | 
| [Kinesis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | format | Il formato utilizzato per deserializzare e serializzare i record del flusso di dati Kinesis, ad esempio o. json csv | 
| aws.region | La AWS regione in cui è definito lo stream.  | 
| [S3 (Filesystem)](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | format | Il formato utilizzato per deserializzare e serializzare i file, ad esempio o. json csv | 
| path | Il percorso Amazon S3, ad es. s3://mybucket/ | 

Per ulteriori informazioni su altri connettori oltre a Kinesis e Apache Kafka, consulta la documentazione relativa al tuo connettore.

# Esempi e tutorial per notebook Studio in Managed Service for Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [

# Tutorial: creare un notebook Studio in Managed Service per Apache Flink
](example-notebook.md)
+ [

# Tutorial: distribuzione di un notebook Studio come servizio gestito per l'applicazione Apache Flink con stato durevole
](example-notebook-deploy.md)
+ [

# Visualizza esempi di query per analizzare i dati in un notebook Studio
](how-zeppelin-sql-examples.md)

# Tutorial: creare un notebook Studio in Managed Service per Apache Flink
<a name="example-notebook"></a>

Il seguente tutorial mostra come creare un notebook Studio che legge i dati da un flusso di dati Kinesis o da un cluster Amazon MSK.

**Topics**
+ [

## Completa i prerequisiti
](#example-notebook-setup)
+ [

## Creare un database AWS Glue
](#example-notebook-glue)
+ [

## Passaggi successivi: creare un notebook Studio con Kinesis Data Streams o Amazon MSK
](#examples-notebook-nextsteps)
+ [

# Creazione di un notebook Studio con il flusso di dati Kinesis
](example-notebook-streams.md)
+ [

# Crea un notebook Studio con Amazon MSK
](example-notebook-msk.md)
+ [

# Pulisci l'applicazione e le risorse dipendenti
](example-notebook-cleanup.md)

## Completa i prerequisiti
<a name="example-notebook-setup"></a>

Assicurati che la tua AWS CLI sia la versione 2 o successiva. Per installare la versione più recente AWS CLI, consulta [Installazione, aggiornamento e disinstallazione della AWS CLI versione 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html).

## Creare un database AWS Glue
<a name="example-notebook-glue"></a>

Il notebook Studio utilizza un database [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) per i metadati sull'origine dati Amazon MSK.

**Creare un AWS Glue database**

1. Apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Scegli **Aggiungi database**. Nella finestra **Aggiungi database**, inserisci **default** per **Nome database**. Scegli **Create** (Crea). 

## Passaggi successivi: creare un notebook Studio con Kinesis Data Streams o Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Con questo tutorial, puoi creare un notebook Studio che utilizza il flusso di dati Kinesis o Amazon MSK:
+ [Creazione di un notebook Studio con il flusso di dati Kinesis](example-notebook-streams.md): con il flusso di dati Kinesis, viene creata rapidamente un'applicazione che utilizza un flusso di dati Kinesis come origine. È sufficiente creare un flusso di dati Kinesis come risorsa dipendente.
+ [Crea un notebook Studio con Amazon MSK](example-notebook-msk.md): con Amazon MSK, viene creata un'applicazione che utilizza un cluster Amazon MSK come origine. È necessario creare un Amazon VPC, un'istanza client Amazon EC2 e un cluster Amazon MSK come risorse dipendenti.

# Creazione di un notebook Studio con il flusso di dati Kinesis
<a name="example-notebook-streams"></a>

Questo tutorial descrive come creare un notebook Studio che utilizza un flusso di dati Kinesis come origine.

**Topics**
+ [

## Completa i prerequisiti
](#example-notebook-streams-setup)
+ [

## Crea una tabella AWS Glue
](#example-notebook-streams-glue)
+ [

## Creazione di un notebook Studio con il flusso di dati Kinesis
](#example-notebook-streams-create)
+ [

## Invio di dati al flusso di dati Kinesis
](#example-notebook-streams-send)
+ [

## Test del notebook Studio
](#example-notebook-streams-test)

## Completa i prerequisiti
<a name="example-notebook-streams-setup"></a>

Prima di creare un notebook Studio, è necessario creare un flusso di dati Kinesis (`ExampleInputStream`). L'applicazione utilizza questo flusso come origine dell'applicazione.

Puoi creare questo flusso utilizzando la console Amazon Kinesis o il comando AWS CLI seguente. Per istruzioni sulla console, consulta [Creazione e aggiornamento dei flussi di dati](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) nella *Guida per gli sviluppatori del flusso di dati Amazon Kinesis*. Assegna un nome al flusso **ExampleInputStream** e imposta il **Numero di partizioni aperte** su **1**.

Per creare lo stream (`ExampleInputStream`) utilizzando il AWS CLI, usa il seguente comando Amazon Kinesis `create-stream` AWS CLI .

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Crea una tabella AWS Glue
<a name="example-notebook-streams-glue"></a>

Il notebook Studio utilizza un database [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) per i metadati sull'origine dati del flusso di dati Kinesis.

**Nota**  
Puoi creare prima il database manualmente oppure lasciare che il servizio gestito per Apache Flink lo crei automaticamente quando viene creato il notebook. Allo stesso modo, è possibile creare manualmente la tabella come descritto in questa sezione oppure utilizzare il codice del connettore di creazione tabella per il servizio gestito per Apache Flink nel notebook all'interno di Apache Zeppelin per creare la tabella tramite un'istruzione DDL. Puoi quindi effettuare il check-in AWS Glue per assicurarti che la tabella sia stata creata correttamente.

**Creazione di una tabella**

1. Accedi a Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Se non disponi già di un AWS Glue database, scegli **Database** dalla barra di navigazione a sinistra. Scegli **Aggiungi database**. Nella finestra **Aggiungi database**, inserisci **default** per **Nome database**. Scegli **Create (Crea) **.

1. Nella barra di navigazione a sinistra, seleziona **Tabelle**. Nella pagina **Tabelle**, scegli **Aggiungi tabelle** > **Aggiungi tabella manualmente**.

1. Nella pagina **Imposta le proprietà della tabella**, inserisci **stock** per **Nome tabella**. Assicurati di selezionare il database creato in precedenza. Scegli **Next (Successivo)**.

1. Nella pagina **Aggiungi un datastore**, scegli **Kinesis**. Per **Nome del flusso**, inserisci **ExampleInputStream**. Per **URL di origine di Kinesis**, inserisci **https://kinesis.us-east-1.amazonaws.com**. Se copi e incolli l'**URL di origine di Kinesis**, assicurati di eliminare gli spazi iniziali o finali. Scegli **Next (Successivo)**.

1. Nella pagina **Classificazione**, scegli **JSON**. Scegli **Next (Successivo)**.

1. Nella pagina **Definisci uno schema**, scegli Aggiungi colonna per aggiungere una colonna. Aggiungi colonne con le seguenti proprietà:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/example-notebook-streams.html)

   Scegli **Next (Successivo)**.

1. Nella pagina successiva, verifica le impostazioni e scegli **Fine.**

1. Scegli la tabella appena creata dall'elenco delle tabelle.

1. Scegli **Modifica tabella** e aggiungi una proprietà con la chiave `managed-flink.proctime` e il valore `proctime`.

1. Scegli **Applica**.

## Creazione di un notebook Studio con il flusso di dati Kinesis
<a name="example-notebook-streams-create"></a>

Ora che hai creato le risorse utilizzate dall'applicazione, puoi creare il notebook Studio. 

**Topics**
+ [

### Crea un taccuino Studio utilizzando il Console di gestione AWS
](#example-notebook-create-streams-console)
+ [

### Crea un taccuino Studio utilizzando il AWS CLI
](#example-notebook-msk-create-api)

### Crea un taccuino Studio utilizzando il Console di gestione AWS
<a name="example-notebook-create-streams-console"></a>

1. [Aprire la console Managed Service for Apache Flink a casa https://console.aws.amazon.com/managed-flink/? region=us-east-1\$1/applicazioni/dashboard.](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 

1. Nella pagina **Applicazioni del servizio gestito per Apache Flink**, scegli la scheda **Studio**. Scegli **Crea notebook Studio**.
**Nota**  
Puoi anche creare un notebook Studio dalle console Amazon MSK o del flusso di dati Kinesis selezionando il cluster Amazon MSK o il flusso di dati Kinesis di input e scegliendo **Elabora dati in tempo reale**.

1. Nella pagina **Crea notebook Studio**, immetti le seguenti informazioni:
   + Inserisci **MyNotebook** per il nome del notebook.
   + Scegli l'**impostazione predefinita** per il **database AWS Glue**.

   Scegli **Crea notebook Studio**.

1. **Nella pagina, scegli Esegui. **MyNotebook**** Attendi che lo **stato** mostri **In esecuzione**. Si applicano costi quando il notebook è in funzione.

### Crea un taccuino Studio utilizzando il AWS CLI
<a name="example-notebook-msk-create-api"></a>

Per creare il tuo taccuino Studio utilizzando AWS CLI, procedi come segue:

1. Verifica l'ID del tuo account. Questo valore è necessario per creare l'applicazione.

1. Crea il ruolo `arn:aws:iam::AccountID:role/ZeppelinRole` e aggiungi le seguenti autorizzazioni al ruolo creato automaticamente dalla console.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Crea un file denominato `create.json` con i seguenti contenuti. Sostituisci i valori segnaposto con le tue informazioni.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Per creare l'applicazione, esegui il comando riportato di seguito:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Una volta completata l'esecuzione del comando, visualizzerai un output che mostra i dettagli per il nuovo notebook Studio. Di seguito è riportato un esempio di output.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Per avviare l'applicazione, esegui il comando riportato di seguito. Sostituisci il valore di esempio con il tuo ID account.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Invio di dati al flusso di dati Kinesis
<a name="example-notebook-streams-send"></a>

Per inviare i dati di test al flusso di dati Kinesis, procedi come segue:

1. Apri [Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Scegli **Crea un utente Cognito** con. CloudFormation

1. La CloudFormation console si apre con il modello Kinesis Data Generator. Scegli **Next (Successivo)**.

1. Nella pagina **Specifica i dettagli dello stack**, inserisci il nome utente e la password per l'utente Cognito. Scegli **Next (Successivo)**.

1. Nella pagina **Configura opzioni dello stack**, scegli **Successivo**.

1. Nella pagina **Review Kinesis-Data-Generator-Cognito -User**, scegli l'opzione **Riconosco che AWS CloudFormation potrebbe creare le risorse IAM**. casella di controllo. Scegli **Crea stack**.

1. Attendi che lo CloudFormation stack finisca di essere creato. **Una volta completato lo stack, apri lo stack **Kinesis-Data-Generator-Cognito-User nella console e scegli la scheda Output**. CloudFormation ** **KinesisDataGeneratorUrl**Apri l'URL elencato per il valore di output.

1. Nella pagina **Amazon Kinesis Data Generator**, accedi con le credenziali create nel passaggio 4.

1. Nella pagina successiva, specifica i seguenti valori:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/example-notebook-streams.html)

   Per **Modello di record**, incolla il seguente codice:

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Scegli **Invia dati**.

1. Il generatore invierà dati al flusso di dati Kinesis. 

   Lascia il generatore in esecuzione mentre completi la sezione successiva.

## Test del notebook Studio
<a name="example-notebook-streams-test"></a>

In questa sezione, il notebook Studio viene utilizzato per eseguire query sui dati del flusso di dati Kinesis.

1. [Aprire la console Managed Service for Apache Flink a casa https://console.aws.amazon.com/managed-flink/? region=us-east-1\$1/applicazioni/dashboard.](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)

1. Nella pagina **Applicazioni del servizio gestito per Apache Flink**, scegli la scheda **Notebook Studio**. Scegli **MyNotebook**.

1. **Nella pagina, scegli Apri in Apache Zeppelin. **MyNotebook****

   L'interfaccia Apache Zeppelin viene aperta in una nuova scheda.

1. Nella sezione **Ti diamo il benvenuto su Zeppelin\$1**, scegli **Nota Zeppelin**.

1. Nella pagina **Nota Zeppelin**, inserisci la seguente query in una nuova nota:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Seleziona l'icona dell'esecuzione.

   Dopo un breve periodo, la nota visualizza i dati del flusso di dati Kinesis.

Per aprire il pannello di controllo di Apache Flink per la tua applicazione e visualizzare gli aspetti operativi, scegli **PROCESSO FLINK**. Per ulteriori informazioni sul pannello di controllo di Flink, consulta [Pannello di controllo di Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) nella [Guida per gli sviluppatori del servizio gestito per Apache Flink](https://docs.aws.amazon.com/).

Per altri esempi di query Streaming SQL in Flink, consulta [Query](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) nella [documentazione di Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Crea un notebook Studio con Amazon MSK
<a name="example-notebook-msk"></a>

Questo tutorial descrive come creare un notebook Studio che utilizza un cluster Amazon MSK come origine.

**Topics**
+ [

## Configurare un cluster Amazon MSK
](#example-notebook-msk-setup)
+ [

## Aggiungi un gateway NAT al tuo VPC
](#example-notebook-msk-nat)
+ [

## Crea una AWS Glue connessione e una tabella
](#example-notebook-msk-glue)
+ [

## Crea un notebook Studio con Amazon MSK
](#example-notebook-msk-create)
+ [

## Invio di dati al cluster Amazon MSK
](#example-notebook-msk-send)
+ [

## Test del notebook Studio
](#example-notebook-msk-test)

## Configurare un cluster Amazon MSK
<a name="example-notebook-msk-setup"></a>

Per questo tutorial, è necessario un cluster Amazon MSK che consenta l'accesso al testo in chiaro. Se non disponi già di un cluster Amazon MSK, segui il tutorial [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) per creare un Amazon VPC, un cluster Amazon MSK, un argomento e un'istanza client Amazon. EC2 

Seguendo il tutorial, completa le seguenti operazioni:
+ Nella [Fase 3: creazione di un cluster Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), passaggio 4, modifica il valore `ClientBroker` da `TLS` a **PLAINTEXT**.

## Aggiungi un gateway NAT al tuo VPC
<a name="example-notebook-msk-nat"></a>

Se hai creato un cluster Amazon MSK seguendo il tutorial [Nozioni di base per l'uso di Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) o se il tuo Amazon VPC esistente non dispone già di un gateway NAT per le sue sottoreti private, devi aggiungere un gateway NAT al tuo Amazon VPC. Il diagramma seguente illustra l'architettura generale. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/images/vpc_05.png)


Per creare un gateway NAT per il tuo Amazon VPC, procedi come segue:

1. Apri la console Amazon VPC all'indirizzo [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Scegli **Gateway NAT** dalla barra di navigazione a sinistra.

1. Nella pagina **Gateway NAT**, scegli **Crea gateway NAT**.

1. Nella pagina **Crea gateway NAT**, specifica i seguenti valori:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/example-notebook-msk.html)

   Scegli **Crea un gateway NAT**.

1. Nella barra di navigazione a sinistra, seleziona **Tabelle di routing**.

1. Seleziona **Crea tabella di routing**.

1. Nella pagina **Crea tabella di routing**, fornisci le seguenti informazioni:
   + **Tag nome:** **ZeppelinRouteTable**
   + **VPC****: scegli il tuo VPC (ad esempio VPC).AWS KafkaTutorial**

   Scegli **Create (Crea) **.

1. Nell'elenco delle tabelle dei percorsi, scegli. **ZeppelinRouteTable** Seleziona la scheda **Route**, seleziona **Modifica route**.

1. Nella scheda **Modifica route** scegli **Aggiungi route**.

1. ****Per **Destinazione**, inserisci **0.0.0.0/0**. Per **Target**, scegli **NAT Gateway**, **ZeppelinGateway**. Seleziona **Salva route**. Scegli **Chiudi**.

1. Nella pagina Tabelle delle rotte, con l'**ZeppelinRouteTable**opzione selezionata, scegli la scheda **Associazioni di sottoreti**. Scegli **Modifica associazioni sottorete**.

1. **Nella pagina **Modifica associazioni di sottoreti**, scegli **AWS KafkaTutorialSubnet2 e AWS KafkaTutorialSubnet 3**.** Seleziona **Salva**.

## Crea una AWS Glue connessione e una tabella
<a name="example-notebook-msk-glue"></a>

Il notebook Studio utilizza un database [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) per i metadati sull'origine dati Amazon MSK. In questa sezione, crei una AWS Glue connessione che descrive come accedere al tuo cluster Amazon MSK e una AWS Glue tabella che descrive come presentare i dati della tua origine dati a client come il tuo notebook Studio. 

**Creazione di una connessione**

1. Accedi Console di gestione AWS e apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Se non disponi già di un AWS Glue database, scegli **Database** dalla barra di navigazione a sinistra. Scegli **Aggiungi database**. Nella finestra **Aggiungi database**, inserisci **default** per **Nome database**. Scegli **Create (Crea) **.

1. Scegli **Connessioni** dalla barra di navigazione a sinistra. Scegli **Aggiungi connessione**.

1. Nella finestra **Aggiungi connessione**, fornisci i seguenti valori:
   + Per **Nome connessione**, inserisci **ZeppelinConnection**.
   + Per **Tipo di connessione**, scegli **Kafka**.
   + Per il **server bootstrap Kafka URLs**, fornisci la stringa del broker bootstrap per il tuo cluster. È possibile ottenere i broker di bootstrap dalla console MSK o immettendo il seguente comando CLI:

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Deseleziona la casella di controllo **Richiedi connessione SSL**.

   Scegli **Next (Successivo)**.

1. Nella pagina **VPC**, fornisci i seguenti valori:
   + **Per **VPC**, scegli il nome del tuo VPC (ad esempio VPC). AWS KafkaTutorial**
   + **Per **Subnet**, scegli 2.AWS KafkaTutorialSubnet**
   + Per **Gruppi di sicurezza**, scegli tutti i gruppi disponibili.

   Scegli **Next (Successivo)**.

1. Nella pagina **Proprietà di connessione**/**Accesso alla connessione**, scegli **Finisci**.

**Creazione di una tabella**
**Nota**  
È possibile creare manualmente la tabella come descritto nei passaggi seguenti oppure utilizzare il codice del connettore di creazione tabella per il servizio gestito per Apache Flink nel notebook all'interno di Apache Zeppelin per creare la tabella tramite un'istruzione DDL. È quindi possibile effettuare il check-in AWS Glue per assicurarsi che la tabella sia stata creata correttamente.

1. Nella barra di navigazione a sinistra, seleziona **Tabelle**. Nella pagina **Tabelle**, scegli **Aggiungi tabelle** > **Aggiungi tabella manualmente**.

1. Nella pagina **Imposta le proprietà della tabella**, inserisci **stock** per **Nome tabella**. Assicurati di selezionare il database creato in precedenza. Scegli **Next (Successivo)**.

1. Nella pagina **Aggiungi un datastore**, scegli **Kafka**. Per il **nome dell'argomento**, inserisci il nome dell'argomento (ad es. **AWS KafkaTutorialTopic**). Per **Connessione**, scegli **ZeppelinConnection**.

1. Nella pagina **Classificazione**, scegli **JSON**. Scegli **Next (Successivo)**.

1. Nella pagina **Definisci uno schema**, scegli Aggiungi colonna per aggiungere una colonna. Aggiungi colonne con le seguenti proprietà:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/example-notebook-msk.html)

   Scegli **Next (Successivo)**.

1. Nella pagina successiva, verifica le impostazioni e scegli **Fine.**

1. Scegli la tabella appena creata dall'elenco delle tabelle.

1. Scegliete **Modifica tabella** e aggiungete le seguenti proprietà:
   + chiave:`managed-flink.proctime`, valore: `proctime`
   + chiave:`flink.properties.group.id`, valore: `test-consumer-group`
   + chiave:`flink.properties.auto.offset.reset`, valore: `latest`
   + chiave:`classification`, valore: `json`

   Senza queste coppie chiave/valore, il notebook Flink genera un errore. 

1. Scegli **Applica**.

## Crea un notebook Studio con Amazon MSK
<a name="example-notebook-msk-create"></a>

Ora che hai creato le risorse utilizzate dall'applicazione, puoi creare il notebook Studio. 

**Topics**
+ [

### Crea un taccuino Studio utilizzando il Console di gestione AWS
](#example-notebook-create-msk-console)
+ [

### Crea un taccuino Studio utilizzando il AWS CLI
](#example-notebook-msk-create-api)

**Nota**  
Un notebook Studio può essere creato anche dalla console Amazon MSK scegliendo un cluster esistente, quindi selezionando **Elabora dati in tempo reale**.

### Crea un taccuino Studio utilizzando il Console di gestione AWS
<a name="example-notebook-create-msk-console"></a>

1. [Aprire la console Managed Service for Apache Flink a casa https://console.aws.amazon.com/managed-flink/? region=us-east-1\$1/applicazioni/dashboard.](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)

1. Nella pagina **Applicazioni del servizio gestito per Apache Flink**, scegli la scheda **Studio**. Scegli **Crea notebook Studio**.
**Nota**  
Per creare un notebook Studio dalle console Amazon MSK o del flusso di dati Kinesis, seleziona il cluster Amazon MSK o il flusso di dati Kinesis di input, quindi scegli **Elabora dati in tempo reale**.

1. Nella pagina **Crea notebook Studio**, immetti le seguenti informazioni:
   + Inserisci **MyNotebook** per **Nome notebook Studio**.
   + Scegli l'**impostazione predefinita** per il **database AWS Glue**.

   Scegli **Crea notebook Studio**.

1. **Nella pagina, scegli la scheda Configurazione. **MyNotebook**** Nella sezione **Reti**, scegli **Modifica**.

1. Nella MyNotebook pagina **Modifica rete per**, scegli la **configurazione VPC basata sul cluster Amazon MSK**. Scegli il cluster Amazon MSK per **Cluster Amazon MSK**. Scegli **Save changes** (Salva modifiche).

1. **Nella **MyNotebook**pagina, scegli Esegui.** Attendi che lo **stato** mostri **In esecuzione**.

### Crea un taccuino Studio utilizzando il AWS CLI
<a name="example-notebook-msk-create-api"></a>

Per creare il tuo taccuino Studio utilizzando il AWS CLI, procedi come segue:

1. Assicurati di disporre delle informazioni riportate di seguito. Questi valori sono necessari per creare l'applicazione.
   + ID dell'account.
   + L'ID della sottorete IDs e del gruppo di sicurezza per l'Amazon VPC che contiene il cluster Amazon MSK.

1. Crea un file denominato `create.json` con i seguenti contenuti. Sostituisci i valori segnaposto con le tue informazioni.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Per creare l'applicazione, esegui il comando riportato di seguito:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Una volta completata l'esecuzione del comando, dovresti visualizzare un output simile al seguente, che mostra i dettagli per il nuovo notebook Studio:

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Per avviare l'applicazione, esegui il comando riportato di seguito. Sostituisci il valore segnaposto con il tuo ID account.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Invio di dati al cluster Amazon MSK
<a name="example-notebook-msk-send"></a>

In questa sezione, esegui uno script Python nel tuo EC2 client Amazon per inviare dati alla tua origine dati Amazon MSK.

1. Connect al tuo EC2 client Amazon.

1. Esegui i seguenti comandi per installare Python versione 3, Pip e il pacchetto Kafka per Python e conferma le operazioni:

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Configuralo AWS CLI sul tuo computer client inserendo il seguente comando:

   ```
   aws configure
   ```

   Fornisci le credenziali del tuo account e **us-east-1** per `region`.

1. Crea un file denominato `stock.py` con i seguenti contenuti. Sostituisci il valore di esempio con la stringa Bootstrap Brokers del tuo cluster Amazon MSK e aggiorna il nome dell'argomento se l'argomento non è: **AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Esegui lo script con il comando seguente:

   ```
   $ python3 stock.py
   ```

1. Lascia lo script in esecuzione mentre completi la sezione seguente.

## Test del notebook Studio
<a name="example-notebook-msk-test"></a>

In questa sezione, il notebook Studio viene utilizzato per eseguire query sui dati del cluster Amazon MSK.

1. [Aprire la console Managed Service for Apache Flink a casa? https://console.aws.amazon.com/managed-flink/ region=us-east-1\$1/applicazioni/dashboard.](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)

1. Nella pagina **Applicazioni del servizio gestito per Apache Flink**, scegli la scheda **Notebook Studio**. Scegli **MyNotebook**.

1. **Nella pagina, scegli Apri in Apache Zeppelin. **MyNotebook****

   L'interfaccia di Apache Zeppelin viene aperta in una nuova scheda.

1. Nella pagina **Ti diamo il benvenuto su Zeppelin\$1**, scegli **Nuova nota Zeppelin**.

1. Nella pagina **Nota Zeppelin**, inserisci la seguente query in una nuova nota:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Seleziona l'icona dell'esecuzione.

   L'applicazione mostra i dati del cluster Amazon MSK.

Per aprire il pannello di controllo di Apache Flink per la tua applicazione e visualizzare gli aspetti operativi, scegli **PROCESSO FLINK**. Per ulteriori informazioni sul pannello di controllo di Flink, consulta [Pannello di controllo di Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) nella [Guida per gli sviluppatori del servizio gestito per Apache Flink](https://docs.aws.amazon.com/).

Per altri esempi di query Streaming SQL in Flink, consulta [Query](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) nella [documentazione di Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Pulisci l'applicazione e le risorse dipendenti
<a name="example-notebook-cleanup"></a>

## Eliminazione del notebook Studio
<a name="example-notebook-cleanup-app"></a>

1. Apri la console del servizio gestito per Apache Flink.

1. Scegli **MyNotebook**.

1. Scegli **Operazioni**, quindi **Elimina**.

## Eliminare il AWS Glue database e la connessione
<a name="example-notebook-cleanup-glue"></a>

1. Apri la AWS Glue console all'indirizzo [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Scegli **Database** dalla barra di navigazione a sinistra. Seleziona la casella di controllo accanto a **Predefinito** per selezionarla. Scegli **Operazione**, **Elimina database**. Conferma la selezione.

1. Scegli **Connessioni** dalla barra di navigazione a sinistra. Seleziona la casella di controllo accanto a **ZeppelinConnection**per selezionarla. Scegli **Operazione**, **Elimina connessione**. Conferma la selezione.

## Eliminazione del ruolo e della policy IAM
<a name="example-notebook-msk-cleanup-iam"></a>

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Dalla barra di navigazione sinistra, scegli **Ruoli**.

1. Usa la barra di ricerca per cercare il **ZeppelinRole**ruolo.

1. Scegli il **ZeppelinRole**ruolo. Scegli **Elimina ruolo**. Conferma l'eliminazione.

## Elimina il tuo gruppo di CloudWatch log
<a name="example-notebook-cleanup-cw"></a>

La console crea automaticamente un CloudWatch gruppo e un flusso di log quando crei l'applicazione utilizzando la console. Non ottieni un gruppo di log e un flusso di log se l'applicazione viene creata utilizzando la AWS CLI.

1. Apri la CloudWatch console all'indirizzo [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Scegli **Gruppi di log** dalla barra di navigazione a sinistra.

1. Scegli il gruppo**/AWS/KinesisAnalytics/MyNotebook**log.

1. Scegli **Operazioni** > **Elimina gruppo/i di log**. Conferma l’eliminazione.

## Pulisci le risorse di Kinesis Data Streams
<a name="example-notebook-cleanup-streams"></a>

Per eliminare il flusso di dati Kinesis, apri la console del flusso di dati Kinesis, seleziona il flusso Kinesis e scegli **Operazioni** > **Elimina**.

## Eliminazione delle risorse MSK
<a name="example-notebook-cleanup-msk"></a>

Segui i passaggi in questa sezione se hai creato un cluster Amazon MSK per questo tutorial. Questa sezione contiene istruzioni per eliminare l'istanza client Amazon EC2, l'Amazon VPC e il cluster Amazon MSK.

### Elimina il tuo cluster Amazon MSK
<a name="example-notebook-msk-cleanup-msk"></a>

Segui questi passaggi se hai creato un cluster Amazon MSK per questo tutorial.

1. Aprire la console Amazon MSK a [https://console.aws.amazon.com/msk/casa? region=us-east-1\$1/home/.](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)

1. Scegli **AWS KafkaTutorialCluster**. Scegli **Elimina**. Inserisci **delete** nella finestra che appare e conferma la selezione.

### Interruzione di un'istanza client
<a name="example-notebook-msk-cleanup-client"></a>

Segui questi passaggi se hai creato un'istanza client Amazon EC2 per questo tutorial.

1. Apri la console Amazon EC2 all'indirizzo [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Scegli **Istanze** nella barra di navigazione a sinistra.

1. Scegli la casella di controllo accanto a per selezionarla. **ZeppelinClient**

1. Scegli **Stato istanza** > **Termina istanza**.

### Eliminazione di Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

Segui questi passaggi se hai creato un Amazon VPC per questo tutorial.

1. Apri la console Amazon EC2 all'indirizzo [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Scegli **Interfacce di rete** dalla barra di navigazione a sinistra.

1. Inserisci l'ID VPC nella barra di ricerca e premi Invio per effettuare la ricerca.

1. Seleziona la casella di controllo nell'intestazione della tabella per selezionare tutte le interfacce di rete visualizzate.

1. Seleziona **Operazioni** > **Scollega**. Nella finestra che appare, scegli **Abilita** in **Forza distacco**. Scegli **Scollega** e attendi che tutte le interfacce di rete raggiungano lo stato **Disponibile**.

1. Seleziona la casella di controllo nell'intestazione della tabella per selezionare nuovamente tutte le interfacce di rete visualizzate.

1. Scegli **Operazioni** > **Elimina**. Conferma l'operazione.

1. Apri la console Amazon VPC all'indirizzo [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Seleziona **AWS KafkaTutorialVPC**. Scegli **Operazioni** > **Elimina VPC**. Inserisci **delete** e conferma l'eliminazione.

# Tutorial: distribuzione di un notebook Studio come servizio gestito per l'applicazione Apache Flink con stato durevole
<a name="example-notebook-deploy"></a>

Il seguente tutorial mostra come implementare un notebook Studio come un'applicazione del servizio gestito per Apache Flink con stato durevole.

**Topics**
+ [

## Completamento dei prerequisiti
](#example-notebook-durable-setup)
+ [

## Implementa un'applicazione con uno stato durevole utilizzando il Console di gestione AWS
](#example-notebook-deploy-console)
+ [

## Implementa un'applicazione con uno stato durevole utilizzando il AWS CLI
](#example-notebook-deploy-cli)

## Completamento dei prerequisiti
<a name="example-notebook-durable-setup"></a>

Crea un nuovo notebook Studio seguendo le istruzioni in [Tutorial: crea un notebook Studio in Managed Service per Apache Flink](example-notebook.md) e utilizzando il flusso di dati Kinesis o Amazon MSK. Assegna un nome al notebook Studio `ExampleTestDeploy`.

## Implementa un'applicazione con uno stato durevole utilizzando il Console di gestione AWS
<a name="example-notebook-deploy-console"></a>

1. Aggiungi una posizione del bucket S3 in cui desideri che il codice del pacchetto venga archiviato in **Posizione del codice dell'applicazione (*opzionale*)** nella console. Ciò consente di eseguire i passaggi necessari per implementare ed eseguire l'applicazione direttamente dal notebook.

1. Aggiungi le autorizzazioni necessarie al ruolo dell'applicazione per abilitare il ruolo che stai utilizzando per leggere e scrivere su un bucket Amazon S3 e per avviare un'applicazione del servizio gestito per Apache Flink:
   + Amazon S3 FullAccess
   + Amazon ha gestito- flinkFullAccess
   + Accesso alle fonti, alle destinazioni e VPCs , se applicabile. Per ulteriori informazioni, consulta [Verifica le autorizzazioni IAM per i notebook Studio](how-zeppelin-iam.md).

1. Utilizza il seguente codice di esempio:

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Con il lancio di questa funzionalità, vedrai un nuovo menu a discesa nell'angolo in alto a destra di ogni nota del notebook con il nome del notebook. Puoi fare quanto segue:
   + Visualizza le impostazioni del notebook Studio nella Console di gestione AWS.
   + Crea una nota Zeppelin ed esportala in Amazon S3. A questo punto, fornisci un nome per l'applicazione e scegli **Crea ed esporta**. Riceverai una notifica al termine dell'esportazione.
   + Se necessario, puoi visualizzare ed eseguire eventuali test aggiuntivi sull'eseguibile in Amazon S3.
   + Una volta completata la compilazione, potrai implementare il codice come un'applicazione di streaming Kinesis con stato durevole e scalabilità automatica.
   + Dal menu a discesa, scegli **Implementa nota Zeppelin come applicazione di streaming Kinesis**. Controlla il nome dell'applicazione e scegli **Distribuisci tramite AWS console**.
   + Questo ti porterà alla Console di gestione AWS pagina per la creazione di un servizio gestito per l'applicazione Apache Flink. Tieni presente che il nome dell'applicazione, il parallelismo, la posizione del codice, i ruoli predefiniti di Glue DB, VPC (se applicabile) e IAM sono stati precompilati. Verifica che i ruoli IAM dispongano delle autorizzazioni necessarie per le origini e le destinazioni. Gli snapshot sono abilitati per impostazione predefinita per la gestione dello stato durevole delle applicazioni.
   + Scegli **Crea applicazione**.
   + Puoi scegliere **Configura** e modificare qualsiasi impostazione, quindi scegli **Esegui** per avviare l'applicazione di streaming.

## Implementa un'applicazione con uno stato durevole utilizzando il AWS CLI
<a name="example-notebook-deploy-cli"></a>

Per distribuire un'applicazione utilizzando il AWS CLI, è necessario aggiornare il modello AWS CLI di servizio fornito con le informazioni sulla Beta 2. Per informazioni sull'utilizzo del modello di servizio aggiornato, consulta [Completa i prerequisitiCompletamento dei prerequisiti](example-notebook.md#example-notebook-setup).

Il codice di esempio seguente crea un nuovo notebook Studio:

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

Il seguente esempio di codice avvia un notebook Studio:

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

Il codice seguente restituisce l'URL della pagina del notebook Apache Zeppelin di un'applicazione:

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Visualizza esempi di query per analizzare i dati in un notebook Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [

## Crea tabelle con Amazon MSK/Apache Kafka
](#how-zeppelin-examples-creating-tables)
+ [

## Crea tabelle con Kinesis
](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [

## Interroga una finestra rotante
](#how-zeppelin-examples-tumbling)
+ [

## Interroga una finestra scorrevole
](#how-zeppelin-examples-sliding)
+ [

## Usa SQL interattivo
](#how-zeppelin-examples-interactive-sql)
+ [

## Usa il connettore BlackHole SQL
](#how-zeppelin-examples-blackhole-connector-sql)
+ [

## Usa Scala per generare dati di esempio
](#notebook-example-data-generator)
+ [

## Usa Scala interattiva
](#notebook-example-interactive-scala)
+ [

## Usa Python interattivo
](#notebook-example-interactive-python)
+ [

## Usa una combinazione di Python, SQL e Scala interattivi
](#notebook-example-interactive-pythonsqlscala)
+ [

## Usa un flusso di dati Kinesis tra più account
](#notebook-example-crossaccount-kds)

Per informazioni sulle impostazioni delle query SQL di Apache Flink, consulta [Flink sui notebook Zeppelin per l'analisi dei dati interattiva](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

Per visualizzare la tua applicazione nel pannello di controllo di Apache Flink, scegli **PROCESSO FLINK** nella pagina **Nota Zeppelin** della tua applicazione.

Per ulteriori informazioni sulle query a finestra, consulta [Finestre](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) nella [documentazione di Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Per altri esempi di query Streaming SQL di Apache Flink, consulta [Query](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) nella [documentazione di Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Crea tabelle con Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Puoi utilizzare il connettore Amazon MSK Flink con il servizio gestito per Apache Flink Studio per autenticare la connessione con l'autenticazione Plaintext, SSL o IAM. Crea tabelle utilizzando proprietà specifiche in base alle tue esigenze.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Puoi combinarle con altre proprietà in [Connettore SQL Apache Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/).

## Crea tabelle con Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

Nell'esempio seguente, viene creata una tabella usando Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Per ulteriori informazioni sulle altre proprietà utilizzabili, consulta [Connettore SQL per il flusso di dati Amazon Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Interroga una finestra rotante
<a name="how-zeppelin-examples-tumbling"></a>

La seguente query Streaming SQL di Flink seleziona dalla tabella `ZeppelinTopic` il prezzo più alto in ogni finestra a cascata di cinque secondi:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Interroga una finestra scorrevole
<a name="how-zeppelin-examples-sliding"></a>

La seguente query Streaming SQL di Apache Flink seleziona il prezzo più alto in ogni finestra scorrevole di cinque secondi dalla tabella `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Usa SQL interattivo
<a name="how-zeppelin-examples-interactive-sql"></a>

Questo esempio converte il massimo dell'ora evento e del tempo di elaborazione e la somma dei valori della tabella chiave-valore. Assicurati di avere lo script di generazione dei dati di esempio dal [Usa Scala per generare dati di esempio](#notebook-example-data-generator) in esecuzione. Per provare altre query SQL, ad esempio il filtraggio e i join, nel notebook Studio, consulta [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) nella documentazione di Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Usa il connettore BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

Il connettore BlackHole SQL non richiede la creazione di un flusso di dati Kinesis o di un cluster Amazon MSK per testare le query. Per informazioni sul connettore BlackHole SQL, consulta [BlackHole SQL Connector nella documentazione di Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html). In questo esempio, il catalogo predefinito è un catalogo in memoria.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Usa Scala per generare dati di esempio
<a name="notebook-example-data-generator"></a>

Questo esempio utilizza Scala per generare dati di esempio. È possibile utilizzare questi dati di esempio per testare varie query. Utilizza l'istruzione di creazione tabella per creare la tabella chiave-valore.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Usa Scala interattiva
<a name="notebook-example-interactive-scala"></a>

Questa è la traduzione in Scala di [Usa SQL interattivo](#how-zeppelin-examples-interactive-sql). Per altri esempi di Scala, consulta [API Table](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) nella documentazione di Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Usa Python interattivo
<a name="notebook-example-interactive-python"></a>

Questa è la traduzione in Python di [Usa SQL interattivo](#how-zeppelin-examples-interactive-sql). Per altri esempi di Python, consulta [API Table](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) nella documentazione di Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Usa una combinazione di Python, SQL e Scala interattivi
<a name="notebook-example-interactive-pythonsqlscala"></a>

Puoi usare qualsiasi combinazione di SQL, Python e Scala nel tuo notebook per l'analisi interattiva. In un notebook Studio che intendi implementare come applicazione con stato durevole, puoi utilizzare una combinazione di SQL e Scala. Questo esempio mostra le sezioni che vengono ignorate e quelle che vengono implementate nell'applicazione con stato durevole.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Usa un flusso di dati Kinesis tra più account
<a name="notebook-example-crossaccount-kds"></a>

Per utilizzare un flusso di dati Kinesis che si trova in un account diverso da quello su cui è installato il notebook Studio, crea un ruolo di esecuzione del servizio nell'account su cui è in esecuzione il notebook Studio e una politica di attendibilità dei ruoli nell'account su cui è in esecuzione il flusso di dati. Utilizza `aws.credentials.provider`, `aws.credentials.role.arn` e `aws.credentials.role.sessionName` nel connettore Kinesis nell'istruzione DDL di creazione tabella per creare una tabella in base al flusso di dati.

Utilizza il seguente ruolo di esecuzione del servizio per l'account notebook Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Utilizza la policy `AmazonKinesisFullAccess` e la seguente policy di attendibilità dei ruoli per l'account del flusso di dati.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Usa il paragrafo seguente per l'istruzione di creazione tabella.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Risolvi i problemi relativi ai notebook Studio per Managed Service for Apache Flink
<a name="how-zeppelin-troubleshooting"></a>

Questa sezione contiene informazioni sulla risoluzione dei problemi per i notebook Studio.

## Arresta un'applicazione bloccata
<a name="how-zeppelin-troubleshooting-stopping"></a>

Per interrompere un'applicazione bloccata in uno stato transitorio, richiamate l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)azione con il `Force` parametro impostato su. `true` Per ulteriori informazioni, consulta [Applicazioni in esecuzione](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html) nella [Guida per gli sviluppatori del servizio gestito per Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Implementa come applicazione con stato durevole in un VPC senza accesso a Internet
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

La deploy-as-application funzione Managed Service for Apache Flink Studio non supporta applicazioni VPC senza accesso a Internet. Si consiglia di creare l'applicazione in Studio e quindi di utilizzare il servizio gestito per Apache Flink per creare manualmente un'applicazione Flink e selezionare il file zip che hai creato nel tuo notebook.

I passaggi seguenti descrivono come farlo: 

1. Compila ed esporta l'applicazione Studio in Amazon S3. Dovrebbe trattarsi di un file zip. 

1. Crea manualmente un'applicazione del servizio gestito per Apache Flink con il percorso del codice che fa riferimento alla posizione del file zip in Amazon S3. Inoltre, dovrai configurare l'applicazione con le seguenti variabili `env` (2 `groupID`, 3 `var` in totale): 

1. kinesis.analytics.flink.run.options

   1. python: source/note.py

   1. file jar: lib/ .jar PythonApplicationDependencies

1. managed.deploy\$1as\$1app.options

   1. DatabasEarn: *<glue database ARN (Amazon Resource Name)>*

1. Potrebbe essere necessario concedere ai ruoli IAM del servizio gestito per Apache Flink Studio e del servizio gestito per Apache Flink le autorizzazioni per i servizi utilizzati dall'applicazione. È possibile utilizzare lo stesso ruolo IAM per entrambe le applicazioni.

## Deploy-as-app riduzione delle dimensioni e dei tempi di costruzione
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

Le applicazioni Studio deploy-as-app for Python racchiudono tutto ciò che è disponibile nell'ambiente Python perché non possiamo determinare di quali librerie hai bisogno. Ciò può comportare una dimensione maggiore del necessario. deploy-as-app La procedura seguente mostra come ridurre le dimensioni dell'applicazione deploy-as-app Python disinstallando le dipendenze.

Se stai creando un'applicazione Python con deploy-as-app funzionalità di Studio, potresti prendere in considerazione la possibilità di rimuovere i pacchetti Python preinstallati dal sistema se le tue applicazioni non dipendono da. Ciò non solo contribuirà a ridurre le dimensioni finali degli artefatti per evitare di violare il limite di servizio relativo alle dimensioni delle applicazioni, ma migliorerà anche i tempi di creazione delle applicazioni dotate di questa funzionalità. deploy-as-app

È possibile eseguire il seguente comando per elencare tutti i pacchetti Python installati con le rispettive dimensioni installate e rimuovere selettivamente i pacchetti con dimensioni significative.

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**Nota**  
`apache-beam` è richiesto per il funzionamento di Flink Python. Questo pacchetto e le sue dipendenze non devono mai essere rimossi.

Di seguito è riportato l'elenco dei pacchetti Python preinstallati in Studio V2 che possono essere presi in considerazione per la rimozione:

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**Per rimuovere un pacchetto Python dal notebook Zeppelin:**

1. Controlla se la tua applicazione dipende dal pacchetto, o da uno dei pacchetti che la utilizzano, prima di rimuoverla. È possibile identificare i dipendenti di un pacchetto utilizzando [pipdeptree](https://pypi.org/project/pipdeptree/).

1. Esegui il seguente comando per rimuovere un pacchetto:

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. Se hai bisogno di recuperare un pacchetto rimosso per errore, esegui il seguente comando:

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example Esempio: rimuovi il `scipy` pacchetto prima di distribuire l'applicazione deploy-as-app Python con funzionalità.**  

1. Utilizza `pipdeptree` per scoprire tutti i consumatori `scipy` e verificare se puoi rimuovere `scipy` in sicurezza.
   + Installa lo strumento tramite notebook:

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + Ottieni l'albero delle dipendenze invertito di `scipy` eseguendo:

     ```
     %flink.pyflink
     !pip -r -p scipy
     ```

     Verrà visualizzato un output simile al seguente (condensato per brevità):

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. Ispeziona attentamente l'utilizzo di `seaborn`, `statsmodels` e `plotnine` nelle tue applicazioni. Se le tue applicazioni non dipendono da `scipy`, `seaborn`, `statemodels` o da `plotnine`, puoi rimuovere tutti questi pacchetti o solo quelli che non sono necessari alle tue applicazioni.

1. Rimuovi il pacchetto eseguendo:

   ```
   !pip uninstall -y scipy plotnine seaborn statemodels
   ```

## Annullare job
<a name="how-notbook-canceling-jobs"></a>

Questa sezione mostra come annullare i processi di Apache Flink a cui non puoi accedere da Apache Zeppelin. Se desideri annullare un processo di questo tipo, vai al pannello di controllo di Apache Flink, copia l'ID del processo, quindi utilizzalo in uno dei seguenti esempi.

Per annullare un singolo processo:

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

Per annullare tutti i processi in esecuzione:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

Per annullare tutti i processi:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## Riavvia l'interprete Apache Flink
<a name="how-notbook-restarting-interpreter"></a>

Per riavviare l'interprete Apache Flink sul notebook Studio

1. Scegli **Configurazione** accanto all'angolo in alto a destra della schermata.

1. Scegli **Interprete**.

1. Scegli **riavvia** e poi **OK**.

# Crea policy IAM personalizzate per i notebook Managed Service for Apache Flink Studio
<a name="how-zeppelin-appendix-iam"></a>

Normalmente si utilizzano policy IAM gestite per consentire all'applicazione di accedere a risorse dipendenti. Se hai bisogno di un controllo più preciso sulle autorizzazioni dell'applicazione, puoi utilizzare una policy IAM personalizzata. Questa sezione contiene esempi di politiche IAM personalizzate.

**Nota**  
Nei seguenti esempi di policy, sostituisci il testo segnaposto con i valori dell'applicazione.

**Topics**
+ [

## AWS Glue
](#how-zeppelin-iam-glue)
+ [

## CloudWatch Registri
](#how-zeppelin-iam-cw)
+ [

## Flussi Kinesis
](#how-zeppelin-iam-streams)
+ [

## Cluster Amazon MSK
](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

La seguente politica di esempio concede le autorizzazioni per accedere a un database. AWS Glue 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```

------

## CloudWatch Registri
<a name="how-zeppelin-iam-cw"></a>

La seguente politica concede le autorizzazioni per accedere ai registri: CloudWatch 

```
{
      "Sid": "ListCloudwatchLogGroups",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<accountId>:log-group:*"
      ]
    },
    {
      "Sid": "ListCloudwatchLogStreams",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "<logGroupArn>:log-stream:*"
      ]
    },
    {
      "Sid": "PutCloudwatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "<logStreamArn>"
      ]
    }
```

**Nota**  
Se si crea l'applicazione utilizzando la console, la console aggiunge le politiche necessarie per accedere a CloudWatch Logs al ruolo dell'applicazione.

## Flussi Kinesis
<a name="how-zeppelin-iam-streams"></a>

L'applicazione può utilizzare un flusso Kinesis come origine o destinazione. L'applicazione necessita delle autorizzazioni di lettura per leggere da un flusso di origine e delle autorizzazioni di scrittura per scrivere su un flusso di destinazione.

La seguente policy concede le autorizzazioni per la lettura da un flusso Kinesis utilizzato come origine:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```

------

La seguente politica concede le autorizzazioni di scrittura su un flusso Kinesis utilizzato come destinazione:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```

------

Se l'applicazione accede a un flusso Kinesis crittografato, è necessario concedere autorizzazioni aggiuntive per accedere al flusso e alla chiave di crittografia del flusso. 

La seguente policy concede le autorizzazioni per accedere a un flusso di origine crittografato e alla chiave di crittografia del flusso:

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
    ,
```

La seguente policy concede le autorizzazioni per accedere a un flusso di destinazione crittografato e alla chiave di crittografia del flusso:

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Cluster Amazon MSK
<a name="how-zeppelin-iam-msk"></a>

Per concedere l'accesso a un cluster Amazon MSK, concedi l'accesso al VPC del cluster. Per esempi di policy per l'accesso a un Amazon VPC, consulta [Autorizzazioni delle applicazioni VPC](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html).