

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Servizio gestito per Apache Flink: come funziona
<a name="how-it-works"></a>

Managed Service for Apache Flink è un servizio Amazon completamente gestito che consente di utilizzare un'applicazione Apache Flink per elaborare dati in streaming. Innanzitutto, si programma l'applicazione Apache Flink, quindi si crea l'applicazione Managed Service for Apache Flink.

## Programma la tua applicazione Apache Flink
<a name="how-it-works-programming"></a>

Un'applicazione Apache Flink è un'applicazione Java o Scala creata con il framework Apache Flink. Puoi creare la tua applicazione Apache Flink in locale. 

Le applicazioni utilizzano principalmente l'[DataStream API o l'API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) [Table.](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/) APIs Sono disponibili anche gli altri Apache Flink, ma sono usati meno comunemente nella creazione di applicazioni di streaming.

Le caratteristiche dei due APIs sono le seguenti:

### DataStream API
<a name="how-it-works-prog-datastream"></a>

Il modello di programmazione dell' DataStream API Apache Flink si basa su due componenti:
+ **Flusso di dati:** la rappresentazione strutturata di un flusso continuo di record di dati.
+ **Operatore di trasformazione:** accetta uno o più flussi di dati come input e produce uno o più flussi di dati come output.

Le applicazioni create con l' DataStream API eseguono le seguenti operazioni:
+ Lettura dei dati da un'origine dati (ad esempio un flusso Kinesis o un argomento Amazon MSK).
+ Trasformazione di dati, ad esempio filtraggio, aggregazione o arricchimento.
+ Scrittura dei dati trasformati in un sink di dati.

Le applicazioni che utilizzano l' DataStream API possono essere scritte in Java o Scala e possono essere lette da un flusso di dati Kinesis, un argomento di Amazon MSK o un'origine personalizzata.

L'applicazione elabora i dati utilizzando un *connettore*. Apache Flink utilizza i seguenti tipi di connettori: 
+ **Origine**: connettore utilizzato per leggere dati esterni.
+ **Sink**: connettore utilizzato per scrivere in posizioni esterne. 
+ **Operatore**: connettore utilizzato per elaborare i dati all'interno dell'applicazione.

Un'applicazione tipica è costituita da almeno un flusso di dati con un'origine, un flusso di dati con uno o più operatori e almeno un sink di dati.

Per ulteriori informazioni sull'utilizzo dell' DataStream API, consulta. [Esamina i componenti DataStream dell'API](how-datastream.md)

### API Table
<a name="how-it-works-prog-table"></a>

Il modello di programmazione dell'API Table di Apache Flink si basa sui componenti seguenti:
+ **Ambiente tabellare:** interfaccia per i dati sottostanti utilizzata per creare e ospitare una o più tabelle. 
+ **Tabella:** un oggetto che fornisce l'accesso a una tabella o una vista SQL.
+ **Origine della tabella:** serve per leggere dati da un'origine esterna, ad esempio un argomento di Amazon MSK.
+ **Funzione della tabella:** una query SQL o una chiamata API utilizzata per trasformare dati.
+ **Sink della tabella:** serve per scrivere dati in un percorso esterno, ad esempio un bucket Amazon S3.

Le applicazioni create con l'API Table eseguono le seguenti operazioni:
+ Creazione di un `TableEnvironment` collegandosi a un'`Table Source`. 
+ Crea una tabella nel `TableEnvironment` utilizzando query SQL o funzioni API Table.
+ Esecuzione di una query sulla tabella utilizzando API Table o SQL.
+ Trasformazione dei risultati della query utilizzando funzioni Table o query SQL.
+ Scrittura dei risultati della query o della funzione su un `Table Sink`.

Le applicazioni che utilizzano l'API Table possono essere scritte in Java o Scala e possono eseguire query sui dati utilizzando chiamate API o query SQL. 

Per ulteriori informazioni sull'utilizzo dell'API Table, consulta [Componenti dell'API Review Table](how-table.md).

## Crea il tuo servizio gestito per l'applicazione Apache Flink
<a name="how-it-works-app"></a>

Managed Service for Apache Flink è un AWS servizio che crea un ambiente per l'hosting dell'applicazione Apache Flink e fornisce le seguenti impostazioni:
+ **[Usa le proprietà di runtime](how-properties.md):** parametri che puoi fornire alla tua applicazione. È possibile modificare questi parametri senza ricompilare il codice dell'applicazione.
+ **[Implementa la tolleranza agli](how-fault.md)**: in che modo l'applicazione viene ripristinata dopo interruzioni e riavvii.
+ **[Registrazione e monitoraggio in Amazon Managed Service per Apache Flink](monitoring-overview.md)**: In che modo l'applicazione registra gli eventi in Logs. CloudWatch 
+ **[Implementa la scalabilità delle applicazioni](how-scaling.md)**: in che modo l'applicazione fornisce le risorse di elaborazione.

È possibile creare ed eseguire un'applicazione del servizio gestito per Apache Flink utilizzando la console o la AWS CLI. Per iniziare a creare un'applicazione del servizio gestito da Amazon per Apache Flink, consulta [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md).

# Creare un servizio gestito per l'applicazione Apache Flink
<a name="how-creating-apps"></a>

Questo argomento contiene informazioni sulla creazione di un servizio gestito per l'applicazione Apache Flink.

**Topics**
+ [Crea il codice dell'applicazione Managed Service for Apache Flink](#how-creating-apps-building)
+ [Crea la tua applicazione Managed Service for Apache Flink](#how-creating-apps-creating)
+ [Utilizza chiavi gestite dal cliente](#how-creating-apps-use-cmk)
+ [Avvia l'applicazione Managed Service for Apache Flink](#how-creating-apps-starting)
+ [Verifica l'applicazione Managed Service for Apache Flink](#how-creating-apps-verifying)
+ [Abilita i rollback di sistema per la tua applicazione Managed Service for Apache Flink](how-system-rollbacks.md)

## Crea il codice dell'applicazione Managed Service for Apache Flink
<a name="how-creating-apps-building"></a>

Questa sezione descrive i componenti utilizzati per creare il codice dell'applicazione Managed Service for Apache Flink. 

Per il codice dell'applicazione, ti consigliamo di utilizzare la versione di Apache Flink supportata più recente. Per informazioni sull'aggiornamento delle applicazioni del servizio gestito per Apache Flink, consulta [Usa gli aggiornamenti di versione sul posto per Apache Flink](how-in-place-version-upgrades.md). 

Il codice dell'applicazione viene creato utilizzando [Apache Maven](https://maven.apache.org/). Un progetto Apache Maven utilizza un file `pom.xml` per specificare le versioni dei componenti che utilizza. 

**Nota**  
Il servizio gestito per Apache Flink supporta file JAR di dimensioni fino a 512 MB. Se si utilizza un file JAR di dimensioni superiori a queste, l'applicazione non viene avviata.

Le applicazioni possono ora utilizzare l'API Java da qualsiasi versione di Scala. È necessario includere la libreria standard Scala di propria scelta nelle applicazioni Scala.

Per informazioni sulla creazione di un'applicazione del servizio gestito per Apache Flink che utilizza **Apache Beam**, consulta [Usa Apache Beam con Managed Service per le applicazioni Apache Flink](how-creating-apps-beam.md).

### Specificate la versione Apache Flink dell'applicazione
<a name="how-creating-apps-building-flink"></a>

Quando utilizzi il servizio gestito per il runtime di Apache Flink versione 1.1.0, devi specificare la versione di Apache Flink che l'applicazione utilizza durante la compilazione. Fornisci la versione di Apache Flink con il parametro. `-Dflink.version` Ad esempio, se utilizzi Apache Flink 2.2.0, fornisci quanto segue:

```
mvn package -Dflink.version=2.2.0
```

Per creare applicazioni con versioni precedenti di Apache Flink, consulta. [Versioni precedenti](earlier.md)

## Crea la tua applicazione Managed Service for Apache Flink
<a name="how-creating-apps-creating"></a>

Dopo aver creato il codice dell'applicazione, procedi come segue per creare l'applicazione Managed Service for Apache Flink (Amazon MSF):
+ **Carica il codice dell'applicazione**: carica il codice dell'applicazione in un bucket Amazon S3. Quando crei l'applicazione, specifica il nome del bucket S3 e il nome oggetto del codice dell'applicazione. Per un tutorial che mostra come caricare il codice dell'applicazione, consulta il tutorial. [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md)
+ **Crea la tua applicazione Managed Service for Apache Flink**: utilizza uno dei seguenti metodi per creare la tua applicazione Amazon MSF:
**Nota**  
Amazon MSF crittografa l'applicazione per impostazione predefinita utilizzando. Chiavi di proprietà di AWS Puoi anche creare la tua nuova applicazione utilizzando le chiavi gestite AWS KMS dal cliente (CMKs) per creare, possedere e gestire le tue chiavi da solo. Per informazioni su CMKs, consulta[Gestione delle chiavi in Amazon Managed Service per Apache Flink](key-management-flink.md).
  + **Crea la tua applicazione Amazon MSF utilizzando la AWS console:** puoi creare e configurare la tua applicazione utilizzando la AWS console. 

    Quando crei un'applicazione utilizzando la console, le risorse dipendenti dall'applicazione (come CloudWatch i flussi di log, i ruoli IAM e le politiche IAM) vengono create automaticamente. 

    Quando crei l'applicazione utilizzando la console, devi specificare la versione di Apache Flink utilizzata dall'applicazione selezionandola dal menu a discesa nella pagina **Servizio gestito per Apache Flink: crea applicazione**. 

    Per un tutorial su come utilizzare la console per creare un'applicazione, consulta il [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md) tutorial.
  + **Crea la tua applicazione Amazon MSF utilizzando la AWS CLI**: puoi creare e configurare la tua applicazione utilizzando la CLI AWS . 

    Quando crei l'applicazione utilizzando la CLI, devi anche creare manualmente le risorse dipendenti dall'applicazione (come flussi di CloudWatch log, ruoli IAM e politiche IAM).

    Quando crei un'applicazione utilizzando la CLI, devi specificare la versione di Apache Flink utilizzata dall'applicazione utilizzando il parametro `RuntimeEnvironment` dell'operazione `CreateApplication`.
**Nota**  
È possibile modificare il nome `RuntimeEnvironment` di un'applicazione esistente. Per scoprire come, consulta [Usa gli aggiornamenti di versione sul posto per Apache Flink](how-in-place-version-upgrades.md).

## Utilizza chiavi gestite dal cliente
<a name="how-creating-apps-use-cmk"></a>

In Amazon MSF, le chiavi gestite dai clienti (CMKs) sono una funzionalità che consente di crittografare i dati dell'applicazione con una chiave creata, posseduta e gestita su AWS Key Management Service ()AWS KMS. Per un'applicazione Amazon MSF, ciò significa che tutti i dati soggetti a un [checkpoint](how-fault.md) o [snapshot](how-snapshots.md) Flink sono crittografati con un CMK definito per quell'applicazione.

Per utilizzare CMK con la tua applicazione, devi prima [creare la tua nuova applicazione](#how-creating-apps-creating) e quindi applicare una CMK. Per ulteriori informazioni sull'utilizzo CMKs, vedere. [Gestione delle chiavi in Amazon Managed Service per Apache Flink](key-management-flink.md)

## Avvia l'applicazione Managed Service for Apache Flink
<a name="how-creating-apps-starting"></a>

Dopo aver creato il codice dell'applicazione, averlo caricato su S3 e aver creato l'applicazione del servizio gestito per Apache Flink, è il momento di avviare l'applicazione. L'avvio di un'applicazione del servizio gestito per Apache Flink richiede in genere alcuni minuti.

Per avviare l'applicazione, utilizza uno dei seguenti metodi:
+ **Avvia l'applicazione Managed Service for Apache Flink utilizzando la AWS console:** puoi eseguire l'applicazione scegliendo **Esegui** nella pagina dell'applicazione nella console. AWS 
+ **Avvia l'applicazione Managed Service for Apache Flink utilizzando l' AWS API:** puoi eseguire l'applicazione utilizzando l'azione. [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 

## Verifica l'applicazione Managed Service for Apache Flink
<a name="how-creating-apps-verifying"></a>

È possibile verificare il funzionamento dell'applicazione nei seguenti modi:
+ **Utilizzo CloudWatch dei registri: è** possibile utilizzare CloudWatch Logs and CloudWatch Logs Insights per verificare che l'applicazione funzioni correttamente. Per informazioni sull'utilizzo di CloudWatch Logs con l'applicazione Managed Service for Apache Flink, consulta. [Registrazione e monitoraggio in Amazon Managed Service per Apache Flink](monitoring-overview.md)
+ **Utilizzo CloudWatch delle metriche:** puoi utilizzare CloudWatch Metrics per monitorare l'attività dell'applicazione o l'attività delle risorse utilizzate dall'applicazione per l'input o l'output (come i flussi Kinesis, i flussi Firehose o i bucket Amazon S3). Per ulteriori informazioni sui CloudWatch parametri, consulta [Working with Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) nella Amazon CloudWatch User Guide.
+ **Monitoraggio delle posizioni di output:** se l'applicazione scrive l'output in una posizione (come un bucket o un database Amazon S3), è possibile monitorare quella posizione per i dati scritti.

# Abilita i rollback di sistema per la tua applicazione Managed Service for Apache Flink
<a name="how-system-rollbacks"></a>

Con la funzionalità di rollback del sistema, puoi ottenere una maggiore disponibilità dell'applicazione Apache Flink in esecuzione su Amazon Managed Service per Apache Flink. L'attivazione di questa configurazione consente al servizio di ripristinare automaticamente l'applicazione alla versione precedentemente in esecuzione quando un'azione come o si verifica un bug nel codice o nella configurazione. `UpdateApplication` `autoscaling`

**Nota**  
Per utilizzare la funzionalità di rollback del sistema, è necessario effettuare l'attivazione aggiornando l'applicazione. Per impostazione predefinita, le applicazioni esistenti non utilizzeranno automaticamente il rollback del sistema.

## Come funziona
<a name="how-rollback-works"></a>

Quando avvii un'operazione applicativa, ad esempio un'azione di aggiornamento o di scalabilità, Amazon Managed Service for Apache Flink tenta innanzitutto di eseguire tale operazione. Se rileva problemi che impediscono il successo dell'operazione, come bug nel codice o autorizzazioni insufficienti, il servizio avvia automaticamente un'operazione. `RollbackApplication`

Il rollback tenta di ripristinare l'applicazione alla versione precedente eseguita correttamente, insieme allo stato dell'applicazione associata. Se il rollback ha esito positivo, l'applicazione continua a elaborare i dati con tempi di inattività minimi utilizzando la versione precedente. Se anche il rollback automatico fallisce, Amazon Managed Service for Apache Flink trasferisce l'applicazione allo `READY` stato, in modo che tu possa intraprendere ulteriori azioni, tra cui correggere l'errore e riprovare l'operazione. 

È necessario attivare l'utilizzo dei rollback automatici del sistema. Da questo momento in poi, puoi abilitarlo utilizzando la console o l'API per tutte le operazioni sulla tua applicazione. 

Il seguente esempio di richiesta di `UpdateApplication` azione abilita i rollback del sistema per un'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## Esamina gli scenari comuni per il rollback automatico del sistema
<a name="common-scenarios"></a>

I seguenti scenari illustrano i vantaggi dei rollback automatici del sistema:
+ **Aggiornamenti dell'applicazione:** se aggiorni l'applicazione con un nuovo codice che presenta bug durante l'inizializzazione del job Flink tramite il metodo principale, il rollback automatico consente di ripristinare la versione funzionante precedente. Altri scenari di aggiornamento in cui i rollback del sistema sono utili includono:
  + [Se l'applicazione viene aggiornata per essere eseguita con un parallelismo superiore a MaxParallelism.](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto)
  + Se l'applicazione viene aggiornata per essere eseguita con sottoreti errate per un'applicazione VPC, si verifica un errore durante l'avvio del processo Flink. 
+ **Aggiornamenti della versione di Flink:** quando si esegue l'aggiornamento a una nuova versione di Apache Flink e l'applicazione aggiornata presenta un problema di compatibilità con le snapshot, il rollback del sistema consente di tornare automaticamente alla versione precedente di Flink. 
+ **AutoScaling:** Quando l'applicazione è scalabile ma riscontra problemi di ripristino da un punto di salvataggio, a causa della mancata corrispondenza dell'operatore tra l'istantanea e il grafico del lavoro Flink.

## Usa APIs l'operazione per i rollback del sistema
<a name="operation-apis"></a>

Per offrire una migliore visibilità, Amazon Managed Service per Apache Flink ne offre due APIs relative alle operazioni delle applicazioni che possono aiutarti a tenere traccia degli errori e dei relativi rollback di sistema.

`ListApplicationOperations`

Questa API elenca tutte le operazioni eseguite sull'applicazione, incluse, `UpdateApplication` `Maintenance``RollbackApplication`, e altre in ordine cronologico inverso. L'esempio seguente di richiesta di `ListApplicationOperations` azione elenca le prime 10 operazioni dell'applicazione per l'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

Il seguente esempio di richiesta `ListApplicationOperations` aiuta a filtrare l'elenco in base agli aggiornamenti precedenti dell'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

Questa API fornisce informazioni dettagliate su un'operazione specifica elencata da`ListApplicationOperations`, incluso il motivo dell'errore, se applicabile. L'esempio seguente di richiesta di `DescribeApplicationOperation` azione elenca i dettagli per un'operazione specifica dell'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

Per informazioni sulla risoluzione dei problemi, consulta [Le migliori pratiche per il rollback del sistema](troubleshooting-system-rollback.md).

# Esegui un servizio gestito per l'applicazione Apache Flink
<a name="how-running-apps"></a>

Questo argomento contiene informazioni sull'esecuzione di un servizio gestito per Apache Flink.

Quando si esegue l'applicazione di servizio gestito per Apache Flink, il servizio crea un processo Apache Flink. Un processo Apache Flink è il ciclo di vita dell'esecuzione dell'applicazione servizio gestito per Apache Flink. L'esecuzione del processo e le risorse che utilizza sono gestite dal Job Manager. Il Job Manager divide in attività l'esecuzione dell'applicazione. Ogni attività è gestita da un Task Manager. Quando si monitorano le prestazioni dell'applicazione, è possibile esaminare le prestazioni di ogni Task Manager o del Job Manager nel suo complesso. 

Per informazioni sui job Apache Flink, consulta Jobs [and Scheduling nella documentazione](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) di Apache Flink.

## Identifica lo stato della candidatura e del lavoro
<a name="how-running-job-status"></a>

Viene visualizzato lo status di esecuzione attuale dell’applicazione e del relativo processo:
+ **Status dell'applicazione:** lo status attuale che ne descrive la fase di esecuzione. Gli status dell’applicazione includono le seguenti opzioni:
  + **Stati costanti dell'applicazione:** l’applicazione in genere rimane in uno di questi stati finché non viene apportata una modifica dello stato:
    + **PRONTA:** un'applicazione nuova o interrotta continua a comparire come PRONTA finché non viene eseguita.
    + **IN ESECUZIONE:** un'applicazione che è stata avviata correttamente si trova nello stato IN ESECUZIONE.
  + **Stati transitori dell'applicazione:** un'applicazione che si trova in uno di questi stati è in genere in fase di transizione a un altro stato. Se una candidatura rimane in uno stato transitorio per un certo periodo di tempo, è possibile interromperla utilizzando l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)azione con il `Force` parametro impostato su. `true` Questi status includono le seguenti opzioni:
    + `STARTING:`Si verifica dopo l'azione. [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) L'applicazione passa dallo status `READY` a `RUNNING`.
    + `STOPPING:`Si verifica dopo l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)azione. L'applicazione passa dallo status `RUNNING` a `READY`.
    + `DELETING:`Si verifica dopo l'[DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html)azione. L’applicazione è in fase di eliminazione.
    + `UPDATING:`Si verifica dopo l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)azione. L'applicazione si sta aggiornando e tornerà allo status `RUNNING` o `READY`.
    + `AUTOSCALING:`L'applicazione ha la `AutoScalingEnabled` proprietà [ ParallelismConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)impostata su e il servizio aumenta il parallelismo dell'applicazione. `true` Quando l'applicazione è in questo stato, l'unica azione API valida che è possibile utilizzare è l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)azione con il `Force` parametro impostato su. `true` Per ulteriori informazioni sul dimensionamento automatico, consulta [Utilizza il ridimensionamento automatico in Managed Service for Apache Flink](how-scaling-auto.md).
    + `FORCE_STOPPING:`Si verifica dopo che l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)azione è stata richiamata con il `Force` parametro impostato su`true`. L'applicazione è in fase di arresto forzato. L'applicazione passa dallo stato `STARTING` `UPDATING`, `STOPPING` o `AUTOSCALING` allo stato `READY`.
    + `ROLLING_BACK:`Si verifica dopo la chiamata [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)dell'azione. È in corso il ripristino dell'applicazione a una versione precedente. L'applicazione passa dallo status `UPDATING` o `AUTOSCALING` a `RUNNING`.
    + `MAINTENANCE:` si verifica quando il servizio gestito per Apache Flink applica le patch all'applicazione. Per ulteriori informazioni, consulta [Gestisci le attività di manutenzione per Managed Service for Apache Flink](maintenance.md).

  Puoi controllare lo stato dell'applicazione utilizzando la console o utilizzando l'[DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html)azione.
+ **Status del processo:** quando l’applicazione passa allo status `RUNNING`, viene visualizzato uno status del processo che ne descrive l’attuale fase di esecuzione. Un processo parte dallo status `CREATED`, quindi passa a `RUNNING` una volta iniziato. Se si verificano condizioni di errore, l'applicazione passa allo status seguente: 
  + Per le applicazioni che utilizzano Apache Flink 1.11 e versioni successive, l'applicazione passa allo stato `RESTARTING`.
  + Per le applicazioni che utilizzano Apache Flink 1.8 e versioni precedenti, l'applicazione passa allo stato `FAILING`.

  L'applicazione passa quindi allo stato `RESTARTING` o `FAILED`, a seconda che il lavoro possa essere riavviato o meno. 

  Puoi controllare lo stato del lavoro esaminando il CloudWatch registro della tua candidatura per eventuali modifiche allo stato.

## Esegui carichi di lavoro in batch
<a name="batch-workloads"></a>

Il servizio gestito per Apache Flink supporta l'esecuzione di carichi di lavoro in batch di Apache Flink. In un processo batch, quando un processo Apache Flink passa allo stato **FINITO**, lo stato dell'applicazione del servizio gestito per Apache Flink è impostato su **PRONTO**. Per ulteriori informazioni sugli stati dei processi Flink, consulta [Processi e pianificazione](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/).

# Consulta le risorse dell'applicazione Managed Service for Apache Flink
<a name="how-resources"></a>

Questa sezione descrive le risorse di sistema utilizzate dall'applicazione. Comprendere come il servizio gestito per Apache Flink alloca e utilizza le risorse è fondamentale per progettare, creare e mantenere un'applicazione di servizio gestito per Apache Flink efficiente e stabile.

## Managed Service per le risorse delle applicazioni Apache Flink
<a name="how-resources-kda"></a>

Managed Service for Apache Flink è un AWS servizio che crea un ambiente per ospitare l'applicazione Apache Flink. Il servizio Managed Service for Apache Flink fornisce risorse utilizzando unità denominate **Kinesis Processing** Units (). KPUs

Una KPU rappresenta le seguenti risorse di sistema:
+ un core CPU;
+ 4 GB di memoria, di cui un GB di memoria nativa e tre GB di memoria heap;
+ 50 GB di spazio su disco.

KPUs **esegue le applicazioni in unità di esecuzione distinte denominate attività e **sottoattività**.** Un’attività secondaria rappresenta l'equivalente di un thread.

Il numero di KPUs elementi disponibili per un'applicazione è uguale all'`Parallelism`impostazione dell'applicazione, diviso per l'`ParallelismPerKPU`impostazione dell'applicazione. 

Per ulteriori informazioni sul parallelismo delle applicazioni, consulta [Implementa la scalabilità delle applicazioni](how-scaling.md).

## Risorse dell'applicazione Apache Flink
<a name="how-resources-flink"></a>

L'ambiente Apache Flink alloca le risorse per l'applicazione utilizzando unità chiamate **slot di attività**. Quando il servizio gestito per Apache Flink alloca risorse per l'applicazione, assegna uno o più slot di attività di Apache Flink a una singola KPU. Il numero di slot assegnati a una singola KPU è uguale all'impostazione `ParallelismPerKPU` dell'applicazione. Per ulteriori informazioni sugli slot di attività, vedere [Job Scheduling nella documentazione di](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) Apache Flink.

### Parallelismo degli operatori
<a name="how-resources-flink-operatorparallelism"></a>

È possibile impostare il numero massimo di attività secondarie che un operatore può utilizzare. Questo valore è denominato **parallelismo degli operatori**. Per impostazione predefinita, il parallelismo di ogni operatore dell'applicazione è uguale al parallelismo dell'applicazione. Ciò significa che, per impostazione predefinita, ove necessario ogni operatore dell'applicazione può utilizzare tutte le attività secondarie in essa disponibili.

È possibile impostare il parallelismo degli operatori nell'applicazione utilizzando il metodo `setParallelism`. Questo metodo consente di controllare il numero di attività secondarie che ogni operatore può utilizzare contemporaneamente.

Per ulteriori informazioni sugli operatori, consulta Operatori nella [documentazione](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) di Apache Flink.

### Concatenamento degli operatori
<a name="how-resources-flink-operatorchaining"></a>

Solitamente ogni operatore utilizza un’attività secondaria separata; tuttavia se più operatori agiscono sempre in sequenza il runtime può assegnarli tutti alla stessa attività. Questo processo si chiama **concatenazione degli operatori**.

Se operano tutti sugli stessi dati, è possibile concatenare diversi operatori in sequenza in un'unica attività. Di seguito sono riportati alcuni criteri necessari affinché ciò accada:
+ gli operatori eseguono un trasferimento semplice 1 a 1;
+ tutti gli operatori dispongono dello stesso parallelismo degli operatori.

Quando l'applicazione concatena gli operatori in un'unica attività secondaria conserva le risorse di sistema, poiché il servizio non ha bisogno di eseguire operazioni di rete e allocare attività secondarie a ciascun operatore. Per determinare se l’applicazione utilizza la concatenazione degli operatori è sufficiente consultare il grafico dei processi nella console del servizio gestito per Apache Flink. Ogni vertice dell'applicazione rappresenta uno o più operatori. Il grafico mostra gli operatori che sono stati concatenati in un unico vertice.

# Fatturazione al secondo in Managed Service for Apache Flink
<a name="how-pricing"></a>

Il servizio gestito per Apache Flink viene ora fatturato in incrementi di un secondo. È previsto un costo minimo di dieci minuti per applicazione. La fatturazione al secondo è applicabile alle applicazioni appena lanciate o già in esecuzione. Questa sezione descrive come Managed Service for Apache Flink contabilizza e fattura il tuo utilizzo. Per ulteriori informazioni sui prezzi di Managed Service for Apache Flink, consulta i prezzi di [Amazon Managed Service for Apache](https://aws.amazon.com/managed-service-apache-flink/pricing/) Flink. 

## Come funziona
<a name="how-resources-kda"></a>

Il servizio gestito per Apache Flink addebita all'utente la durata e il numero di **unità di elaborazione Kinesis (KPUs)** fatturate in incrementi di un secondo nelle tariffe supportate. Regioni AWS Una singola KPU comprende 1 vCPU di elaborazione e 4 GB di memoria. Ti viene addebitata una tariffa oraria in base al numero di applicazioni KPUs utilizzate per eseguire le tue applicazioni. 

Ad esempio, un'applicazione in esecuzione per 20 minuti e 10 secondi verrà addebitata per 20 minuti e 10 secondi, moltiplicata per le risorse utilizzate. A un'applicazione in esecuzione per 5 minuti verrà addebitato il costo minimo di dieci minuti, moltiplicato per le risorse utilizzate.

Il servizio gestito per Apache Flink indica l'utilizzo in ore. Ad esempio, 15 minuti corrispondono a 0,25 ore. 

Per le applicazioni Apache Flink, viene addebitata una singola KPU aggiuntiva per applicazione, utilizzata per l'orchestrazione. Le applicazioni sono inoltre a pagamento per l'esecuzione di storage e backup durevoli. Lo storage delle applicazioni in esecuzione viene utilizzato per le funzionalità di elaborazione dello stato in Managed Service for Apache Flink e viene addebitato per. GB/month. Durable backups are optional and provide point-in-time recovery for applications, charged per GB/month 

In modalità streaming, Managed Service for Apache Flink ridimensiona automaticamente il numero di dati KPUs richiesti dall'applicazione di elaborazione dei flussi in base alle variazioni delle esigenze di memoria e di calcolo. Puoi scegliere di fornire alla tua applicazione il numero richiesto di. KPUs 

## Regione AWS disponibilità
<a name="how-pricing-regions"></a>

**Nota**  
Al momento, la fatturazione al secondo non è disponibile nelle seguenti regioni: AWS GovCloud (Stati Uniti orientali), (Stati Uniti occidentali), Cina AWS GovCloud (Pechino) e Cina (Ningxia).

La fatturazione al secondo è disponibile nelle seguenti aree: Regioni AWS
+ Stati Uniti orientali (Virginia settentrionale), us-east-1
+ Stati Uniti orientali (Ohio): us-east-2
+ Stati Uniti occidentali (California settentrionale) - us-west-1
+ Stati Uniti occidentali (Oregon): us-west-2
+ Africa (Città del Capo) - af-south-1
+ Asia Pacifico (Hong Kong) - ap-east-1
+ Asia Pacifico (Hyderabad) - ap-south-1
+ Asia Pacifico (Giacarta) - ap-southeast-3
+ Asia Pacifico (Melbourne) - ap-southeast-4
+ Asia Pacifico (Mumbai) - ap-south-1
+ Asia Pacifico (Osaka) - ap-northeast-3
+ Asia Pacifico (Seoul) - ap-northeast-2
+ Asia Pacifico (Singapore) - ap-southeast-1
+ Asia Pacifico (Sydney): ap-southeast-2
+ Asia Pacifico (Tokyo), ap-northeast-1
+ Canada (Centrale) - ca-central-1
+ Canada occidentale (Calgary) - ca-west-1
+ Europa (Francoforte) - eu-central-1
+ Europa (Irlanda) - eu-west-1
+ Europa (Londra) - eu-west-2
+ Europa (Milano) (eu-south-1)
+ Europa (Parigi) - eu-west-3
+ Europa (Spagna) (eu-south-2)
+ Europa (Stoccolma) - eu-north-1
+ Europa (Zurigo) (eu-central-2)
+ Israele (Tel Aviv) - il-central-1
+ Medio Oriente (Bahrein) - me-south-1
+ Regione Medio Oriente (EAU) (me-central-1)
+ Sud America (San Paolo) - sa-east-1

## Esempi di prezzi
<a name="how-pricing-examples"></a>

Puoi trovare esempi di prezzi nella pagina dei prezzi di Managed Service for Apache Flink. Per ulteriori informazioni, consulta la pagina dei prezzi di [Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/). Di seguito sono riportati altri esempi con illustrazioni del rapporto sui costi di utilizzo per ciascuno di essi.

### Un carico di lavoro intenso e di lunga durata
<a name="pricing-example-1"></a>

Sei un grande servizio di streaming video e vorresti creare una raccomandazione video in tempo reale basata sulle interazioni degli utenti. Si utilizza un'applicazione Apache Flink in Managed Service for Apache Flink per importare continuamente gli eventi di interazione degli utenti da più flussi di dati Kinesis e per elaborare gli eventi in tempo reale prima di inviarli a un sistema a valle. Gli eventi di interazione con l'utente vengono trasformati utilizzando diversi operatori. Ciò include il partizionamento dei dati per tipo di evento, l'arricchimento dei dati con metadati aggiuntivi, l'ordinamento dei dati per timestamp e il buffering dei dati per 5 minuti prima della consegna. L'applicazione prevede molte fasi di trasformazione che richiedono molta elaborazione e sono parallelizzabili. L'applicazione Flink è configurata per funzionare con 20 KPUs per adattarsi al carico di lavoro. L'applicazione utilizza 1 GB di backup durevole delle applicazioni ogni giorno. Le tariffe mensili del servizio gestito per Apache Flink verranno calcolate come segue:

**Costi mensili**

Il prezzo nella regione Stati Uniti orientali (Virginia settentrionale) è di 0,11 USD per KPU/ora. Managed Service for Apache Flink alloca 50 GB di storage delle applicazioni in esecuzione per KPU e addebita 0,10 USD per GB/mese.
+ Costi mensili KPU: 24 ore\$1 30 giorni\$1 ( KPUs 20\$1 1 KPU aggiuntiva per l'applicazione di streaming) \$1 0,11 USD/ora = 1.584,00 USD
+ Costi mensili di archiviazione delle applicazioni in esecuzione: 30 giorni \$1 20 \$1 50 \$1 0,10 USD/GB al mese = 100,00 USD KPUs GB/KPUs 
+ Costi mensili durevoli per lo storage delle applicazioni: 30 giorni\$1 1 GB \$1 0,023 GB/mese = 0,03 USD
+ **Spese totali: 1.584,00 USD \$1 100 USD \$1 0,03 USD = 1.684,03 USD**

**Report sull'utilizzo dei costi per Managed Service for Apache Flink sulla console Billing and Cost Management per il mese**

Analisi Kinesis
+ USD 1.684,03 - Stati Uniti orientali (Virginia settentrionale)
+ Amazon Kinesis Analytics CreateSnapshot
  + 0,023 USD per GB al mese di backup durevoli delle applicazioni
    + 1 GB al mese: 0,03 USD
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD per GB al mese di storage delle applicazioni in esecuzione
    + 1.000 GB al mese: 100 USD
  + 0,11 USD per unità di elaborazione Kinesis all'ora per le applicazioni Apache Flink
    + 15.120 KPU/ora: 1.584 USD

### Un carico di lavoro in batch che dura circa 15 minuti ogni giorno
<a name="pricing-example-2"></a>

Utilizzi un'applicazione Apache Flink in Managed Service for Apache Flink per trasformare i dati di log in Amazon Simple Storage Service (Amazon S3) in modalità batch. I dati di registro vengono trasformati utilizzando diversi operatori. Ciò include l'applicazione di uno schema ai diversi eventi di registro, il partizionamento dei dati per tipo di evento e l'ordinamento dei dati per timestamp. L'applicazione prevede molte fasi di trasformazione, ma nessuna richiede un uso intensivo dal punto di vista computazionale. Questa applicazione acquisisce 2.000 dati records/second per 15 minuti ogni giorno in un mese di 30 giorni. Non vengono creati backup durevoli delle applicazioni. Le tariffe mensili del servizio gestito per Apache Flink verranno calcolate come segue:

**Costi mensili**

Il prezzo nella regione Stati Uniti orientali (Virginia settentrionale) è di 0,11 USD per KPU/ora. Managed Service for Apache Flink alloca 50 GB di storage delle applicazioni in esecuzione per KPU e addebita 0,10 USD per GB/mese.
+ Carico di lavoro in batch: durante i 15 minuti al giorno, l'applicazione Managed Service for Apache Flink ne elabora 2.000 records/second, which takes 2KPUs. 30 days/month \$1 15 minutes/day = 450 minutes/month
+ Costi KPU mensili: 450 minutes/month USD\$1 (KPUs 2\$11 KPU aggiuntiva per l'applicazione di streaming) \$1 0,11 USD/ora = 2,48 USD
+ Costi mensili di archiviazione delle applicazioni in esecuzione: 450 minutes/month \$1 2 \$1 50 \$1 0,10 USD/GB al mese = 0,11 USD KPUs GB/KPUs 
+ **Costi totali: 2,48 USD \$1 0,11 = 2,59 USD**

**Report sull'utilizzo dei costi per Managed Service for Apache Flink sulla console Billing and Cost Management per il mese**

Analisi Kinesis
+ 2,59 USD - Stati Uniti orientali (Virginia settentrionale)
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD per GB al mese di backup di applicazioni in esecuzione
    + 1,042 GB al mese: 0,11 USD
  + 0,11 USD per unità di elaborazione Kinesis all'ora per le applicazioni Apache Flink
    + 22,5 KPU/ora: 2,48 USD

### Un'applicazione di test che si interrompe e si avvia continuamente nella stessa ora, con costi minimi multipli
<a name="pricing-example-3"></a>

Sei una grande piattaforma di e-commerce che elabora milioni di transazioni ogni giorno. Vuoi sviluppare il rilevamento delle frodi in tempo reale. Utilizzi un'applicazione Apache Flink in Managed Service for Apache Flink per importare gli eventi di transazione da Kinesis Data Streams ed elaborare gli eventi in tempo reale con diverse fasi di trasformazione. Ciò include l'utilizzo di una finestra scorrevole per aggregare gli eventi, il partizionamento degli eventi per tipi di eventi e l'applicazione di regole di rilevamento specifiche per diversi tipi di eventi. Durante lo sviluppo, si avvia e si arresta l'applicazione più volte per testare ed eseguire il debug del comportamento. In alcuni casi l'applicazione viene eseguita solo per pochi minuti. C'è un'ora in cui stai testando l'applicazione con 4 KPUs e l'applicazione non utilizza alcun backup durevole dell'applicazione:
+ Alle 10:05 si avvia l'applicazione, che viene eseguita per 30 minuti prima di essere interrotta alle 10:35.
+ Alle 10:40, si riavvia l'applicazione, che viene eseguita per 5 minuti prima di essere interrotta alle 10:45.
+ Alle 10:50, si riavvia l'applicazione, che viene eseguita per 2 minuti prima di essere interrotta alle 10:52.

Il servizio gestito per Apache Flink addebita un minimo di 10 minuti di utilizzo ogni volta che un'applicazione viene avviata. Il servizio gestito mensile per l'utilizzo di Apache Flink per l'applicazione verrà calcolato come segue:
+ Prima volta che l'applicazione si avvia e si arresta: 30 minuti di utilizzo
+ Seconda volta che l'applicazione si avvia e si arresta: 10 minuti di utilizzo (l'applicazione viene eseguita per 5 minuti arrotondati al costo minimo di 10 minuti)
+ Terza volta che l'applicazione si avvia e si arresta: 10 minuti di utilizzo (l'applicazione viene eseguita per 2 minuti, arrotondati al costo minimo di 10 minuti)

In totale, all'applicazione verranno addebitati 50 minuti di utilizzo. Se non ci sono altri periodi del mese in cui l'applicazione è in esecuzione, gli addebiti mensili del servizio gestito per Apache Flink verranno calcolati come segue:

**Costi mensili**

Il prezzo nella regione Stati Uniti orientali (Virginia settentrionale) è di 0,11 USD per KPU/ora. Managed Service for Apache Flink alloca 50 GB di storage delle applicazioni in esecuzione per KPU e addebita 0,10 USD per GB/mese.
+ Costi mensili KPU: 50 minuti\$1 (KPUs 4\$11 KPU aggiuntiva per l'applicazione di streaming) \$1 0,11 USD/ora = 0,46 USD (arrotondato al centesimo più vicino)
+ Costi mensili di archiviazione delle applicazioni in esecuzione: 50 minuti\$1 4\$1 50\$1 0,10 USD/GB al mese = KPUs 0,03 USD (arrotondati al GB/KPUs centesimo più vicino)
+ **Costi totali: 0,46 USD \$1 0,03 = 0,49 USD**

**Report sull'utilizzo dei costi per Managed Service for Apache Flink sulla console Billing and Cost Management per il mese**

Analisi Kinesis
+ USD 0,49 - Stati Uniti orientali (Virginia settentrionale)
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD per GB al mese di storage delle applicazioni in esecuzione
    + 0,232 GB al mese: 0,03 USD
  + 0,11 USD per unità di elaborazione Kinesis all'ora per le applicazioni Apache Flink
    + 4,167 KPU/ora - 0,46 USD

# Esamina i componenti DataStream dell'API
<a name="how-datastream"></a>

La tua applicazione Apache Flink utilizza l'[ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) per trasformare i dati in un flusso di dati. 

Questa sezione descrive i diversi componenti che spostano, trasformano e tracciano i dati:
+ [Usa i connettori per spostare i dati in Managed Service for Apache Flink con l'API DataStream](how-connectors.md): questi componenti spostano i dati tra l'applicazione e le origini e le destinazioni dati esterne.
+ [Trasforma i dati utilizzando gli operatori in Managed Service for Apache Flink con l'API DataStream](how-operators.md): questi componenti trasformano o raggruppano gli elementi di dati all'interno dell'applicazione.
+ [Tieni traccia degli eventi in Managed Service for Apache Flink utilizzando l'API DataStream](how-time.md): Questo argomento descrive come Managed Service for Apache Flink tiene traccia degli eventi quando si utilizza l' DataStream API.

# Usa i connettori per spostare i dati in Managed Service for Apache Flink con l'API DataStream
<a name="how-connectors"></a>

Nell' DataStream API Amazon Managed Service for Apache Flink, i *connettori* sono componenti software che spostano i dati da e verso un'applicazione Managed Service for Apache Flink. I connettori sono integrazioni flessibili che consentono di leggere file e directory. I connettori sono costituiti da moduli completi per l'interazione con i servizi Amazon e i sistemi di terze parti.

I tipi di connettori comprendono:
+ [Aggiungi sorgenti di dati in streaming](how-sources.md): invio di dati all'applicazione da un flusso di dati, un file o un'altra origine dati Kinesis.
+ [Scrivi dati usando i sink](how-sinks.md): invia dati dall'applicazione a un flusso di dati Kinesis, a un flusso Firehose o a un'altra destinazione di dati.
+ [Usa I/O asincrono](how-async.md): fornisce l'accesso asincrono a un'origine dati (ad esempio, un database) per arricchire gli eventi di flusso. 

## Connettori disponibili
<a name="how-connectors-list"></a>

Il framework Apache Flink contiene connettori per l'accesso ai dati da vari tipi di origini. Per informazioni sui connettori disponibili nel framework Apache Flink, consulta [Connettori](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) nella [documentazione di Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

**avvertimento**  
Se hai applicazioni in esecuzione su Flink 1.6, 1.8, 1.11 o 1.13 e desideri eseguirle nelle regioni del Medio Oriente (Emirati Arabi Uniti), Asia Pacifico (Hyderabad), Israele (Tel Aviv), Europa (Zurigo), Medio Oriente (Emirati Arabi Uniti), Asia Pacifico (Melbourne) o Asia Pacifico (Giacarta), potresti dover ricostruire l'archivio delle applicazioni con un connettore aggiornato o eseguire l'aggiornamento a Flink 1.18.   
I connettori Apache Flink sono archiviati nei propri archivi open source. Se stai eseguendo l'aggiornamento alla versione 1.18 o successiva, devi aggiornare le tue dipendenze. Per accedere al repository per i connettori Apache Flink, vedi. AWS [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
Il precedente sorgente Kinesis non `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` è più disponibile e potrebbe essere rimosso con le future release di Flink. Utilizzate [invece Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Non esiste compatibilità a livello di stato tra `FlinkKinesisConsumer` e`KinesisStreamsSource`. Per i dettagli, consulta [Migrazione dei job esistenti al nuovo Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) Source nella documentazione di Apache Flink.  
 Di seguito sono riportate le linee guida consigliate:   


**Aggiornamenti dei connettori**  

| Versione di Flink | Connettore usato | Risoluzione | 
| --- | --- | --- | 
| 1.19, 1.20 | Fonte Kinesis |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sorgente Kinesis Data Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Lavello Kinesis |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink Kinesis Data Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, vedere [Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink) Sink.  | 
| 1.19, 1.20 | Sorgente DynamoDB Streams |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sorgente DynamoDB Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Lavello DynamoDB | Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink DynamoDB più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Lavandino Amazon SQS |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink di Amazon SQS più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon SQS Sink.](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)  | 
| 1.19, 1,20 | Servizio gestito Amazon per Prometheus Sink |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink di Amazon Managed Service for Prometheus più recente. Deve essere una qualsiasi versione 1.0.0 o successiva. Per ulteriori informazioni, vedere [Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/) Sink.  | 

# Aggiungi sorgenti di dati di streaming a Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink fornisce connettori per la lettura da file, socket, raccolte e origini personalizzate. Nel codice dell'applicazione, utilizzi un'[origine Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) per ricevere dati da un flusso. Questa sezione descrive le origini disponibili per i servizi Amazon.

## Usa i flussi di dati Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`Fornisce dati in streaming all'applicazione da un flusso di dati Amazon Kinesis. 

### Creazione di una `KinesisStreamsSource`
<a name="input-streams-create"></a>

Il seguente esempio di codice illustra la creazione di una `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Per ulteriori informazioni sull'utilizzo di a`KinesisStreamsSource`, consulta [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams Connector nella [documentazione di Apache Flink e KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) il nostro esempio pubblico su Github.

### Crea un account che utilizza un consumatore EFO `KinesisStreamsSource`
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Ora supporta [Enhanced Fan-Out (](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)EFO). 

Se un consumatore Kinesis utilizza EFO, il servizio del flusso di dati Kinesis gli fornisce una larghezza di banda dedicata, anziché chiedere al consumatore di condividere la larghezza di banda fissa del flusso con gli altri consumatori che leggono dal flusso.

Per ulteriori informazioni sull'utilizzo di EFO con il consumatore Kinesis, [consulta FLIP-128: Enhanced Fan](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) Out for Kinesis Consumers. AWS 

È possibile abilitare il consumatore EFO impostando i seguenti parametri sul consumatore Kinesis:
+ **READER\$1TYPE:** imposta questo parametro su **EFO** per consentire all'applicazione di utilizzare un consumatore EFO per accedere ai dati di Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME**: imposta questo parametro su un valore di stringa che sia unico tra i consumatori di questo flusso. Il riutilizzo di un nome consumatore nello stesso flusso di dati Kinesis causerà l'interruzione del precedente consumatore che utilizzava quel nome. 

Per configurare un `KinesisStreamsSource` per l'utilizzo di EFO, aggiungi i seguenti parametri al consumatore:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Per un esempio di un'applicazione Managed Service for Apache Flink che utilizza un consumatore EFO, vedi il nostro esempio [pubblico di Kinesis](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) Connectors su Github.

## Usa Amazon MSK
<a name="input-msk"></a>

L'origine `KafkaSource` fornisce dati di streaming all'applicazione da un argomento di Amazon MSK. 

### Creazione di una `KafkaSource`
<a name="input-msk-create"></a>

Il seguente esempio di codice illustra la creazione di una `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Per ulteriori informazioni sull'utilizzo di una `KafkaSource`, consulta [Replica MSK](earlier.md#example-msk).

# Scrivi dati utilizzando i sinks in Managed Service for Apache Flink
<a name="how-sinks"></a>

Nel codice dell'applicazione, puoi utilizzare qualsiasi connettore [sink Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) per scrivere in sistemi esterni, inclusi AWS servizi come Kinesis Data Streams e DynamoDB.

Apache Flink fornisce anche sink per file e socket ed è possibile implementare sink personalizzati. Tra i diversi sink supportati, vengono utilizzati frequentemente i seguenti:

## Usa i flussi di dati Kinesis
<a name="sinks-streams"></a>

Apache Flink fornisce informazioni sul [connettore del flusso di dati Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) nella documentazione di Apache Flink.

Per un esempio di applicazione che utilizza un flusso di dati Kinesis per l'input e l'output, consulta [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md).

## Usa Apache Kafka e Amazon Managed Streaming per Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Il [connettore Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) offre un supporto completo per la pubblicazione di dati su Apache Kafka e Amazon MSK, incluse le garanzie Exactly Once. [Per imparare a scrivere su Kafka, consulta gli esempi di connettori Kafka nella documentazione di Apache Flink.](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)

## Usa Amazon S3
<a name="sinks-s3"></a>

Puoi utilizzare il `StreamingFileSink` di Apache Flink per scrivere oggetti in un bucket Amazon S3.

Per un esempio su come scrivere oggetti su S3, consulta [Esempio: scrittura su un bucket Amazon S3](earlier.md#examples-s3). 

## Usa Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`[È un sink Apache Flink affidabile e scalabile per l'archiviazione dell'output delle applicazioni utilizzando il servizio Firehose.](https://docs.aws.amazon.com/firehose/latest/dev/) In questa sezione viene descritto come impostare un progetto Maven per creare e utilizzare un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Creazione di una `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Esempio di codice `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Creazione di una `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

Il seguente esempio di codice illustra la creazione di una `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Esempio di codice `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

Il seguente esempio di codice dimostra come creare e configurare un flusso di dati Apache Flink `FlinkKinesisFirehoseProducer` e inviarlo al servizio Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Per un tutorial completo su come utilizzare il lavello Firehose, vedere. [Esempio: scrittura su Firehose](earlier.md#get-started-exercise-fh)

# Usa Asynchronous I/O nel servizio gestito per Apache Flink
<a name="how-async"></a>

Un I/O operatore asincrono arricchisce i dati del flusso utilizzando una fonte di dati esterna come un database. Il servizio gestito per Apache Flink arricchisce gli eventi di flusso in modo asincrono in modo che le richieste possano essere raggruppate in batch per una maggiore efficienza. 

Per ulteriori informazioni, consulta I/O [asincrono](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) nella documentazione di Apache Flink.

# Trasforma i dati utilizzando gli operatori in Managed Service for Apache Flink con l'API DataStream
<a name="how-operators"></a>

Per trasformare i dati in entrata in un servizio gestito per Apache Flink viene utilizzato un operatore *Apache Flink*. Un operatore Apache Flink trasforma uno o più flussi di dati in un nuovo flusso di dati. Il nuovo flusso di dati contiene dati modificati dal flusso di dati originale. Apache Flink offre più di 25 operatori di elaborazione di flussi predefiniti. Per ulteriori informazioni, consulta [Operatori](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) nella documentazione di Apache Flink.

**Topics**
+ [Usa gli operatori di trasformazione](#how-operators-transform)
+ [Usa gli operatori di aggregazione](#how-operators-agg)

## Usa gli operatori di trasformazione
<a name="how-operators-transform"></a>

Di seguito è riportato un esempio di semplice trasformazione del testo su uno dei campi di un flusso di dati JSON. 

Questo codice crea un flusso di dati trasformato. Il nuovo flusso di dati contiene gli stessi dati del flusso originale, con la stringa "` Company`" aggiunta al contenuto del campo `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Usa gli operatori di aggregazione
<a name="how-operators-agg"></a>

Di seguito è riportato un esempio di operatore di aggregazione. Il codice crea un flusso di dati aggregato. L'operatore crea una finestra a cascata di 5 secondi e restituisce la somma dei valori `PRICE` per i record nella finestra con lo stesso valore `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Per ulteriori esempi di codice, consulta [Esempi di creazione e utilizzo di Managed Service per applicazioni Apache Flink](examples-collapsibles.md). 

# Tieni traccia degli eventi in Managed Service for Apache Flink utilizzando l'API DataStream
<a name="how-time"></a>

Servizio gestito per Apache Flink tiene traccia degli eventi utilizzando i seguenti timestamp:
+ **Tempo di elaborazione:** si riferisce all'ora di sistema della macchina che esegue la rispettiva operazione.
+ **Ora evento:** si riferisce all'ora in cui ogni singolo evento si è verificato sul dispositivo di produzione.
+ **Tempo di acquisizione:** si riferisce all'ora in cui gli eventi entrano nel servizio gestito per Apache Flink.

Si imposta il tempo utilizzato dall'ambiente di streaming utilizzando`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Per ulteriori informazioni sui timestamp, consulta [Generazione di filigrane](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) nella documentazione di Apache Flink.

# Componenti dell'API Review Table
<a name="how-table"></a>

La tua applicazione Apache Flink utilizza l'[API Apache Flink Table](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/) per interagire con i dati in un flusso attraverso un modello relazionale. Utilizza l'API Table per accedere ai dati utilizzando origini Table, quindi utilizza le funzioni Table per trasformare e filtrare i dati della tabella. Puoi trasformare e filtrare i dati della tabella utilizzando funzioni API o comandi SQL. 

Questa sezione contiene i seguenti argomenti:
+ [Connettori API per tabelle](how-table-connectors.md): questi componenti spostano i dati tra l'applicazione e origini e destinazioni dati esterne.
+ [Attributi temporali dell'API della tabella](how-table-timeattributes.md): questo argomento descrive il modo in cui il servizio gestito per Apache Flink tiene traccia degli eventi quando viene utilizzata l'API Table.

# Connettori API per tabelle
<a name="how-table-connectors"></a>

Nel modello di programmazione Apache Flink, i connettori sono componenti utilizzati dall'applicazione per leggere o scrivere dati da fonti esterne, come altri AWS servizi.

Con l'API Apache Flink Table, puoi utilizzare i seguenti tipi di connettori:
+ [Sorgenti API per tabelle](#how-table-connectors-source): utilizzi i connettori di origine dell'API Table per creare tabelle all'interno dell'utente `TableEnvironment` utilizzando chiamate API o query SQL.
+ [Table API sink](#how-table-connectors-sink): utilizzi i comandi SQL per scrivere dati di tabella su origini esterne come un argomento Amazon MSK o un bucket Amazon S3.

## Sorgenti API per tabelle
<a name="how-table-connectors-source"></a>

Crei un'origine di tabella da un flusso di dati. Il codice seguente crea una tabella da un argomento di Amazon MSK:

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

Per ulteriori informazioni sulle sorgenti delle tabelle, consulta [Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) nella documentazione di Apache Flink.

## Table API sink
<a name="how-table-connectors-sink"></a>

Per scrivere i dati della tabella in un sink, crea il sink in SQL, quindi esegui il sink basato su SQL sull'oggetto `StreamTableEnvironment`.

Nell'esempio di codice seguente viene mostrato come scrivere dati di tabella su un sink Amazon S3:

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 Puoi utilizzare il parametro `format` per controllare il formato impiegato dal servizio gestito per Apache Flink per scrivere l'output nel sink. Per informazioni sui formati, consulta [Connettori supportati nella documentazione](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) di Apache Flink.

## Sorgenti e sink definiti dall'utente
<a name="how-table-connectors-userdef"></a>

Puoi utilizzare i connettori Apache Kafka esistenti per inviare dati da e verso altri servizi AWS , come Amazon MSK e Amazon S3. Per interagire con altre origini e destinazioni dati, puoi definire fonti e sink personalizzati. Per ulteriori informazioni, consulta [Sources and Sinks definiti dall'utente](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/) nella documentazione di Apache Flink.

# Attributi temporali dell'API della tabella
<a name="how-table-timeattributes"></a>

Ogni record in un flusso di dati ha diversi timestamp che definiscono quando si sono verificati gli eventi correlati al record:
+ **Ora evento**: un timestamp definito dall'utente che definisce quando si è verificato l'evento che ha creato il record.
+ **Tempo di acquisizione**: l'ora in cui l'applicazione ha recuperato il record dal flusso di dati.
+ **Tempo di elaborazione**: l'ora in cui l'applicazione ha elaborato il record.

Quando l'API Apache Flink Table crea finestre basate su tempi record, definisci quale di questi timestamp utilizza utilizzando il metodo. `setStreamTimeCharacteristic` 

Per ulteriori informazioni sull'utilizzo dei timestamp con l'API Table, consulta [Time Attributes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) e [Timely Stream Processing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/) nella documentazione di Apache Flink.

# Usa Python con Managed Service per Apache Flink
<a name="how-python"></a>

**Nota**  
Se stai sviluppando un'applicazione Python Flink su un nuovo Mac con chip Apple Silicon, potresti riscontrare alcuni [problemi noti con le dipendenze Python della versione 1.15](https://issues.apache.org/jira/browse/FLINK-26981). PyFlink In questo caso consigliamo di eseguire l'interprete Python in Docker. Per step-by-step istruzioni, consulta lo sviluppo della versione [PyFlink 1.15](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon) su Apple Silicon Mac.

La versione 2.2 di Apache Flink include il supporto per la creazione di applicazioni utilizzando la versione 3.12 di Python; il supporto per Python versione 3.8 è stato rimosso. Per ulteriori informazioni, consulta [Flink Python Docs](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/). È possibile creare un'applicazione del servizio gestito per Apache Flink con Python seguendo la procedura descritta di seguito:
+ Crea il codice dell'applicazione Python come file di testo con un metodo `main`.
+ Raggruppa il file di codice dell'applicazione e tutte le dipendenze Python o Java in un file zip e caricalo in un bucket Amazon S3.
+ Crea la tua applicazione del servizio gestito per Apache Flink, specificando la posizione del codice Amazon S3, le proprietà dell'applicazione e le sue impostazioni.

Ad alto livello, l'API Python Table è un wrapper per l'API Java Table. Per informazioni sull'API Python Table, consulta il [Table API Tutorial](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/) nella documentazione di Apache Flink.

# Programma il tuo Managed Service per l'applicazione Apache Flink Python
<a name="how-python-programming"></a>

Puoi codificare la tua applicazione del servizio gestito per Apache Flink per Python utilizzando l'API Apache Flink Python Table. Il motore Apache Flink traduce le istruzioni dell'API Python Table (in esecuzione nella macchina virtuale Python) in istruzioni dell'API Java Table (in esecuzione nella macchina virtuale Java). 

Utilizza l'API Table Python seguendo la procedura descritta di seguito:
+ Crea un riferimento a `StreamTableEnvironment`.
+ Crea oggetti `table` dai tuoi dati di streaming di origine eseguendo query sul riferimento `StreamTableEnvironment`.
+ Esegui interrogazioni sui tuoi oggetti `table` per creare tabelle di output.
+ Scrivi le tue tabelle di output nelle tue destinazioni usando uno `StatementSet`.

Per iniziare a utilizzare l'API Python Table nel servizio gestito per Apache Flink, consulta. [Inizia a usare Amazon Managed Service per Apache Flink for Python](gs-python.md)

## Leggi e scrivi dati in streaming
<a name="how-python-programming-readwrite"></a>

Per leggere e scrivere dati in streaming, esegui query SQL nell'ambiente di tabella.

### Creare una tabella
<a name="how-python-programming-readwrite-createtable"></a>

Il seguente esempio di codice illustra una funzione definita dall'utente che crea una query SQL. La query SQL crea una tabella che interagisce con un flusso Kinesis:

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### Leggi i dati di streaming
<a name="how-python-programming-readwrite-read"></a>

Il seguente esempio di codice mostra come utilizzare la query `CreateTable`SQL precedente su un riferimento di ambiente di tabella per leggere i dati:

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### Scrivi dati di streaming
<a name="how-python-programming-readwrite-write"></a>

Il seguente esempio di codice mostra come utilizzare la query SQL dell'esempio `CreateTable` per creare un riferimento alla tabella di output e come utilizzare uno `StatementSet` per interagire con le tabelle per scrivere dati su un flusso Kinesis di destinazione:

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## Leggi le proprietà di runtime
<a name="how-python-programming-properties"></a>

È possibile utilizzare le proprietà di runtime per configurare l'applicazione senza cambiare il codice dell'applicazione.

È possibile specificare le proprietà dell'applicazione nello stesso modo in cui si specifica un'applicazione del servizio gestito per Apache Flink per Java. È possibile specificare le proprietà di runtime nei seguenti modi:
+ Utilizzo dell'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)azione.
+ Utilizzo dell'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)azione.
+ Configurando la tua applicazione tramite la console.

È possibile recuperare le proprietà dell'applicazione nel codice leggendo un file json chiamato `application_properties.json` creato dal runtime del servizio gestito di Apache Flink.

Il seguente esempio di codice mostra come effettuare la lettura delle proprietà dell'applicazione dal file `application_properties.json`: 

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

Il seguente esempio di codice di funzione definito dall'utente mostra come effettuare la lettura di un gruppo di proprietà dall'oggetto delle proprietà dell'applicazione: retrieves:

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

Il seguente esempio di codice mostra come effettuare la lettura di una proprietà denominata INPUT\$1STREAM\$1KEY da un gruppo di proprietà restituito dall'esempio precedente:

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## Crea il pacchetto di codice della tua applicazione
<a name="how-python-programming-package"></a>

Una volta creata l'applicazione Python, raggruppa il file di codice e le dipendenze in un file zip.

Il file zip deve contenere uno script python con un metodo `main` e può facoltivamente contenere quanto segue:
+ File di codice Python
+ Codice Java definito dall'utente nei file JAR
+ Librerie Java nei file JAR

**Nota**  
Il file zip dell'applicazione deve contenere tutte le dipendenze dell'applicazione. Non puoi fare riferimento a librerie da altre origini per la tua applicazione.

# Crea il tuo servizio gestito per l'applicazione Apache Flink Python
<a name="how-python-creating"></a>

## Specificate i vostri file di codice
<a name="how-python-creating-code"></a>

Dopo che è stato creato, il pacchetto di codice dell'applicazione deve essere caricato in un bucket Amazon S3. Quindi crei l'applicazione utilizzando la console o l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)azione.

Quando create l'applicazione utilizzando l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)azione, specificate i file di codice e gli archivi nel file zip utilizzando uno speciale gruppo di proprietà dell'applicazione denominato`kinesis.analytics.flink.run.options`. Puoi definire i seguenti tipi di file:
+ **python**: un file di testo contenente un metodo principale Python.
+ **jarfile**: un file Java JAR contenente funzioni Java definite dall'utente.
+ **pyFiles**: un file di risorse Python contenente risorse che devono essere utilizzate dall'applicazione.
+ **pyArchives**: un file zip contenente i file di risorse per l'applicazione.

Per ulteriori informazioni sui tipi di file di codice Python di Apache Flink, consulta [Command-Line Interface](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/) nella documentazione di Apache Flink.

**Nota**  
Il servizio gestito per Apache Flink non supporta i tipi di file `pyModule`, `pyExecutable` o `pyRequirements`. Tutto il codice, tutti i requisiti e tutte le dipendenze devono essere contenuti nel file zip. Non è possibile specificare le dipendenze da installare utilizzando pip. 

Il seguente esempio di frammento json mostra come specificare le posizioni dei file all'interno del file zip dell'applicazione:

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# Monitora il tuo servizio gestito per l'applicazione Apache Flink Python
<a name="how-python-monitoring"></a>

Utilizzate il CloudWatch registro dell'applicazione per monitorare l'applicazione Managed Service for Apache Flink Python.

Il servizio gestito per Apache Flink effettua il log dei seguenti messaggi per applicazioni Python:
+ Messaggi scritti sulla console utilizzando `print()` nel metodo `main` dell'applicazione. 
+ Messaggi inviati in funzioni definite dall'utente utilizzando il pacchetto `logging`. Il seguente esempio di codice mostra come effettuare il log dell'applicazione da una funzione definita dall'utente:

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ Messaggi di errore generati dall'applicazione.

  Se l'applicazione genera un'eccezione nella funzione `main`, l'eccezioneverrà visualizzata nei log dell'applicazione.

  L'esempio seguente mostra una voce di log per un'eccezione generata dal codice Python:

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**Nota**  
Per evitare possibili problemi di prestazioni, si consiglia di utilizzare solo messaggi di log personalizzati durante lo sviluppo dell'applicazione. 

## Interroga i log con Insights CloudWatch
<a name="how-python-monitoring-insights"></a>

La seguente query CloudWatch Insights cerca i log creati dall'entrypoint Python durante l'esecuzione della funzione principale dell'applicazione:

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Utilizzo delle proprietà di runtime in Managed Service for Apache Flink
<a name="how-properties"></a>

È possibile utilizzare le *proprietà di runtime* per configurare l'applicazione senza ricompilare il codice dell'applicazione. 

**Topics**
+ [Gestisci le proprietà di runtime utilizzando la console](#how-properties-console)
+ [Gestisci le proprietà di runtime utilizzando la CLI](#how-properties-cli)
+ [Accedi alle proprietà di runtime in un'applicazione Managed Service for Apache Flink](#how-properties-access)

## Gestisci le proprietà di runtime utilizzando la console
<a name="how-properties-console"></a>

È possibile aggiungere, aggiornare o rimuovere le proprietà di runtime dall'applicazione Managed Service for Apache Flink utilizzando. Console di gestione AWS

**Nota**  
Se utilizzi una versione precedente supportata di Apache Flink e desideri aggiornare le tue applicazioni esistenti ad Apache Flink 1.19.1, puoi farlo utilizzando gli aggiornamenti di versione di Apache Flink in loco. Con gli aggiornamenti di versione in loco, mantieni la tracciabilità delle applicazioni su un singolo ARN tra le versioni di Apache Flink, tra cui istantanee, log, metriche, tag, configurazioni Flink e altro ancora. È `RUNNING` `READY` possibile utilizzare questa funzionalità in qualsiasi stato. Per ulteriori informazioni, consulta [Usa gli aggiornamenti di versione sul posto per Apache Flink](how-in-place-version-upgrades.md).

**Aggiornamento delle proprietà di runtime per un'applicazione del servizio gestito per Apache Flink**

1. Accedi a e apri Console di gestione AWS la console Amazon MSF all'indirizzo https://console.aws.amazon.com /flink.

1. Scegli l'applicazione del servizio gestito per Apache Flink. Scegli **Dettagli dell'applicazione**.

1. Nella pagina della tua applicazione, scegli **Configura**.

1. Espandi la sezione **Proprietà**.

1. Utilizza i controlli nella sezione **Proprietà** per definire un gruppo di proprietà con coppie chiave-valore. Utilizza questi controlli per aggiungere, aggiornare o rimuovere gruppi di proprietà e proprietà di runtime.

1. Scegliere **Aggiorna**.

## Gestisci le proprietà di runtime utilizzando la CLI
<a name="how-properties-cli"></a>

Puoi aggiungere, aggiornare o rimuovere le proprietà di runtime utilizzando la [AWS CLI](https://docs.aws.amazon.com/cli). 

Questa sezione include esempi di richieste di operazioni API per la configurazione delle proprietà di runtime per un'applicazione. Per informazioni su come utilizzare un file JSON come input per un'operazione API, consulta [Codice di esempio dell'API Managed Service per Apache Flink](api-examples.md).

**Nota**  
Sostituisci l'ID account di esempio (*`012345678901`*) nell'esempio seguente con il tuo ID account.

### Aggiungi proprietà di runtime durante la creazione di un'applicazione
<a name="how-properties-create"></a>

Il seguente esempio di richiesta per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) aggiunge due gruppi di proprietà di runtime (`ProducerConfigProperties` e `ConsumerConfigProperties`) quando crei un'applicazione:

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### Aggiungere e aggiornare le proprietà di runtime in un'applicazione esistente
<a name="how-properties-update"></a>

La seguente richiesta per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) aggiunge o aggiorna le proprietà di runtime per un'applicazione esistente:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**Nota**  
Se utilizzi una chiave che non ha una proprietà di runtime corrispondente in un gruppo di proprietà, il servizio gestito per Apache Flink aggiunge la coppia chiave-valore come nuova proprietà. Se utilizzi una chiave per una proprietà di runtime esistente in un gruppo di proprietà, il servizio gestito per Apache Flink aggiorna il valore della proprietà. 

### Rimuovere le proprietà di runtime
<a name="how-properties-remove"></a>

La seguente richiesta per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) rimuove tutte le proprietà di runtime e i gruppi di proprietà da un'applicazione esistente:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**Importante**  
Se ometti un gruppo di proprietà esistente o una chiave di proprietà esistente in un gruppo di proprietà, tale gruppo di proprietà o proprietà vengono rimossi.

## Accedi alle proprietà di runtime in un'applicazione Managed Service for Apache Flink
<a name="how-properties-access"></a>

È possibile recuperare le proprietà di runtime nel codice dell'applicazione Java utilizzando il metodo statico `KinesisAnalyticsRuntime.getApplicationProperties()`, che restituisce un oggetto `Map<String, Properties>`.

Il seguente esempio di codice Java recupera le proprietà di runtime per l'applicazione:

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

Recupera un gruppo di proprietà (come oggetto `Java.Util.Properties`) nel modo seguente:

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

In genere si configura un'origine o un sink Apache Flink passando nell'oggetto `Properties` senza bisogno di recuperare le singole proprietà. Il seguente esempio di codice dimostra come creare un'origine Flink passando un oggetto `Properties` recuperato dalle proprietà di runtime:

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

Per alcuni esempi di codice, consultare [Esempi di creazione e utilizzo di Managed Service per applicazioni Apache Flink](examples-collapsibles.md).

# Usa i connettori Apache Flink con Managed Service for Apache Flink
<a name="how-flink-connectors"></a>

I connettori Apache Flink sono componenti software che spostano i dati da e verso un'applicazione Amazon Managed Service per Apache Flink. I connettori sono integrazioni flessibili che consentono di leggere file e directory. I connettori sono costituiti da moduli completi per l'interazione con i servizi Amazon e i sistemi di terze parti.

I tipi di connettori comprendono:
+ **Fonti:** fornisci dati all'applicazione da un flusso di dati Kinesis, un file, un argomento di Apache Kafka, un file o altre fonti di dati.
+ **Sinks:** invia i dati dall'applicazione a un flusso di dati Kinesis, a un flusso Firehose, a un argomento Apache Kafka o ad altre destinazioni di dati.
+ **I/O asincrono: fornisce l'accesso** asincrono a una fonte di dati come un database per arricchire i flussi. 

I connettori Apache Flink sono archiviati nei rispettivi archivi di origine. La versione e l'artefatto dei connettori Apache Flink cambiano a seconda della versione di Apache Flink in uso e se si utilizza l'API Table o SQL. DataStream 

Amazon Managed Service per Apache Flink supporta oltre 40 connettori sorgente e sink Apache Flink predefiniti. La tabella seguente fornisce un riepilogo dei connettori più diffusi e delle versioni associate. È inoltre possibile creare sink personalizzati utilizzando il framework Async-Sink. Per ulteriori informazioni, consulta [The Generic Asynchronous Base](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/) Sink nella documentazione di Apache Flink.

 Per accedere all'archivio per i connettori Apache Flink, consulta. AWS [flink-connector-aws](https://github.com/apache/flink-connector-aws)

## Connettori per Flink 2.2
<a name="connectors-flink-2-2"></a>

Quando si esegue l'aggiornamento a Flink 2.2, è necessario aggiornare le dipendenze dei connettori a versioni compatibili con il runtime Flink 2.x. I connettori Flink vengono rilasciati indipendentemente dal runtime Flink e non tutti i connettori hanno ancora una versione compatibile con Flink 2.x. La tabella seguente riassume la disponibilità dei connettori di uso comune in Amazon Managed Service for Apache Flink al momento della stesura di questo documento:


**Connettori per Flink 2.2**  

| Connector | Versione Flink 2.0\$1 | Note | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | Consigliato per Flink 2.2 | 
| Kinesis Data Streams (fonte) | flink-connector-aws-kinesis-stream 6.0.0-2.0 | Consigliato per Flink 2.2 | 
| Kinesis Data Streams (sink) | flink-connector-aws-kinesis-stream 6.0.0-2.0 | Consigliato per Flink 2.2 | 
| FileSystem (S3, HDFS) | In bundle con Flink | Integrato nella distribuzione Flink, sempre disponibile | 
| JDBC | Non ancora rilasciato per 2.x | Nessuna versione compatibile con Flink 2.x disponibile | 
| OpenSearch | Non ancora rilasciata per 2.x | Nessuna versione compatibile con Flink 2.x disponibile | 
| Elasticsearch | Non ancora rilasciata per 2.x | Prendi in considerazione la migrazione al connettore OpenSearch  | 
| Amazon Managed Service per Prometheus | Non ancora rilasciato per 2.x | Nessuna versione compatibile con Flink 2.x al momento della scrittura | 

Se l'applicazione dipende da un connettore che non dispone ancora di una versione Flink 2.2, sono disponibili due opzioni: attendere che il connettore rilasci una versione compatibile o valutare se è possibile sostituirlo con un'alternativa (ad esempio, utilizzando il catalogo JDBC o un sink personalizzato).

**Problemi noti**
+ Le applicazioni che utilizzano il percorso `KinesisStreamsSource` con EFO (Enhanced Fan-Out/ SubscribeToShard) introdotto nei connettori v5.0.0 e v6.0.0 potrebbero fallire quando gli stream Kinesis vengono sottoposti a resharding. Si tratta di un problema noto nella comunità. Per ulteriori informazioni, vedere [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648).
+ Le applicazioni che utilizzano il percorso `KinesisStreamsSource` with EFO (Enhanced Fan-Out/ SubscribeToShard) introdotto insieme ai connettori v5.0.0 e v6.0.0 `KinesisStreamsSink` potrebbero riscontrare dei deadlock se l'applicazione Flink è sottoposta a contropressione, con conseguente arresto completo dell'elaborazione dei dati in uno o più. TaskManagers Per ripristinare l'app sono necessarie un'operazione di arresto forzato e un'operazione di avvio dell'app. Questo è un caso secondario del problema noto nella comunità: [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071).

## Connettori per versioni precedenti di Flink
<a name="connectors-older-versions"></a>


**Connettori per versioni precedenti di Flink**  

| Connector | Flink versione 1.15 | Flink versione 1.18 | Versioni Flink 1.19 | Versioni Flink 1.20 | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream - API di origine DataStream e tabella | flink-connector-kinesis, 1.15.4 | flink-connector-kinesis, 4,3,0-1,18 | flink-connector-kinesis, 5,0,0-1,19 | flink-connector-kinesis, 5,0,0-1,20 | 
| API Kinesis Data Stream - Sink - DataStream e Table | flink-connector-aws-kinesis-stream, 1.15.4 | flink-connector-aws-kinesis-stream, 4.3.0-1.18 | flink-connector-aws-kinesis-stream, 5.0.0-1.19 | flink-connector-aws-kinesis-stream, 5.0.0-1.20 | 
| Kinesis Data Source/Sink Streams - - SQL | flink-sql-connector-kinesis, 1.15.4 | flink-sql-connector-kinesis, 4,3,0-1,18 | flink-sql-connector-kinesis, 5,0,0-1,19 | flink-sql-connector-kinesis-stream, 5.0.0-1.20 | 
| Kafka e Table API DataStream  | flink-connector-kafka, 1.15.4 | flink-connector-kafka, 3,2,0-1,18 | flink-connector-kafka, 3,3,0-1,19 | flink-connector-kafka, 3,3,0-1,20 | 
| Kafka - SQL | flink-sql-connector-kafka, 1.15.4 | flink-sql-connector-kafka, 3,2,0-1,18 | flink-sql-connector-kafka, 3,3,0-1,19 | flink-sql-connector-kafka, 3,3,0-1,20 | 
| API Firehose DataStream e Table | flink-connector-aws-kinesis-firehose, 1.15.4 | flink-connector-aws-firehose, 4,3,0-1,18 | flink-connector-aws-firehose, 5,0,0-1,19 | flink-connector-aws-firehose, 5,0,0-1,20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-firehose, 1.15.4 | flink-sql-connector-aws-manichetta antincendio, 4.3.0-1.18 | flink-sql-connector-aws- manichetta antincendio, 5.0.0-1.19 | flink-sql-connector-aws- manichetta antincendio, 5.0.0-1.20 | 
| DynamoDB DataStream e API per tabelle | flink-connector-dynamodb, 3,0,0-1,15 | flink-connector-dynamodb, 4,3,0-1,18 | flink-connector-dynamodb, 5,0,0-1,19 | flink-connector-dynamodb, 5,0,0-1,20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb, 3,0-1,15 | flink-sql-connector-dynamodb, 4,3,0-1,18 | flink-sql-connector-dynamodb, 5,0,0-1,19 | flink-sql-connector-dynamodb, 5,0,0-1,20 | 
| OpenSearch - DataStream e Table API | - | flink-connector-opensearch, 1.2.0-1,18 | flink-connector-opensearch, 1,2,0-1,19 | flink-connector-opensearch, 1,2,0-1,19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch, 1,2,0-1,18 | flink-sql-connector-opensearch, 1,2,0-1,19 | flink-sql-connector-opensearch, 1,2,0-1,19 | 
| Servizio gestito Amazon per Prometheus DataStream | - | flink-sql-connector-opensearch, 1,2,0-1,18 | flink-connector-prometheus, 1,0,0-1,19 | flink-connector-prometheus, 1,0,0-1,20 | 
| Amazon SQS DataStream e API per tabelle | - | flink-sql-connector-opensearch, 1,2,0-1,18 | flink-connector-sqs, 5,0,0-1,19 | flink-connector-sqs, 5,0,0-1,20 | 

Per ulteriori informazioni sui connettori in Amazon Managed Service for Apache Flink, consulta:
+ [DataStream Connettori API](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Tabella dei connettori API](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### Problemi noti
<a name="connectors-known-issues"></a>

Esiste un problema noto di Apache Flink open source con il connettore Apache Kafka in Apache Flink 1.15. Questo problema è stato risolto nelle versioni successive di Apache Flink. 

Per ulteriori informazioni, consulta [Problemi noti](flink-1-15-2.md#flink-1-15-known-issues). 

# Implementazione della tolleranza agli errori in Managed Service for Apache Flink
<a name="how-fault"></a>

La creazione di checkpoint è il metodo utilizzato per implementare la tolleranza agli errori nel servizio gestito da Amazon per Apache Flink. Un *checkpoint* è un up-to-date backup di un'applicazione in esecuzione che viene utilizzato per il ripristino immediato in caso di interruzione o failover imprevisti dell'applicazione. 

[Per i dettagli sul checkpoint nelle applicazioni Apache Flink, consulta Checkpoints nella documentazione di Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/)

Uno *snapshot* è un backup dello stato dell'applicazione creato e gestito manualmente. Gli snapshot consentono di ripristinare l'applicazione a uno stato precedente chiamando [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html). Per ulteriori informazioni, consulta [Gestione dei backup delle applicazioni tramite istantanee](how-snapshots.md).

Se abilitato per l'applicazione, il servizio di creazione di checkpoint fornisce tolleranza agli errori creando e caricando backup dei dati dell'applicazione in caso di riavvii imprevisti della stessa. I riavvii imprevisti dell’applicazione potrebbero essere causati da riavvii imprevisti dei processi, errori delle istanze, ecc. Ciò conferisce all’applicazione la stessa semantica dell’esecuzione senza errori durante tali riavvii. 

Se le istantanee sono abilitate per l'applicazione e configurate utilizzando l'applicazione, il servizio fornisce la semantica di elaborazione esatta [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html)durante gli aggiornamenti dell'applicazione o durante il ridimensionamento o la manutenzione relativi al servizio.

## Configura il checkpoint in Managed Service for Apache Flink
<a name="how-fault-configure"></a>

È possibile configurare il comportamento di creazione di checkpoint dell'applicazione. Puoi definire se mantenere lo stato di creazione di checkpoint, con quale frequenza salvare lo stato dell'applicazione nei checkpoint e l'intervallo minimo tra la fine di un'operazione di creazione checkpoint e l'inizio di un'altra.

È possibile configurare le seguenti impostazioni utilizzando le operazioni API [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) o [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html):
+ `CheckpointingEnabled`: indica se la creazione di checkpoint è abilitata nell'applicazione.
+ `CheckpointInterval`: contiene il tempo in millisecondi tra le operazioni di checkpoint (persistenza).
+ `ConfigurationType`: imposta questo valore su `DEFAULT` per utilizzare il comportamento di creazione di checkpoint predefinito. Imposta questo valore su `CUSTOM` per configurare altri valori.
**Nota**  
Il comportamento di checkpoint predefinito è il seguente:  
**CheckpointingEnabled:** vero
**CheckpointInterval:** 60000
**MinPauseBetweenCheckpoints: 5000**
Se **ConfigurationType**è impostato su`DEFAULT`, verranno utilizzati i valori precedenti, anche se sono impostati su altri valori utilizzando o impostando i valori nel codice dell'applicazione. AWS Command Line Interface
**Nota**  
A partire da Flink 1.15, il servizio gestito per Apache Flink utilizzerà `stop-with-savepoint` durante la creazione automatica di snapshot, ovvero per l'aggiornamento, il dimensionamento o l'arresto dell'applicazione. 
+ `MinPauseBetweenCheckpoints`: il tempo minimo in millisecondi tra la fine di un'operazione di checkpoint e l'inizio di un'altra. L'impostazione di questo valore impedisce all'applicazione di continuare a creare checkpoint quando tale operazione richiede più tempo di quanto specificato dall'`CheckpointInterval`.

## Consulta gli esempi di API di checkpointing
<a name="how-fault-examples"></a>

Questa sezione include esempi di richieste di operazioni API per la configurazione della creazione di checkpoint per un'applicazione. Per informazioni su come utilizzare un file JSON come input per un'operazione API, consulta [Codice di esempio dell'API Managed Service per Apache Flink](api-examples.md).

### Configura il checkpoint per una nuova applicazione
<a name="how-fault-examples-create-config"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) configura la creazione di checkpoint durante la creazione di un'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### Disabilita il checkpoint per una nuova applicazione
<a name="how-fault-examples-create-disable"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) disabilita la creazione di checkpoint durante la creazione di un'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### Configura il checkpoint per un'applicazione esistente
<a name="how-fault-examples-update-config"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) configura la creazione di checkpoint per un'applicazione esistente:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### Disabilita il checkpoint per un'applicazione esistente
<a name="how-fault-examples-update-update-disable"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) disabilita la creazione di checkpoint per un'applicazione esistente:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# Gestione dei backup delle applicazioni tramite istantanee
<a name="how-snapshots"></a>

Uno *snapshot* è l'implementazione nel servizio gestito per Apache Flink di un *savepoint* di Apache Flink. Uno snapshot è un backup dello stato dell'applicazione attivato, creato e gestito dall'utente o dal servizio. [Per informazioni su Apache Flink Savepoints, consulta Savepoints nella documentazione di Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) Utilizzando le istantanee, è possibile riavviare un'applicazione da una particolare istantanea dello stato dell'applicazione.

**Nota**  
È consigliabile che l'applicazione crei uno snapshot più volte al giorno per riavviarsi correttamente con i dati di stato corretti. La frequenza corretta per l'acquisizione degli snapshot dipende dalla logica di business dell'applicazione. L'acquisizione di istantanee frequenti consente di ripristinare i dati più recenti, ma aumenta i costi e richiede più risorse di sistema.

Nel servizio gestito per Apache Flink vengono gestiti gli snapshot che utilizzano le seguenti operazioni API:
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

Per il limite nel numero di snapshot per ogni applicazione, consulta [Servizio gestito per Apache Flink e quota di notebook Studio](limits.md). Se l'applicazione raggiunge il limite di snapshot, non è possibile creare manualmente uno snapshtot con una `LimitExceededException`. 

Il servizio gestito per Apache Flink non elimina gli snapshot. Questi snapshot dovranno essere eliminati manualmente utilizzando l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html).

Per caricare uno snapshot salvato dello stato dell'applicazione all'avvio di un'applicazione, utilizza il parametro [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) della [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) o l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html).

**Topics**
+ [Gestisci la creazione automatica di istantanee](#how-fault-snapshot-update)
+ [Esegui il ripristino da un'istantanea che contiene dati di stato incompatibili](#how-fault-snapshot-restore)
+ [Consulta gli esempi di API snapshot](#how-fault-snapshot-examples)

## Gestisci la creazione automatica di istantanee
<a name="how-fault-snapshot-update"></a>

Se `SnapshotsEnabled` è impostato su `true` in [ ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)for the application, Managed Service for Apache Flink crea e utilizza automaticamente le istantanee quando l'applicazione viene aggiornata, ridimensionata o interrotta per fornire una semantica di elaborazione esatta.

**Nota**  
L'impostazione di `ApplicationSnapshotConfiguration::SnapshotsEnabled` su `false` comporterà la perdita di dati durante gli aggiornamenti dell'applicazione.

**Nota**  
Il servizio gestito per Apache Flink attiva i savepoint intermedi durante la creazione degli snapshot. A partire dalla versione 1.15 di Flink, i savepoint intermedi non producono più effetti collaterali. [Vedi Attivazione dei punti di salvataggio.](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)

Gli snapshot creati in modo automatico hanno le seguenti qualità:
+ L'istantanea è gestita dal servizio, ma è possibile visualizzarla utilizzando l'azione. [ ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html) Gli snapshot creati automaticamente vengono conteggiati in base al limite di snapshot.
+ Se l'applicazione supera il limite di snapshot, gli snapshot creati manualmente avranno esito negativo, ma il servizio gestito per Apache Flink continuerà a creare snapshot con successo quando l'applicazione verrà aggiornata, dimensionata o interrotta. È necessario eliminare manualmente le istantanee utilizzando l'[ DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)azione prima di crearne altre manualmente.

## Esegui il ripristino da un'istantanea che contiene dati di stato incompatibili
<a name="how-fault-snapshot-restore"></a>

Poiché gli snapshot contengono informazioni sugli operatori, il ripristino dei dati di stato da uno snapshot per un operatore che è stato modificato rispetto alla versione precedente dell'applicazione potrebbe avere risultati imprevisti. Un'applicazione genera un errore se tenta di ripristinare i dati di stato da uno snapshot che non corrisponde all'operatore corrente. L'applicazione con errori rimarrà bloccata nello stato `STOPPING` o `UPDATING`. 

Per consentire a un'applicazione di eseguire il ripristino da un'istantanea che contiene dati di stato incompatibili, impostate il `AllowNonRestoredState` parametro [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)to `true` utilizzando l'azione. [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)

Quando un'applicazione viene ripristinata da uno snapshot obsoleto, si verifica il seguente comportamento:
+ **Operatore aggiunto:** se viene aggiunto un nuovo operatore, il savepoint non ha dati di stato per il nuovo operatore. Non si verificherà alcun errore e non è necessario impostare `AllowNonRestoredState`.
+ **Operatore eliminato:** se viene eliminato un operatore esistente, il savepoint contiene i dati di stato per l'operatore mancante. Si verificherà un errore a meno che `AllowNonRestoredState` non sia impostato su `true`.
+ **Operatore modificato:** se vengono apportate modifiche compatibili, ad esempio la modifica del tipo di parametro in un tipo compatibile, l'applicazione può eseguire il ripristino dallo snapshot obsoleto. Per ulteriori informazioni sul ripristino da istantanee, consulta [Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) nella documentazione di Apache Flink. Un'applicazione che utilizza Apache Flink versione 1.8 o successiva può essere ripristinata da uno snapshot con uno schema diverso. Un'applicazione che utilizza Apache Flink versione 1.6 non può essere ripristinata. Per two-phase-commit i sink, consigliamo di utilizzare lo snapshot di sistema (SW) anziché lo snapshot creato dall'utente (). CreateApplicationSnapshot

  Per Flink, il servizio gestito per Apache Flink attiva savepoint intermedi durante la creazione degli snapshot. A partire da Flink 1.15, i savepoint intermedi non producono più effetti collaterali. Consulta [Attivazione dei savepoint](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Se è necessario riprendere un'applicazione incompatibile con i dati del savepoint esistenti, si consiglia di saltare il ripristino dall'istantanea impostando il parametro dell'azione su. `ApplicationRestoreType` [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)`SKIP_RESTORE_FROM_SNAPSHOT`

Per ulteriori informazioni sul modo in cui Apache Flink gestisce i dati di stato incompatibili, consulta [Evoluzione dello schema di stato](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/) nella *documentazione di Apache Flink*.

## Consulta gli esempi di API snapshot
<a name="how-fault-snapshot-examples"></a>

Questa sezione include esempi di richieste di operazioni API per l'utilizzo di snapshot con un'applicazione. Per informazioni su come utilizzare un file JSON come input per un'operazione API, consulta [Codice di esempio dell'API Managed Service per Apache Flink](api-examples.md).

### Abilita le istantanee per un'applicazione
<a name="how-fault-savepoint-examples-enable"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) abilita gli snapshot per un'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### Creazione di una snapshot
<a name="how-fault-savepoint-examples-create"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html) crea uno snapshot dello stato corrente dell'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Elenca le istantanee di un'applicazione
<a name="how-fault-snapshot-examples-list"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) elenca i primi 50 snapshot per lo stato corrente dell'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### Elenca i dettagli di un'istantanea dell'applicazione
<a name="how-fault-snapshot-examples-describe"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) elenca i dettagli relativi a uno snapshot specifico per l'applicazione:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Eliminazione di uno snapshot
<a name="how-fault-snapshot-examples-delete"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) elimina uno snapshot salvato in precedenza. È possibile ottenere il valore `SnapshotCreationTimestamp` utilizzando [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) o [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html):

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### Riavviare un'applicazione utilizzando un'istantanea denominata
<a name="how-fault-snapshot-examples-load-custom"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) avvia l'applicazione utilizzando lo stato salvato da uno snapshot specifico:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### Riavviare un'applicazione utilizzando l'istantanea più recente
<a name="how-fault-snapshot-examples-load-recent"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) avvia l'applicazione utilizzando lo snapshot più recente:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### Riavviare un'applicazione senza utilizzare alcuna istantanea
<a name="how-fault-snapshot-examples-load-none"></a>

La seguente richiesta di esempio per l'operazione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) avvia l'applicazione senza caricare lo stato dell'applicazione, anche se è presente uno snapshot:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# Usa gli aggiornamenti di versione sul posto per Apache Flink
<a name="how-in-place-version-upgrades"></a>

Con gli aggiornamenti di versione in loco per Apache Flink, è possibile mantenere la tracciabilità delle applicazioni su un singolo ARN tra le versioni di Apache Flink. Ciò include istantanee, log, metriche, tag, configurazioni Flink, aumenti dei limiti di risorse e altro ancora. VPCs 

Puoi eseguire aggiornamenti di versione sul posto per Apache Flink per aggiornare le applicazioni esistenti a una nuova versione di Flink in Amazon Managed Service for Apache Flink. Per eseguire questa operazione, puoi utilizzare,, SDK o. AWS CLI AWS CloudFormation AWS Console di gestione AWS

**Nota**  
Non puoi utilizzare aggiornamenti di versione in loco per Apache Flink con Amazon Managed Service per Apache Flink Studio.

**Topics**
+ [Aggiorna le applicazioni utilizzando aggiornamenti di versione in loco per Apache Flink](upgrading-applications.md)
+ [Aggiorna la tua applicazione a una nuova versione di Apache Flink](upgrading-application-new-version.md)
+ [Ripristina gli aggiornamenti delle applicazioni](rollback.md)
+ [Procedure consigliate e consigli generali per gli aggiornamenti delle applicazioni](best-practices-recommendations.md)
+ [Precauzioni e problemi noti relativi agli aggiornamenti delle applicazioni](precautions.md)
+ [Aggiornamento a Flink 2.2: guida completa](flink-2-2-upgrade-guide.md)
+ [Guida alla compatibilità dello stato per gli aggiornamenti di Flink 2.2](state-compatibility.md)

# Aggiorna le applicazioni utilizzando aggiornamenti di versione in loco per Apache Flink
<a name="upgrading-applications"></a>

Prima di iniziare, ti consigliamo di guardare questo video: [Aggiornamenti delle versioni in loco](https://www.youtube.com/watch?v=f1qGGdaP2XI).

Per eseguire aggiornamenti di versione sul posto per Apache Flink, puoi utilizzare,, SDK o. AWS CLI AWS CloudFormation AWS Console di gestione AWSÈ possibile utilizzare questa funzionalità con qualsiasi applicazione esistente utilizzata con Managed Service for Apache Flink in uno stato or. `READY` `RUNNING` Utilizza l' UpdateApplication API per aggiungere la possibilità di modificare il runtime di Flink.

## Prima dell'aggiornamento: aggiorna la tua applicazione Apache Flink
<a name="before-upgrading"></a>

Quando scrivi le tue applicazioni Flink, le raggruppi con le relative dipendenze in un JAR dell'applicazione e carichi il JAR nel tuo bucket Amazon S3. Da lì, Amazon Managed Service for Apache Flink esegue il job nel nuovo runtime Flink che hai selezionato. Potrebbe essere necessario aggiornare le applicazioni per ottenere la compatibilità con il runtime Flink a cui desideri eseguire l'aggiornamento. Possono esserci delle incongruenze tra le versioni di Flink che impediscono l'aggiornamento della versione. Più comunemente, ciò avverrà con connettori per sorgenti (ingresso) o destinazioni (lavandini, uscite) e dipendenze di Scala. Flink 1.15 e le versioni successive di Managed Service for Apache Flink sono indipendenti dalla scala e il tuo JAR deve contenere la versione di Scala che intendi utilizzare.

**Per aggiornare l'applicazione**

1. Leggi i consigli della community di Flink sull'aggiornamento delle applicazioni con state. Vedi [Aggiornamento delle applicazioni e delle versioni di Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

1. Leggi l'elenco dei problemi e delle limitazioni più comuni. Per informazioni, consulta [Precauzioni e problemi noti relativi agli aggiornamenti delle applicazioni](precautions.md).

1. Aggiorna le tue dipendenze e testa le tue applicazioni localmente. Queste dipendenze sono in genere:

   1. Il runtime e l'API di Flink.

   1. Connettori consigliati per il nuovo runtime Flink. Puoi trovarli [nelle versioni Release](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html) per il runtime specifico a cui desideri eseguire l'aggiornamento.

   1. Scala — Apache Flink è indipendente dalla scala a partire da Flink 1.15 incluso. È necessario includere le dipendenze di Scala che si desidera utilizzare nel JAR dell'applicazione.

1. Crea una nuova applicazione JAR su zipfile e caricala su Amazon S3. Ti consigliamo di utilizzare un nome diverso dal precedente JAR/zipFile. Se devi eseguire il rollback, utilizzerai queste informazioni.

1. Se esegui applicazioni con stato, ti consigliamo vivamente di scattare un'istantanea dell'applicazione corrente. In questo modo è possibile eseguire il rollback in modalità statefully in caso di problemi durante o dopo l'aggiornamento. 

# Aggiorna la tua applicazione a una nuova versione di Apache Flink
<a name="upgrading-application-new-version"></a>

Puoi aggiornare la tua applicazione Flink utilizzando l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)azione.

Puoi chiamare l'`UpdateApplication`API in diversi modi:
+ Utilizza il flusso **di lavoro di configurazione** esistente su Console di gestione AWS.
  + Vai alla pagina della tua app su Console di gestione AWS.
  + Scegli **Configura**.
  + Seleziona il nuovo runtime e l'istantanea da cui vuoi iniziare, nota anche come configurazione di ripristino. Utilizza l'impostazione più recente come configurazione di ripristino per avviare l'app dall'ultima istantanea. Seleziona la nuova applicazione aggiornata JAR/zip su Amazon S3.
+ Usa l'azione AWS CLI [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html).
+ Usa CloudFormation (CFN).
  + Aggiorna il [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment)campo. In precedenza, CloudFormation eliminava l'applicazione e ne creava una nuova, causando la perdita delle istantanee e della cronologia dell'altra app. Ora CloudFormation aggiorna la RuntimeEnvironment versione in uso e non elimina l'applicazione. 
+ Usa l' AWS SDK.
  + Consulta la documentazione SDK per il linguaggio di programmazione che preferisci. Per informazioni, consulta [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html). 

È possibile eseguire l'aggiornamento mentre l'applicazione è in `RUNNING` stato o mentre l'applicazione è arrestata in `READY` tale stato. Amazon Managed Service for Apache Flink esegue la convalida per verificare la compatibilità tra la versione di runtime originale e la versione di runtime di destinazione. Questo controllo di compatibilità viene eseguito quando lo esegui [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)mentre sei in `RUNNING` uno stato o successivamente [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)se esegui l'upgrade mentre sei in stato. `READY` 

## Aggiorna un'applicazione in `RUNNING` stato
<a name="upgrading-running"></a>

L'esempio seguente mostra l'aggiornamento di un'app nello `RUNNING` stato denominato `UpgradeTest` Flink 1.18 negli Stati Uniti orientali (Virginia settentrionale) utilizzando AWS CLI e l'avvio dell'app aggiornata dall'ultima istantanea. 

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ Se hai abilitato gli snapshot del servizio e desideri continuare l'applicazione dallo snapshot più recente, Amazon Managed Service for Apache Flink verifica che il runtime dell'`RUNNING`applicazione corrente sia compatibile con il runtime di destinazione selezionato.
+ Se hai specificato uno snapshot da cui continuare il runtime di destinazione, Amazon Managed Service for Apache Flink verifica che il runtime di destinazione sia compatibile con lo snapshot specificato. Se il controllo di compatibilità fallisce, la richiesta di aggiornamento viene rifiutata e l'applicazione rimane invariata nello stato. `RUNNING`
+ Se scegli di avviare l'applicazione senza uno snapshot, Amazon Managed Service for Apache Flink non esegue alcun controllo di compatibilità.
+ Se l'applicazione aggiornata fallisce o rimane bloccata in uno `UPDATING` stato transitivo, segui le istruzioni nella [Ripristina gli aggiornamenti delle applicazioni](rollback.md) sezione per tornare allo stato integro. 

**Flusso di processo per l'esecuzione di applicazioni statali**

![\[Il diagramma seguente rappresenta il flusso di lavoro consigliato per aggiornare l'applicazione durante l'esecuzione. Partiamo dal presupposto che l'applicazione sia dotata di stato e che le istantanee siano state abilitate. Per questo flusso di lavoro, al momento dell'aggiornamento, ripristini l'applicazione dall'ultima istantanea scattata automaticamente da Amazon Managed Service for Apache Flink prima dell'aggiornamento.\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/images/in-place-update-while-running.png)


## **Aggiorna un'applicazione nello stato READY**
<a name="upgrading-ready"></a>

L'esempio seguente mostra l'aggiornamento di un'app nello `READY` stato denominato `UpgradeTest` Flink 1.18 negli Stati Uniti orientali (Virginia settentrionale) utilizzando il. AWS CLI Non esiste un'istantanea specificata per avviare l'app perché l'applicazione non è in esecuzione. È possibile specificare un'istantanea quando si invia la richiesta di avvio dell'applicazione.

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ È possibile aggiornare il runtime delle applicazioni in `READY` stato a qualsiasi versione di Flink. Amazon Managed Service for Apache Flink non esegue alcun controllo finché non avvii l'applicazione.
+  Amazon Managed Service for Apache Flink esegue solo controlli di compatibilità sullo snapshot selezionato per avviare l'app. Si tratta di controlli di compatibilità di base che seguono la tabella di compatibilità [Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table). Controllano solo la versione di Flink con cui è stata scattata l'istantanea e la versione di Flink che hai scelto come target. Se il runtime Flink dell'istantanea selezionata è incompatibile con il nuovo runtime dell'app, la richiesta di avvio potrebbe essere rifiutata.

**Flusso di processo per applicazioni Ready State**

![\[Il diagramma seguente rappresenta il flusso di lavoro consigliato per aggiornare l'applicazione mentre è pronta. Partiamo dal presupposto che l'applicazione sia dotata di stato e che le istantanee siano state abilitate. Per questo flusso di lavoro, al momento dell'aggiornamento, ripristini l'applicazione dall'ultima istantanea scattata automaticamente da Amazon Managed Service for Apache Flink quando l'applicazione è stata interrotta.\]](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/images/in-place-update-while-ready.png)


# Ripristina gli aggiornamenti delle applicazioni
<a name="rollback"></a>

Se riscontri problemi con l'applicazione o riscontri incongruenze nel codice dell'applicazione tra le versioni di Flink, puoi eseguire il rollback utilizzando AWS CLI, AWS CloudFormation, AWS SDK o. Console di gestione AWS Gli esempi seguenti mostrano come si presenta il rollback in diversi scenari di errore.

## L'aggiornamento del runtime è riuscito, l'applicazione è in `RUNNING` stato, ma il processo non riesce e viene riavviato continuamente
<a name="succeeded-restarting"></a>

Supponiamo che stiate cercando di aggiornare un'applicazione stateful denominata `TestApplication` da Flink 1.15 a Flink 1.18 negli Stati Uniti orientali (Virginia settentrionale). Tuttavia, l'applicazione Flink 1.18 aggiornata non si avvia o si riavvia costantemente, anche se l'applicazione è in stato. `RUNNING` Si tratta di uno scenario di errore comune. Per evitare ulteriori tempi di inattività, si consiglia di ripristinare immediatamente l'applicazione alla versione precedente in esecuzione (Flink 1.15) e di diagnosticare il problema in un secondo momento.

Per ripristinare l'applicazione alla versione precedente in esecuzione, utilizzate il comando [AWS CLI rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) o l'azione API. [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) Questa azione API ripristina le modifiche che hai apportato e che hanno portato alla versione più recente. Quindi riavvia l'applicazione utilizzando l'ultima istantanea riuscita. 

Ti consigliamo vivamente di scattare un'istantanea con l'app esistente prima di tentare l'aggiornamento. Ciò contribuirà a evitare la perdita di dati o la necessità di rielaborarli. 

In questo scenario di errore, l'applicazione non CloudFormation verrà ripristinata automaticamente. È necessario aggiornare il CloudFormation modello in modo che punti al runtime precedente e al codice precedente per CloudFormation forzare l'aggiornamento dell'applicazione. Altrimenti, CloudFormation si presuppone che l'applicazione sia stata aggiornata quando passa allo `RUNNING` stato.

## Ripristino di un'applicazione bloccata `UPDATING`
<a name="stuck-updating"></a>

Se l'applicazione si blocca nello `AUTOSCALING` stato `UPDATING` or dopo un tentativo di aggiornamento, Amazon Managed Service for Apache Flink offre il AWS CLI comando [rollback-applications](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) o l'azione [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)API che può ripristinare l'applicazione alla versione precedente al blocco o allo stato. `UPDATING` `AUTOSCALING` Questa API ripristina le modifiche che hai apportato che hanno causato il blocco o lo stato transitivo dell'applicazione. `UPDATING` `AUTOSCALING`

# Procedure consigliate e consigli generali per gli aggiornamenti delle applicazioni
<a name="best-practices-recommendations"></a>
+ Prova la nuova versione job/runtime senza stato in un ambiente non di produzione prima di tentare un aggiornamento in produzione.
+ Valuta la possibilità di testare prima lo stateful upgrade con un'applicazione non di produzione.
+ Assicurati che il tuo nuovo job graph abbia uno stato compatibile con l'istantanea che utilizzerai per avviare l'applicazione aggiornata.
  + Assicurati che i tipi memorizzati negli stati dell'operatore rimangano gli stessi. Se il tipo è cambiato, Apache Flink non può ripristinare lo stato dell'operatore.
  + Assicurati che l'operatore IDs impostato utilizzando il `uid` metodo rimanga lo stesso. Apache Flink consiglia vivamente l'assegnazione di elementi unici IDs agli operatori. Per ulteriori informazioni, consulta [Assigning Operator IDs nella documentazione](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids) di Apache Flink.

    Se non lo assegnate IDs ai vostri operatori, Flink li genera automaticamente. In tal caso, potrebbero dipendere dalla struttura del programma e, se modificate, causare problemi di compatibilità. Flink utilizza Operator IDs per abbinare lo stato nell'istantanea all'operatore. La modifica dell'operatore IDs comporta il mancato avvio dell'applicazione, l'eliminazione dello stato memorizzato nell'istantanea e l'avvio del nuovo operatore senza stato.
  + Non modificate la chiave utilizzata per memorizzare lo stato della chiave.
  + Non modificate il tipo di input degli operatori statici come window o join. Ciò modifica implicitamente il tipo di stato interno dell'operatore, causando un'incompatibilità di stato.

# Precauzioni e problemi noti relativi agli aggiornamenti delle applicazioni
<a name="precautions"></a>

## Kafka Commit sul checkpoint fallisce ripetutamente dopo il riavvio del broker
<a name="apache-kafka-connector"></a>

Esiste un problema noto di Apache Flink open source con il connettore Apache Kafka nella versione 1.15 di Flink, causato da un bug critico di Kafka Client open source in Kafka Client 2.8.1. [Per ulteriori informazioni, vedi [Kafka Commit on checkpointing fallisce ripetutamente dopo il riavvio del broker e non è in grado di ripristinare](https://issues.apache.org/jira/browse/FLINK-28060) la connessione al coordinatore del gruppo dopo un'eccezione. KafkaConsumer commitOffsetAsync ](https://issues.apache.org/jira/browse/KAFKA-13840)

Per evitare questo problema, ti consigliamo di utilizzare Apache Flink 1.18 o versione successiva in Amazon Managed Service for Apache Flink.

## Limitazioni note della compatibilità tra stati
<a name="state-precautions"></a>
+ Se utilizzi l'API Table, Apache Flink non garantisce la compatibilità degli stati tra le versioni di Flink. Per ulteriori informazioni, consulta [Stateful Upgrades and Evolution](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution) nella documentazione di Apache Flink.
+ Gli stati di Flink 1.6 non sono compatibili con Flink 1.18. L'API rifiuta la tua richiesta se tenti di eseguire l'aggiornamento da 1.6 a 1.18 e versioni successive con state. Puoi eseguire l'aggiornamento alla versione 1.8, 1.11, 1.13 e 1.15 e scattare un'istantanea, quindi eseguire l'aggiornamento alla versione 1.18 e successive. Per ulteriori informazioni, consulta [Aggiornamento delle applicazioni e delle versioni Flink nella documentazione di Apache Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

## Problemi noti con il connettore Flink Kinesis
<a name="kinesis-connector-precautions"></a>
+ Se si utilizza Flink 1.11 o versioni precedenti e si utilizza il `amazon-kinesis-connector-flink` connettore per il supporto Enhanced-fan-out (EFO), è necessario eseguire ulteriori passaggi per un aggiornamento stateful a Flink 1.13 o versione successiva. Ciò è dovuto alla modifica del nome del pacchetto del connettore. Per ulteriori informazioni, consulta [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink).

  Il `amazon-kinesis-connector-flink` connettore per Flink 1.11 e versioni precedenti utilizza la confezione`software.amazon.kinesis`, mentre il connettore Kinesis per Flink 1.13 e versioni successive utilizza. `org.apache.flink.streaming.connectors.kinesis` [Utilizzate questo strumento per supportare la migrazione: -state-migrator. amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator)
+ Se si utilizza Flink 1.13 o versioni precedenti `FlinkKinesisProducer` e si esegue l'aggiornamento a Flink 1.15 o successivo, per un aggiornamento statico è necessario continuare a utilizzare in Flink 1.15 o versione successiva, anziché la versione più recente. `FlinkKinesisProducer` `KinesisStreamsSink` Tuttavia, se hai già un `uid` set personalizzato sul tuo sink, dovresti essere in grado di passare a perché non mantiene lo stato. `KinesisStreamsSink` `FlinkKinesisProducer` Flink lo tratterà come lo stesso operatore perché `uid` è impostata una personalizzazione.

## Applicazioni Flink scritte in Scala
<a name="scala-precautions"></a>
+ A partire da Flink 1.15, Apache Flink non include Scala nel runtime. È necessario includere la versione di Scala che si desidera utilizzare e altre dipendenze di Scala nel codice JAR/zip quando si esegue l'aggiornamento a Flink 1.15 o versioni successive. Per ulteriori informazioni, consulta [Amazon Managed Service for Apache Flink for Apache Flink release 1.15.2](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html).
+ Se la tua applicazione utilizza Scala e la stai aggiornando da Flink 1.11 o precedente (Scala 2.11) a Flink 1.13 (Scala 2.12), assicurati che il codice utilizzi Scala 2.12. Altrimenti, l'applicazione Flink 1.13 potrebbe non riuscire a trovare le classi Scala 2.11 nel runtime Flink 1.13.

## Aspetti da considerare quando si esegue il downgrade dell'applicazione Flink
<a name="downgrading-precautions"></a>
+ Il downgrade delle applicazioni Flink è possibile, ma limitato ai casi in cui l'applicazione era precedentemente in esecuzione con la versione precedente di Flink. Per un aggiornamento statico, Managed Service for Apache, Flink richiederà l'utilizzo di un'istantanea scattata con una versione corrispondente o precedente per il downgrade
+ Se state aggiornando il runtime da Flink 1.13 o versione successiva a Flink 1.11 o precedente e se l'app utilizza il backend di stato, l'applicazione fallirà continuamente. HashMap 

# Aggiornamento a Flink 2.2: guida completa
<a name="flink-2-2-upgrade-guide"></a>

Questa guida fornisce step-by-step istruzioni per aggiornare l'applicazione Amazon Managed Service for Apache Flink da Flink 1.x a Flink 2.2. Si tratta di un aggiornamento di versione importante con modifiche epocali che richiedono un'attenta pianificazione e test.

**L'aggiornamento della versione principale è unidirezionale**  
L'operazione di aggiornamento può spostare l'applicazione da Flink 1.x a 2.2 con conservazione dello stato, ma non è possibile tornare indietro da 2.2 a 1.x con lo stato 2.2. Se l'applicazione non funziona correttamente dopo l'aggiornamento, utilizzate l'API Rollback per tornare alla versione 1.x con lo stato 1.x originale dell'ultima istantanea.

## Prerequisiti
<a name="upgrade-guide-prerequisites"></a>

Prima di iniziare l'aggiornamento:
+ Revisione [Interruzione delle modifiche e delle deprecazioni](flink-2-2.md#flink-2-2-breaking-changes)
+ Revisione [Guida alla compatibilità dello stato per gli aggiornamenti di Flink 2.2](state-compatibility.md)
+ Assicurati di disporre di un ambiente non di produzione per i test
+ Documenta la configurazione e le dipendenze correnti dell'applicazione

## Comprendere i percorsi di migrazione
<a name="upgrade-guide-migration-paths"></a>

L'esperienza di aggiornamento dipende dalla compatibilità dell'applicazione con Flink 2.2. La comprensione di questi percorsi aiuta a prepararsi in modo appropriato e a stabilire aspettative realistiche.

**Percorso 1: Stato binario e dell'applicazione compatibili**

**Cosa aspettarsi:**
+ Richiama l'operazione di aggiornamento
+ Completa la migrazione alla versione 2.2 con la transizione dello stato dell'applicazione: → → `RUNNING` `UPDATING` `RUNNING`
+ Conserva tutto lo stato dell'applicazione senza perdita o rielaborazione dei dati
+ Stessa esperienza delle migrazioni di versioni minori

Ideale per: applicazioni stateless o applicazioni che utilizzano serializzazione compatibile (Avro, schemi Protobuf compatibili, senza raccolte) POJOs 

**Percorso 2: incompatibilità binarie**

**Cosa aspettarsi:**
+ Richiama l'operazione di aggiornamento
+ L'operazione fallisce e viene a galla l'incompatibilità binaria tramite l'API Operations e i log
+ Con il rollback automatico abilitato: le applicazioni vengono ripristinate automaticamente in pochi minuti senza l'intervento dell'utente
+ Con il rollback automatico disabilitato: le applicazioni rimangono in esecuzione senza elaborazione dei dati; si torna manualmente alla versione precedente
+ Una volta corretto il file binario, utilizza l'[UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) per un'esperienza simile a Path 1

Ideale per: applicazioni che utilizzano Remove e APIs che vengono rilevate durante l'avvio del job Flink

**Percorso 3: Stato dell'applicazione incompatibile**

**Cosa aspettarsi:**
+ Richiama l'operazione di aggiornamento
+ Inizialmente la migrazione sembra avere esito positivo
+ Le applicazioni entrano nei cicli di riavvio in pochi secondi quando il ripristino dello stato fallisce
+ Rileva gli errori tramite CloudWatch metriche che mostrano riavvii continui
+ Richiama manualmente l'operazione Rollback
+ Ritorna alla produzione entro pochi minuti dall'avvio del rollback
+ Verifica la tua [Migrazione statale](state-compatibility.md#state-compat-migration) candidatura

Ideale per: applicazioni con incompatibilità di serializzazione dello stato (con raccolte, determinati stati serializzati POJOs con Kryo)

**Nota**  
Si consiglia vivamente di creare una replica dell'applicazione di produzione e di testare ciascuna delle seguenti fasi dell'aggiornamento sulla replica prima di seguire gli stessi passaggi per l'applicazione di produzione.

## Fase 1: preparazione
<a name="upgrade-guide-phase-1"></a>

**Aggiorna il codice dell'applicazione**

Aggiorna il codice dell'applicazione per renderlo compatibile con Flink 2.2:
+ **Aggiorna le dipendenze di Flink** alla versione 2.2.0 nel tuo o `pom.xml` `build.gradle`
+ **Aggiorna le dipendenze dei connettori alle versioni compatibili** con Flink 2.2 (vedi) [Disponibilità del connettore](flink-2-2.md#flink-2-2-connectors)
+ **Rimuovi l'utilizzo** di API obsolete:
  + Sostituisci DataSet l'API con DataStream API o Table API/SQL
  + Sostituisci la versione precedente`SourceFunction`/`SinkFunction`con FLIP-27 Source e FLIP-143 Sink APIs
  + Sostituisci l'utilizzo dell'API Scala con l'API Java
+ **Aggiornamento a Java 17**

**Carica il codice dell'applicazione aggiornato**
+ Crea il JAR della tua applicazione con le dipendenze di Flink 2.2
+ Carica su Amazon S3 con un **nome di file diverso** dal tuo JAR corrente (ad esempio,) `my-app-flink-2.2.jar`
+ Prendi nota del bucket e della chiave S3 da utilizzare nella fase di aggiornamento

## Fase 2: abilitare il rollback automatico
<a name="upgrade-guide-phase-2"></a>

Il rollback automatico consente ad Amazon Managed Service for Apache Flink di ripristinare automaticamente la versione precedente se l'aggiornamento non riesce.

**Verifica lo stato del rollback automatico**

*Console di gestione AWS:*

1. Vai alla tua applicazione

1. Scegli **Configuration** (Configurazione)

1. In **Impostazioni dell'applicazione**, verifica che il **rollback del sistema sia abilitato**

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**Abilita il rollback automatico (se non abilitato)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## Fase 3: scattare un'istantanea (opzionale)
<a name="upgrade-guide-phase-3"></a>

Se le istantanee automatiche sono abilitate per l'applicazione, potete saltare questo passaggio, altrimenti scattate un'istantanea dell'applicazione per salvare lo stato dell'applicazione prima dell'aggiornamento.

**Scatta un'istantanea dall'applicazione in esecuzione**

*Console di gestione AWS:*

1. Passa alla tua applicazione

1. Scegli **Istantanee**

1. Scegli **Crea istantanea**

1. Immettete il nome di un'istantanea (ad esempio,) `pre-flink-2.2-upgrade`

1. Scegli **Crea**

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**Verifica la creazione dell'istantanea**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

Attendi fino a quando non `SnapshotStatus` è così `READY` prima di procedere.

## Fase 4: aggiornamento dell'applicazione
<a name="upgrade-guide-phase-4"></a>

È possibile aggiornare l'applicazione Flink utilizzando l'[https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)azione.

Puoi chiamare l'`UpdateApplication`API in diversi modi:
+ **Usa il Console di gestione AWS.**
  + Vai alla pagina della tua app su Console di gestione AWS.
  + Scegli **Configura**.
  + Seleziona il nuovo runtime e l'istantanea da cui vuoi iniziare, nota anche come configurazione di ripristino. Utilizza l'impostazione più recente come configurazione di ripristino per avviare l'app dall'ultima istantanea. Seleziona la nuova applicazione aggiornata JAR/zip su Amazon S3.
+ **Usa l'azione. AWS CLI[https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)**
+ **Usa CloudFormation.**
  + Aggiorna il `RuntimeEnvironment` campo. In precedenza, CloudFormation eliminava l'applicazione e ne creava una nuova, causando la perdita delle istantanee e della cronologia dell'altra app. Ora CloudFormation aggiorna la `RuntimeEnvironment` versione in uso e non elimina l'applicazione.
+ **Usa l' AWS SDK.**
  + Consulta la documentazione SDK per il linguaggio di programmazione che preferisci. Per informazioni, consulta [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

È possibile eseguire l'aggiornamento mentre l'applicazione è in `RUNNING` stato o mentre l'applicazione è arrestata in `READY` tale stato. Amazon Managed Service for Apache Flink convalida la compatibilità tra la versione di runtime originale e la versione di runtime di destinazione. Questo controllo di compatibilità viene eseguito quando lo esegui `UpdateApplication` mentre sei in `RUNNING` uno stato o successivamente `StartApplication` se esegui l'upgrade mentre sei in stato. `READY`

**Aggiornamento dallo stato RUNNING**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**Aggiornamento dallo stato READY**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## Fase 5: aggiornamento del monitor
<a name="upgrade-guide-phase-5"></a>

**Verifica della compatibilità**
+ Utilizza l'API Operations per verificare lo stato dell'aggiornamento. In caso di incompatibilità binarie o problemi con l'avvio del processo, l'operazione di aggiornamento avrà esito negativo e verranno generati i log.
+ Se l'operazione di aggiornamento è riuscita ma l'applicazione è bloccata nei cicli di riavvio, significa che lo stato è incompatibile con la nuova versione di Flink o che c'è un problema con il codice aggiornato. Scopri come identificare [Guida alla compatibilità dello stato per gli aggiornamenti di Flink 2.2](state-compatibility.md) i problemi di incompatibilità tra stati.

**Monitora lo stato delle applicazioni**

*Stato dell'applicazione:*
+ Lo stato della domanda dovrebbe cambiare: `RUNNING` `UPDATING` → `RUNNING`
+ Controlla il runtime dell'applicazione. Se è 2.2, l'operazione di aggiornamento è andata a buon fine.
+ Se l'applicazione è attiva `RUNNING` ma è ancora nel runtime precedente, è stato attivato il rollback automatico. L'API Operations mostrerà il funzionamento come. `FAILED` Controlla i log per trovare l'eccezione in caso di errore.

Inoltre, monitora queste metriche in: CloudWatch

*Metrica di riavvio:*
+ `numRestarts`: Monitora eventuali riavvii imprevisti: l'aggiornamento ha esito positivo se `numRestarts` è pari a zero `uptime` e/o `runningTime` è in aumento.

*Metriche di Checkpoint:*
+ `lastCheckpointDuration`: Dovrebbe essere simile ai valori di pre-aggiornamento
+ `numberOfFailedCheckpoints`: Dovrebbe rimanere su 0

## Fase 6: convalida del comportamento dell'applicazione
<a name="upgrade-guide-phase-6"></a>

Dopo l'esecuzione dell'applicazione su Flink 2.2:

**Validazione funzionale**
+ Verifica che i dati vengano letti dalle fonti
+ Verifica che i dati vengano scritti nei sink
+ Verifica che la logica aziendale produca i risultati attesi
+ Confronta l'output con la linea di base precedente all'aggiornamento

**Convalida delle prestazioni**
+ Monitora le metriche di latenza (end-to-end tempo di elaborazione)
+ Monitora le metriche del throughput (record al secondo)
+ Monitora la durata e le dimensioni del checkpoint
+ Monitora l'utilizzo della memoria e della CPU

**Esegui per più di 24 ore**

Consenti all'applicazione di funzionare per almeno 24 ore in produzione per garantire:
+ Nessuna perdita di memoria
+ Comportamento stabile ai checkpoint
+ Nessun riavvio imprevisto
+ Produttività costante

## Fase 7: procedure di rollback
<a name="upgrade-guide-phase-7"></a>

Se l'aggiornamento non riesce o l'applicazione è in esecuzione ma non integra, tornate alla versione precedente.

**Rollback automatico**

Se il rollback automatico è abilitato e l'aggiornamento non riesce durante l'avvio, Amazon Managed Service for Apache Flink torna automaticamente alla versione precedente.

**Rollback manuale**

Se l'applicazione è in esecuzione ma non integra, utilizza l'`RollbackApplication`API:

*Console di gestione AWS:*

1. Vai alla tua applicazione

1. Scegli **Azioni** → **Ripristina**

1. Conferma il rollback

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**Cosa succede durante il rollback:**
+ L'applicazione si arresta
+ Il runtime ritorna alla versione precedente di Flink
+ Il codice dell'applicazione torna al JAR precedente
+ **L'applicazione viene riavviata dall'ultima istantanea riuscita scattata prima dell'aggiornamento**

**Importante**  
Non è possibile ripristinare un'istantanea di Flink 2.2 su Flink 1.x
Rollback utilizza l'istantanea scattata prima dell'aggiornamento
Scatta sempre un'istantanea prima dell'aggiornamento (Fase 3)

## Fasi successive
<a name="upgrade-guide-next-steps"></a>

Per domande o problemi durante l'aggiornamento, consulta [Risoluzione dei problemi relativi al servizio gestito per Apache Flink](troubleshooting.md) o contatta l' AWS assistenza.

# Guida alla compatibilità dello stato per gli aggiornamenti di Flink 2.2
<a name="state-compatibility"></a>

Durante l'aggiornamento da Flink 1.x a Flink 2.2, problemi di compatibilità dello stato possono impedire il ripristino dell'applicazione dalle istantanee. Questa guida aiuta a identificare potenziali problemi di compatibilità e fornisce strategie di migrazione.

## Comprensione delle modifiche alla compatibilità degli stati
<a name="state-compat-understanding"></a>

Amazon Managed Service for Apache Flink 2.2 introduce diverse modifiche alla serializzazione che influiscono sulla compatibilità degli stati. Le seguenti sono le principali:
+ **Aggiornamento della versione Kryo**: Apache Flink 2.2 aggiorna il serializzatore Kryo in bundle dalla versione 2 alla versione 5. Poiché Kryo v5 utilizza un formato di codifica binaria diverso da Kryo v2, qualsiasi stato dell'operatore serializzato tramite Kryo in un savepoint Flink 1.x non può essere ripristinato in Flink 2.2.
+ **Serializzazione delle raccolte Java: in Flink 1.x, le raccolte Java (come, and) all'interno venivano serializzate** utilizzando Kryo. `HashMap` `ArrayList` `HashSet` POJOs Flink 2.2 introduce serializzatori ottimizzati specifici per le raccolte che sono incompatibili con lo stato serializzato Kryo della versione 1.x. Le applicazioni che utilizzano raccolte Java con serializzatori POJO o Kryo in 1.x non possono ripristinare questo stato in Flink 2.2. Consultate la [documentazione](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) di Flink per maggiori dettagli sui tipi di dati e sulla serializzazione.
+ **Compatibilità con Kinesis Connector: la versione del connettore** Kinesis Data Streams (KDS) precedente alla 5.0 mantiene uno stato che non è compatibile con il connettore Kinesis Flink 2.2 versione 6.0. È necessario migrare alla versione 5.0 o successiva del connettore prima dell'aggiornamento.

## Riferimento sulla compatibilità della serializzazione
<a name="state-compat-reference"></a>

Esamina tutte le dichiarazioni di stato nell'applicazione e abbina i tipi di serializzazione alla tabella seguente. Se un tipo di stato è incompatibile, consulta la [Migrazione statale](#state-compat-migration) sezione prima di procedere con l'aggiornamento.


**Riferimento sulla compatibilità della serializzazione**  

| Tipo di serializzazione | Compatibile? | Informazioni | 
| --- | --- | --- | 
| Avro (SpecificRecord,GenericRecord) | Sì | Utilizza il proprio formato binario indipendente da Kryo. Assicurati di utilizzare le informazioni di tipo Avro native di Flink, non Avro registrato come serializzatore Kryo. | 
| Protobug | Sì | Utilizza la propria codifica binaria indipendente da Kryo. Verifica che le modifiche allo schema seguano le regole di evoluzione retrocompatibili. | 
| POJOs senza collezioni | Sì | Gestito dal serializzatore POJO di Flink, ma solo se la classe soddisfa tutti i criteri POJO: public class, public no-arg constructor, tutti i campi pubblici o accessibili tramite getters/setters e tutti i tipi di campo stessi serializzabili da Flink. Un POJO che viola uno di questi criteri ritorna silenziosamente a Kryo e diventa incompatibile. | 
| Personalizzato TypeSerializers | Sì | Compatibile solo se il serializzatore non delega a Kryo internamente. | 
| Stato delle API SQL e Table | Sì (con avvertenza) | Utilizza i serializzatori interni di Flink. Tuttavia, Apache Flink non garantisce la compatibilità dello stato tra le versioni principali delle applicazioni Table API. Esegui prima il test in un ambiente non di produzione. | 
| POJOs con collezioni Java (HashMap,ArrayList,HashSet) | No | In Flink 1.x, le raccolte interne POJOs sono state serializzate tramite Kryo v2. Flink 2.2 introduce serializzatori di raccolte dedicati il cui formato binario è incompatibile con il formato Kryo v2. | 
| Classi di casi Scala | No | Serializzato tramite Kryo in Flink 1.x. L'aggiornamento da Kryo v2 a v5 modifica il formato binario. | 
| Record Java | No | In genere si ricorre alla serializzazione Kryo in Flink 1.x. Verifica testando con. disableGenericTypes() | 
| Tipi di librerie di terze parti | No | I tipi senza un serializzatore personalizzato registrato ricadono su Kryo. La modifica del formato binario da Kryo v2 a v5 interrompe la compatibilità. | 
| Qualsiasi tipo che utilizza Kryo fallback | No | Se Flink non è in grado di gestire un tipo con un serializzatore integrato o registrato, torna a Kryo. Tutti gli stati serializzati con Kryo a partire dalla versione 1.x sono incompatibili con 2.2. | 

## Metodi diagnostici
<a name="state-compat-diagnostics"></a>

[È possibile identificare i problemi di compatibilità degli stati in modo proattivo esaminando i log delle applicazioni o ispezionando i log dopo l'operazione dell'API. UpdateApplication ](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)

**Identifica il fallback di Kryo nella tua applicazione**

Puoi utilizzare il seguente schema regex nei tuoi log per identificare il fallback di Kryo nella tua applicazione:

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

Registro di esempio:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

Se l'aggiornamento non riesce utilizzando l' UpdateApplication API, le seguenti eccezioni potrebbero segnalare un'incompatibilità di stato basata sul serializzatore:

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## Lista di controllo prima dell'aggiornamento
<a name="state-compat-checklist"></a>
+ Controlla tutte le dichiarazioni statali contenute nella tua domanda
+ Verifica POJOs con collections (`HashMap`,`ArrayList`,`HashSet`)
+ Verifica i metodi di serializzazione per ogni tipo di stato
+ Crea un'applicazione prod replica e verifica la compatibilità dello stato utilizzando l' UpdateApplication API su questa replica
+ Se lo stato è incompatibile, seleziona una strategia da [Migrazione statale](#state-compat-migration)
+ Abilita il rollback automatico nella configurazione dell'applicazione Flink di produzione

## Migrazione statale
<a name="state-compat-migration"></a>

**Ricostruisci lo stato completo**

Ideale per applicazioni in cui lo stato può essere ricostruito a partire dai dati di origine.

Se l'applicazione è in grado di ricostruire lo stato dai dati di origine:

1. Arresta l'applicazione Flink 1.x

1. Esegui l'aggiornamento a Flink 2.x con codice aggiornato

1. Inizia con `SKIP_RESTORE_FROM_SNAPSHOT`

1. Consenti all'applicazione di ricostruire lo stato

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## Best practice
<a name="state-compat-best-practices"></a>

1. **Usa sempre Avro o Protobuf per stati complessi: forniscono l'evoluzione dello** schema e sono indipendenti da Kryo

1. **Evita le raccolte in POJOs**: usa invece la versione nativa di Flink `ListState` `MapState`

1. **Prova il ripristino dello stato localmente**: prima dell'aggiornamento della produzione, esegui il test con istantanee effettive

1. **Scatta istantanee frequentemente**, soprattutto prima degli aggiornamenti delle versioni principali

1. **Abilita il rollback automatico**: configura l'applicazione MSF per il rollback automatico in caso di errore

1. **Documenta i tipi di stato**: conserva la documentazione di tutti i tipi di stato e dei relativi metodi di serializzazione

1. **Monitora le dimensioni dei checkpoint**: l'aumento delle dimensioni dei checkpoint può indicare problemi di serializzazione

## Fasi successive
<a name="state-compat-next-steps"></a>

**Pianifica il tuo aggiornamento**: Vedi. [Aggiornamento a Flink 2.2: guida completa](flink-2-2-upgrade-guide.md)

Per domande o problemi durante la migrazione, consulta [Risoluzione dei problemi relativi al servizio gestito per Apache Flink](troubleshooting.md) o contatta l' AWS assistenza.

# Implementazione della scalabilità delle applicazioni in Managed Service for Apache Flink
<a name="how-scaling"></a>

È possibile configurare l'esecuzione parallela delle attività e l'allocazione delle risorse per il servizio gestito da Amazon per Apache Flink, al fine di implementare il dimensionamento. Per informazioni su come Apache Flink pianifica le istanze parallele di attività, consulta [Parallel Execution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/) nella documentazione di Apache Flink.

**Topics**
+ [Configura il parallelismo dell'applicazione e la KPU ParallelismPer](#how-parallelism)
+ [Allocazione delle unità di elaborazione Kinesis](#how-scaling-kpus)
+ [Aggiorna il parallelismo dell'applicazione](#how-scaling-howto)
+ [Utilizza il ridimensionamento automatico in Managed Service for Apache Flink](how-scaling-auto.md)
+ [Considerazioni su maxParallelism](#how-scaling-auto-max-parallelism)

## Configura il parallelismo dell'applicazione e la KPU ParallelismPer
<a name="how-parallelism"></a>

È possibile configurare l'esecuzione parallela per le attività dell'applicazione servizio gestito per Apache Flink (leggere da un’origine o eseguire un operatore, per esempio) utilizzando le proprietà [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html): 
+ `Parallelism`: utilizza questa proprietà per impostare il parallelismo predefinito dell'applicazione di Apache Flink. Tutti gli operatori, le origini e i sink vengono eseguiti con questo parallelismo, a meno che non siano sovrascritti nel codice dell'applicazione. Il valore predefinito è `1`, il valore massimo predefinito è `256`.
+ `ParallelismPerKPU`: utilizza questa proprietà per impostare il numero di task paralleli che è possibile pianificare per ogni unità di elaborazione Kinesis (KPU, Kinesis Processing Unit) dell'applicazione. Il valore predefinito è `1`, il valore massimo predefinito è `8`. Per le applicazioni che prevedono operazioni di blocco (ad esempio I/O), un valore più elevato di `ParallelismPerKPU` implica di utilizzare le risorse KPU nella loro totalità.

**Nota**  
Il limite di `Parallelism` è uguale a `ParallelismPerKPU` volte il limite di KPUs (che ha un valore predefinito di 64). Il KPUs limite può essere aumentato richiedendo un aumento del limite. Per istruzioni su come richiedere un aumento del limite, consultare "Richiedere un aumento del limite" in [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

Per informazioni sull'impostazione del parallelismo delle attività per un operatore specifico, consulta [Impostazione del parallelismo: operatore](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level) nella documentazione di Apache Flink.

## Allocazione delle unità di elaborazione Kinesis
<a name="how-scaling-kpus"></a>

Managed Service for Apache Flink fornisce capacità come. KPUs Una singola KPU offre 1 vCPU e 4 GB di memoria. Per ogni KPU allocata, vengono forniti anche 50 GB di spazio di archiviazione delle applicazioni in esecuzione. 

Managed Service for Apache Flink calcola il numero KPUs necessario per eseguire l'applicazione utilizzando le proprietà `Parallelism` and`ParallelismPerKPU`, come segue:

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

Il servizio gestito per Apache Flink fornisce rapidamente risorse alle applicazioni in risposta ai picchi della velocità di trasmissione effettiva o di attività di elaborazione. Rimuove gradualmente le risorse dall'applicazione dopo il superamento del picco di attività. Per disabilitare l'allocazione automatica delle risorse, è sufficiente impostare il valore `AutoScalingEnabled` su `false`, come descritto in seguito in [Aggiorna il parallelismo dell'applicazione](#how-scaling-howto). 

Il limite predefinito KPUs per l'applicazione è 64. Per istruzioni su come richiedere un aumento di tale limite, consulta "Richiedere un aumento del limite" in [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

**Nota**  
A scopo di orchestrazione viene addebitata una KPU aggiuntiva. Per ulteriori informazioni, consulta il [Piano tariffario del servizio gestito da Amazon per Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/).

## Aggiorna il parallelismo dell'applicazione
<a name="how-scaling-howto"></a>

Questa sezione contiene esempi di richieste di azioni API che impostano il parallelismo di un'applicazione. Per ulteriori esempi e istruzioni sull’utilizzo dei blocchi di richiesta con le azioni API, consulta [Codice di esempio dell'API Managed Service per Apache Flink](api-examples.md).

Il seguente esempio di richiesta per l'azione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) imposta il parallelismo durante la creazione di un'applicazione:

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

Il seguente esempio di richiesta per l'azione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) imposta il parallelismo per un'applicazione pre-esistente:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

Il seguente esempio di richiesta per l'azione [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) disabilita il parallelismo per un'applicazione pre-esistente:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# Utilizza il ridimensionamento automatico in Managed Service for Apache Flink
<a name="how-scaling-auto"></a>

Il servizio gestito per Apache Flink ridimensiona in modo elastico il parallelismo dell'applicazione, per adattarsi alla velocità di trasmissione effettiva dei dati della fonte e alla complessità dell'operatore nella maggior parte delle situazioni. Il dimensionamento automatico è abilitato per impostazione predefinita. Il servizio gestito per Apache Flink monitora l'utilizzo delle risorse (CPU) da parte dell'applicazione e aumenta o diminuisce elasticamente il parallelismo dell'applicazione in base alle esigenze:
+ L'applicazione si ridimensiona (aumenta il parallelismo) se il valore massimo della CloudWatch metrica `containerCPUUtilization` è superiore o superiore al 75% per 15 minuti. Ciò significa che l'`ScaleUp`azione viene avviata quando ci sono 15 punti dati consecutivi con un periodo di 1 minuto pari o superiore al 75%. Un'`ScaleUp`azione raddoppia la durata dell'applicazione. `CurrentParallelism` `ParallelismPerKPU`non viene modificato. Di conseguenza, KPUs anche il numero di stanziati raddoppia. 
+ L'applicazione si riduce (diminuisce il parallelismo) quando l'utilizzo della CPU rimane inferiore al 10% per sei ore. Ciò significa che l'`ScaleDown`azione viene avviata quando ci sono 360 punti dati consecutivi con un periodo di 1 minuto inferiore al 10 percento. Un'`ScaleDown`azione dimezza (arrotonda per eccesso) il parallelismo dell'applicazione. `ParallelismPerKPU`non viene modificato e KPUs anche il numero di allocazioni viene dimezzato (arrotondato per eccesso). 

**Nota**  
È possibile fare riferimento a un periodo massimo di `containerCPUUtilization` oltre 1 minuto per trovare la correlazione con un datapoint utilizzato per l'azione Scaling, ma non è necessario indicare il momento esatto in cui l'azione viene inizializzata.

Il servizio gestito per Apache Flink non ridurrà il valore di `CurrentParallelism` dell'applicazione a un valore inferiore rispetto all’impostazione `Parallelism`.

Quando il servizio gestito per Apache Flink dimensiona l'applicazione, questa comparirà nello status `AUTOSCALING`. È possibile verificare lo stato attuale dell'applicazione utilizzando le azioni o. [ DescribeApplication[ ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html) Mentre il servizio sta scalando l'applicazione, l'unica azione API valida che puoi utilizzare è [ StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)con il `Force` parametro impostato `true` su.

È possibile utilizzare la proprietà `AutoScalingEnabled` (parte di [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html)) per abilitare o disabilitare il dimensionamento automatico. Al tuo AWS account vengono addebitate le disposizioni relative al servizio gestito per KPUs Apache Flink, che è una funzione dell'`parallelism`applicazione e delle impostazioni. `parallelismPerKPU` Un picco di attività aumenta i costi del servizio gestito per Apache Flink.

Per ulteriori informazioni sulle tariffe, consulta il [Piano tariffario del servizio gestito da Amazon per Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/). 

È fondamentale tenere presente quanto segue in merito al dimensionamento dell'applicazione:
+ Il dimensionamento automatico è abilitato per impostazione predefinita.
+ Il dimensionamento non si applica ai notebook Studio. Tuttavia, nel caso in cui si implementi un notebook Studio come applicazione con stato permanente, il dimensionamento verrà eseguito sull'applicazione implementata.
+ L'applicazione ha un limite predefinito di 64. KPUs Per ulteriori informazioni, consulta [Servizio gestito per Apache Flink e quota di notebook Studio](limits.md).
+ Quando il dimensionamento automatico aggiorna il parallelismo dell'applicazione, l'applicazione subisce un’interruzione. Segui i seguenti passaggi per evitare che l’applicazione si interrompa:
  + Disabilita il dimensionamento automatico
  + Configura `parallelism` e `parallelismPerKPU` con l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)azione della tua applicazione. Per ulteriori informazioni sull'impostazione delle impostazioni di parallelismo dell'applicazione, consulta. [Aggiorna il parallelismo dell'applicazione](how-scaling.md#how-scaling-howto)
  + È fondamentale monitorare periodicamente l'utilizzo delle risorse dell'applicazione, per verificare che disponga di impostazioni di parallelismo adatte al carico di lavoro. Per informazioni sul monitoraggio dell'utilizzo delle risorse di allocazione, consulta [Metriche e dimensioni in Managed Service for Apache Flink](metrics-dimensions.md).

## Implementa la scalabilità automatica personalizzata
<a name="how-scaling-custom-autoscaling"></a>

Se desideri un controllo più preciso sulla scalabilità automatica o utilizzare metriche di attivazione diverse da quelle, puoi usare questo esempio: `containerCPUUtilization` 
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  Questo esempio illustra come scalare l'applicazione Managed Service for Apache Flink utilizzando una CloudWatch metrica diversa dall'applicazione Apache Flink, incluse le metriche di Amazon MSK e Amazon Kinesis Data Streams, utilizzate come sorgenti o sink.

[Per ulteriori informazioni, consulta Monitoraggio avanzato e scalabilità automatica per Apache Flink.](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/)

## Implementa la scalabilità automatica pianificata
<a name="how-scaling-scheduled-autoscaling"></a>

Se il carico di lavoro segue un profilo prevedibile nel tempo, potresti preferire scalare preventivamente la tua applicazione Apache Flink. Questo ridimensiona l'applicazione a un orario pianificato, invece di scalare in modo reattivo in base a una metrica. Per impostare la scalabilità verso l'alto e verso il basso a orari prestabiliti del giorno, puoi utilizzare questo esempio:
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## Considerazioni su maxParallelism
<a name="how-scaling-auto-max-parallelism"></a>

Il parallelismo massimo che un lavoro Flink può scalare è limitato dal *minimo* `maxParallelism` tra tutti gli operatori del lavoro. Ad esempio, se si dispone di un job semplice con solo una sorgente e un sink, e l'origine ha 16 e il sink ha 8, l'applicazione non può scalare oltre il parallelismo di 8. `maxParallelism`

Per sapere come viene calcolato il valore predefinito `maxParallelism` di un operatore e come sovrascriverlo, consulta [Impostazione del parallelismo massimo nella documentazione](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism) di Apache Flink.

Come regola di base, tieni presente che se non definisci `maxParallelism` alcun operatore e avvii l'applicazione con un parallelismo inferiore o uguale a 128, tutti gli operatori avranno un parallelismo pari a 128. `maxParallelism`

**Nota**  
Il parallelismo massimo del processo è il limite massimo di parallelismo per scalare l'applicazione mantenendo lo stato.   
Se modifichi un'applicazione esistente, l'applicazione non sarà in grado `maxParallelism` di riavviarsi da un'istantanea precedente scattata con la vecchia. `maxParallelism` È possibile riavviare l'applicazione solo senza un'istantanea.   
Se si prevede di scalare l'applicazione fino a un parallelismo maggiore di 128, è necessario impostarlo in modo esplicito nell'`maxParallelism`applicazione.
+ La logica di scalabilità automatica impedirà il ridimensionamento di un lavoro Flink a un parallelismo superiore al parallelismo massimo del lavoro.
+ Se utilizzi una scalabilità automatica personalizzata o una scalabilità pianificata, configurale in modo che non superino il parallelismo massimo del lavoro.
+ Se scalate manualmente l'applicazione oltre il parallelismo massimo, l'applicazione non si avvia.

# Aggiungere tag al servizio gestito per le applicazioni Apache Flink
<a name="how-tagging"></a>



Questa sezione descrive come aggiungere tag di metadati chiave-valore alle applicazioni del servizio gestito per Apache Flink. Questi tag possono essere utilizzati per i seguenti scopi:
+ Determinazione della fatturazione per singole applicazioni del servizio gestito per Apache Flink. Per ulteriori informazioni, consulta [Utilizzo dei tag per l'allocazione dei costi](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html) nella *Guida per l'utente di Billing and Cost Management*.
+ Controllo dell'accesso alle risorse dell’applicazione in base ai tag. Per ulteriori informazioni, consulta [Controllo degli accessi tramite tag](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html) nella *AWS Identity and Access Management Guida per l'utente*.
+ Ambiti definiti dall'utente. È possibile definire le funzionalità delle applicazioni in base alla presenza di tag utente.

Tieni in considerazione i seguenti concetti chiave durante il tagging:
+ Il numero massimo di tag delle applicazioni include i tag di sistema. Il numero massimo di tag delle applicazioni definiti dall'utente è 50.
+ Se un'azione include un elenco di tag con valori `Key` duplicati, il servizio genera `InvalidArgumentException`.

**Topics**
+ [Aggiungi tag quando viene creata un'applicazione](how-tagging-create.md)
+ [Aggiungi o aggiorna i tag per un'applicazione esistente](how-tagging-add.md)
+ [Elenca i tag di un'applicazione](how-tagging-list.md)
+ [Rimuovere i tag da un'applicazione](how-tagging-remove.md)

# Aggiungi tag quando viene creata un'applicazione
<a name="how-tagging-create"></a>

I tag vengono aggiunti durante la creazione di un'applicazione utilizzando il `tags` parametro dell'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)azione.

La richiesta di esempio seguente mostra il nodo `Tags` per una richiesta `CreateApplication`:

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# Aggiungi o aggiorna i tag per un'applicazione esistente
<a name="how-tagging-add"></a>

Si aggiungono tag a un'applicazione utilizzando l'[TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html)azione. Non è possibile aggiungere tag a un'applicazione utilizzando l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)azione.

Per aggiornare un tag esistente, aggiungere un tag con la stessa chiave del tag esistente.

La seguente richiesta di esempio per l’operazione `TagResource` aggiunge nuovi tag o aggiorna i tag esistenti:

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# Elenca i tag di un'applicazione
<a name="how-tagging-list"></a>

Per elencare i tag esistenti, si utilizza l'[ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html)azione.

L'esempio seguente di richiesta per l'`ListTagsForResource`azione elenca i tag per un'applicazione:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# Rimuovere i tag da un'applicazione
<a name="how-tagging-remove"></a>

Per rimuovere i tag da un'applicazione, si utilizza l'[UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html)azione.

La seguente richiesta di esempio per l'operazione `UntagResource` rimuove i tag per un'applicazione:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Utilizzo CloudFormation con Managed Service per Apache Flink
<a name="lambda-cfn-flink"></a>

L'esercizio seguente mostra come avviare un'applicazione Flink creata CloudFormation utilizzando una funzione Lambda nello stesso stack. 

## Prima di iniziare
<a name="before-you-begin"></a>

Prima di iniziare questo esercizio, seguite i passaggi per creare un'applicazione Flink utilizzando at. CloudFormation [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html)

## Scrivere una funzione Lambda
<a name="write-lambda-function"></a>

Per avviare un'applicazione Flink dopo averla creata o aggiornata, utilizziamo l'API kinesisanalyticsv2 [start-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html). La chiamata verrà attivata da un CloudFormation evento dopo la creazione dell'applicazione Flink. Discuteremo come configurare lo stack per attivare la funzione Lambda più avanti in questo esercizio, ma prima concentriamoci sulla dichiarazione della funzione Lambda e sul relativo codice. In questo esempio utilizziamo il runtime di `Python3.8`. 

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

Nel codice precedente, Lambda elabora gli eventi in CloudFormation arrivo, filtra tutto `Create` e, ottiene lo stato dell'applicazione `Update` e, se lo stato lo è, la avvia. `READY` Per ottenere lo stato dell'applicazione, è necessario creare il ruolo Lambda, come illustrato di seguito.

## Creare un ruolo Lambda
<a name="create-lambda-role"></a>

Crei un ruolo per Lambda per "parlare" con successo con l'applicazione e scrivere i log. Questo ruolo utilizza politiche gestite predefinite, ma potresti voler restringere il campo all'utilizzo di politiche personalizzate.

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Tieni presente che le risorse Lambda verranno create dopo la creazione dell'applicazione Flink nello stesso stack, poiché dipendono da essa.

## Richiama la funzione Lambda
<a name="invoking-lambda-function"></a>

Ora non resta che richiamare la funzione Lambda. A tale scopo, è possibile utilizzare una [risorsa personalizzata](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html).

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

Questo è tutto ciò che serve per avviare l'applicazione Flink usando Lambda. Ora sei pronto per creare il tuo stack o utilizzare l'esempio completo riportato di seguito per vedere come funzionano nella pratica tutti questi passaggi.

## Rivedi un esempio esteso
<a name="lambda-cfn-flink-full-example"></a>

L'esempio seguente è una versione leggermente estesa dei passaggi precedenti con un'ulteriore `RunConfiguration` regolazione effettuata tramite [i parametri del modello](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html). Questo è uno stack funzionante di prova. Assicurati di leggere le note di accompagnamento: 

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

Potresti voler modificare i ruoli per Lambda e per l'applicazione stessa.

Prima di creare lo stack di cui sopra, non dimenticare di specificare i parametri.

parameters.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

Sostituisci `YOUR_BUCKET_ARN` e `YOUR_JAR` con i tuoi requisiti specifici. Puoi seguire questa [guida](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html) per creare un bucket Amazon S3 e un jar di applicazioni.

Ora crea lo stack (sostituisci YOUR\$1REGION con una regione a tua scelta, ad esempio us-east-1):

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

Ora puoi accedere a [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation) e visualizzare i progressi. Una volta creata, dovresti vedere la tua applicazione Flink nello stato `Starting`. Potrebbero essere necessari alcuni minuti prima dell'`Running`. 

Per ulteriori informazioni, consulta gli argomenti seguenti:
+ [Quattro modi per recuperare qualsiasi proprietà del AWS servizio utilizzando AWS CloudFormation (Parte 1](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/) di 3).
+ [Procedura dettagliata: ricerca dell'immagine di Amazon Machine](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html). IDs

# Usa la dashboard di Apache Flink con Managed Service per Apache Flink
<a name="how-dashboard"></a>

È possibile utilizzare il pannello di controllo di Apache Flink dell'applicazione per monitorare lo stato dell'applicazione del servizio gestito per Apache Flink. Il pannello di controllo dell'applicazione presenta le seguenti informazioni:
+ Risorse in uso, inclusi task manager e task slot. 
+ Informazioni sui processi, inclusi quelli in esecuzione, completati, annullati e non riusciti. 

Per informazioni sui task manager, i task slot e i processi di Apache Flink, consulta [Architettura di Apache Flink](https://flink.apache.org/what-is-flink/flink-architecture/) sul sito Web di Apache Flink. 

Tieni presente quanto segue riguardo all'utilizzo del pannello di controllo di Apache Flink con le applicazioni del servizio gestito per Apache Flink:
+ Il pannello di controllo di Apache Flink per le applicazioni del servizio gestito per Apache Flink è di sola lettura. Non è possibile apportare modifiche all'applicazione del servizio gestito per Apache Flink utilizzando il pannello di controllo di Apache Flink.
+ Il pannello di controllo di Apache Flink non è compatibile con Microsoft Internet Explorer.

## Accedi alla dashboard Apache Flink della tua applicazione
<a name="how-dashboard-accessing"></a>

Puoi accedere al pannello di controllo Apache Flink della tua applicazione tramite la console del servizio gestito per Apache Flink o richiedendo un endpoint URL sicuro tramite la CLI.

### Accedi alla dashboard Apache Flink della tua applicazione utilizzando la console Managed Service for Apache Flink
<a name="how-dashboard-accessing-console"></a>

Per accedere al pannello di controllo Apache Flink dell'applicazione dalla console, scegli **Pannello di controllo Apache Flink** nella pagina dell'applicazione.

**Nota**  
Quando apri il pannello di controllo dalla console del servizio gestito per Apache Flink, l'URL generato dalla console sarà valido per 12 ore.

### Accedi alla dashboard Apache Flink della tua applicazione utilizzando il servizio gestito per Apache Flink CLI
<a name="how-dashboard-accessing-cli"></a>

Puoi utilizzare la CLI del servizio gestito per Apache Flink per generare un URL per accedere alla dashboard dell'applicazione. L'URL generato è valido per un intervallo di tempo specificato.

**Nota**  
Se non accedi entro tre minuti, non sarà più valido.

L'URL della dashboard viene generato utilizzando l'azione. [ CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html) È possibile specificare i valori seguenti per l'operazione: 
+ Nome dell'applicazione
+ Tempo di validità dell'URL in secondi
+ Specifica `FLINK_DASHBOARD_URL` come tipo di URL.