

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utiliser un bloc-notes Studio avec service géré pour Apache Flink
<a name="how-notebook"></a>

Les blocs-notes Studio pour le service géré pour Apache Flink vous permettent d’interroger des flux de données de manière interactive en temps réel, et de créer et d’exécuter facilement des applications de traitement de flux à l’aide des normes SQL, Python et Scala. En quelques clics dans la console de AWS gestion, vous pouvez lancer un bloc-notes sans serveur pour interroger des flux de données et obtenir des résultats en quelques secondes. 

Un bloc-notes est un environnement de développement basé sur le Web. Grâce aux blocs-notes, vous bénéficiez d’une expérience de développement interactive simple associée aux capacités avancées fournies par Apache Flink. Les blocs-notes Studio utilisent des blocs-notes alimentés par [Apache Zeppelin](https://zeppelin.apache.org/) et se servent d’[Apache Flink](https://flink.apache.org/) comme moteur de traitement des flux. Les blocs-notes Studio combinent parfaitement ces technologies pour rendre les analyses avancées sur les flux de données accessibles aux développeurs de tous niveaux de compétences. 

Apache Zeppelin fournit à vos blocs-notes Studio une suite complète d’outils d’analyse, y compris les outils suivants :
+ Visualisation des données
+ Exportation de données dans des fichiers
+ Contrôle du format de sortie pour une analyse simplifiée

Pour commencer à utiliser le service géré pour Apache Flink et Apache Zeppelin, consultez [Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink](example-notebook.md). Pour plus d’informations sur Apache Zeppelin, consultez la [documentation Apache Zeppelin](http://zeppelin.apache.org).

 Avec un bloc-notes, vous modélisez des requêtes à l'aide de l'[API Apache Flink Table et SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/) dans SQL, Python ou Scala, ou de [DataStream l'API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) dans Scala. En quelques clics, vous pouvez ensuite transformer le bloc-notes Studio en une application de traitement de flux de service géré en continu et non interactive pour Apache Flink pour vos charges de travail de production.

**Topics**
+ [Utilisez la bonne version d'exécution du bloc-notes Studio](studio-notebook-versions.md)
+ [Création d'un bloc-notes Studio](how-zeppelin-creating.md)
+ [

# Effectuez une analyse interactive des données de streaming
](how-zeppelin-interactive.md)
+ [

# Déployez en tant qu'application à état durable
](how-notebook-durable.md)
+ [Autorisations IAM](how-zeppelin-iam.md)
+ [Utiliser les connecteurs et les dépendances](how-zeppelin-connectors.md)
+ [Fonctions définies par l’utilisateur](how-zeppelin-udf.md)
+ [

# Activation de la création de points de contrôle
](how-zeppelin-checkpoint.md)
+ [

# Mettre à niveau Studio Runtime
](upgrading-studio-runtime.md)
+ [

# Travaillez avec AWS Glue
](how-zeppelin-glue.md)
+ [Exemples et didacticiels pour les blocs-notes Studio dans Managed Service for Apache Flink](how-zeppelin-examples.md)
+ [

# Résoudre les problèmes liés aux blocs-notes Studio pour Managed Service pour Apache Flink
](how-zeppelin-troubleshooting.md)
+ [

# Création de politiques IAM personnalisées pour le service géré pour les ordinateurs portables Apache Flink Studio
](how-zeppelin-appendix-iam.md)

# Utilisez la bonne version d'exécution du bloc-notes Studio
<a name="studio-notebook-versions"></a>

Avec Amazon Managed Service pour Apache Flink Studio, vous pouvez interroger des flux de données en temps réel et créer et exécuter des applications de traitement de flux à l'aide de SQL, Python et Scala standard dans un bloc-notes interactif. Les blocs-notes Studio sont alimentés par [Apache Zeppelin](https://zeppelin.apache.org/) et utilisent [Apache Flink](https://flink.apache.org/) comme moteur de traitement des flux. 

**Note**  
Nous déconseillerons Studio Runtime avec **Apache Flink version 1.11 le 5 novembre 2024**. À compter de cette date, vous ne pourrez plus exécuter de nouveaux blocs-notes ni créer de nouvelles applications à l'aide de cette version. Nous vous recommandons de passer à la dernière version d'exécution (Apache Flink 1.15 et Apache Zeppelin 0.10) avant cette date. Pour obtenir des conseils sur la mise à niveau de votre ordinateur portable, consultez[Mettre à niveau Studio Runtime](upgrading-studio-runtime.md).


**Temps d'exécution du studio**  

| Version d'Apache Flink | Version d'Apache Zeppelin | Version de Python |  | 
| --- | --- | --- | --- | 
| 1.15 | 0.1 | 3.8 | Recommandée | 
| 1.13 | 0.9 | 3.8 | Supporté jusqu'au 16 octobre 2024 | 
| 1.11 | 0.9 | 3.7 | Obsolète le 24 février 2025 | 

# Création d'un bloc-notes Studio
<a name="how-zeppelin-creating"></a>

Un bloc-notes Studio contient des requêtes ou des programmes écrits en SQL, Python ou Scala qui s’exécutent sur des données de streaming et renvoient des résultats analytiques. Vous créez votre application à l’aide de la console ou de l’interface CLI et vous fournissez des requêtes pour analyser les données de votre source de données.

Votre application comporte les composants suivants :
+ Une source de données, telle qu’un cluster Amazon MSK, un flux de données Kinesis ou un compartiment Amazon S3.
+ Une AWS Glue base de données. Cette base de données contient des tables qui stockent vos schémas et points de terminaison de source de données et de destination. Pour plus d’informations, consultez [Travaillez avec AWS Glue](how-zeppelin-glue.md).
+ Votre code d’application. Votre code implémente votre requête ou votre programme d’analyse.
+ Les paramètres et les propriétés d’exécution de votre application. Pour obtenir des informations sur les paramètres et les propriétés d’exécution de votre application, consultez les rubriques suivantes dans le [Guide du développeur pour les applications Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html) :
  + **Parallélisme et mise à l’échelle de l’application :** vous utilisez le paramètre de parallélisme de votre application pour contrôler le nombre de requêtes que votre application peut exécuter simultanément. Vos requêtes peuvent également tirer parti d’un parallélisme accru si elles comportent plusieurs chemins d’exécution, par exemple dans les circonstances suivantes :
    + Lors du traitement de plusieurs partitions d’un flux de données Kinesis
    + Lorsque vous partitionnez des données à l’aide de l’opérateur `KeyBy`.
    + Lors de l’utilisation de plusieurs opérateurs de fenêtre

    Pour plus d’informations sur la mise à l’échelle des applications, consultez [Mise à l’échelle des applications dans le service géré pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html).
  + **Journalisation et surveillance :** pour obtenir des informations sur la journalisation et la surveillance des applications, consultez la section [Journalisation et surveillance dans le service géré Amazon pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html).
  + Votre application utilise des points de contrôle et des points de sauvegarde pour la tolérance aux pannes. Les points de contrôle et de sauvegarde ne sont pas activés par défaut pour les blocs-notes Studio.

Vous pouvez créer votre bloc-notes Studio à l'aide du AWS Management Console ou du AWS CLI. 

Lorsque vous créez l’application à partir de la console, vous disposez des options suivantes :
+ Dans la console Amazon MSK, choisissez votre cluster, puis choisissez **Traiter les données en temps réel.**
+ Dans la console Kinesis Data Streams, choisissez votre flux de données, puis dans l’onglet **Applications**, choisissez **Traiter les données en temps réel**.
+ Dans la console du service géré pour Apache Flink, choisissez l’onglet **Studio**, puis sélectionnez **Créer un bloc-notes Studio**.

# Effectuez une analyse interactive des données de streaming
<a name="how-zeppelin-interactive"></a>

Vous utilisez un bloc-notes sans serveur alimenté par Apache Zeppelin pour interagir avec vos données de streaming. Votre bloc-notes peut contenir plusieurs notes, et chaque note peut comporter un ou plusieurs paragraphes dans lesquels vous pouvez écrire votre code.

L’exemple de requête SQL suivant montre comment récupérer des données à partir d’une source de données :

```
%flink.ssql(type=update)
select * from stock;
```

Pour d'autres exemples de requêtes SQL Flink Streaming, voir [Exemples et didacticiels pour les blocs-notes Studio dans Managed Service for Apache Flink](how-zeppelin-examples.md) ci-dessous et [Requêtes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/) dans la documentation Apache Flink.

Vous pouvez utiliser les requêtes SQL Flink dans le bloc-notes Studio pour interroger des données de streaming. Vous pouvez également utiliser Python (API de table) et Scala (Table et Datastream APIs) pour écrire des programmes permettant d'interroger vos données de streaming de manière interactive. Vous pouvez consulter les résultats de vos requêtes ou de vos programmes, les mettre à jour en quelques secondes et les réexécuter pour afficher les résultats mis à jour.

## Interprètes Flink
<a name="how-zeppelin-interactive-interpreters"></a>

Vous spécifiez le langage que le service géré pour Apache Flink utilise pour exécuter votre application à l’aide d’un *interprète*. Vous pouvez utiliser les interpréteurs suivants avec le service géré pour Apache Flink :


| Nom | Classe | Description | 
| --- |--- |--- |
| %flink | FlinkInterpreter | Crée ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironmentet fournit un environnement Scala | 
| %flink.pyflink | PyFlinkInterpreter | Fournit un environnement python | 
| %flink.ipyflink | IPyFlinkInterpreter | Fournit un environnement ipython | 
| %flink.ssql | FlinkStreamSqlInterpreter | Fournit un environnement Stream SQL | 
| %flink.bsql | FlinkBatchSqlInterpreter | Fournit un environnement SQL par lots | 

Pour plus d’informations sur les interprètes Flink, consultez la section [Interprète Flink pour Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html).

Si vous utilisez `%flink.pyflink` ou `%flink.ipyflink` en tant qu’interprète, vous devrez utiliser le `ZeppelinContext` pour visualiser les résultats dans le bloc-notes.

Pour PyFlink des exemples plus spécifiques, voir [Interroger vos flux de données de manière interactive à l'aide du service géré pour Apache Flink Studio et Python](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/).

## Variables d’environnement de table Apache Flink
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin permet d’accéder aux ressources de l’environnement des tables à l’aide de variables d’environnement. 

Vous accédez aux ressources de l’environnement des tables Scala avec les variables suivantes :


| Variable | Ressource | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | StreamTableEnvironment for blink planner | 

Vous accédez aux ressources de l’environnement des tables Python avec les variables suivantes :


| Variable | Ressource | 
| --- |--- |
| s\$1env | StreamExecutionEnvironment | 
| st\$1env | StreamTableEnvironment for blink planner | 

Pour plus d'informations sur l'utilisation des environnements de tables, consultez [Concepts et API communes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/) dans la documentation d'Apache Flink. 

# Déployez en tant qu'application à état durable
<a name="how-notebook-durable"></a>

Vous pouvez créer votre code et l’exporter vers Amazon S3. Vous pouvez promouvoir le code que vous avez écrit dans votre note vers une application de traitement des flux en cours d’exécution continue. Il existe deux modes d’exécution d’une application Apache Flink sur service géré pour Apache Flink : avec un bloc-notes Studio, vous pouvez développer votre code de manière interactive, consultez les résultats de votre code en temps réel et le visualiser dans votre note. Après avoir déployé une note pour qu’elle s’exécute en mode streaming, le service géré pour Apache Flink crée pour vous une application qui s’exécute en continu, lit les données de vos sources, écrit sur vos destinations, maintient l’état de l’application à long terme et s’adapte automatiquement en fonction du débit de vos flux sources. 

**Note**  
Le compartiment S3 vers lequel vous exportez le code de votre application doit se trouver dans la même région que votre bloc-notes Studio.

Vous ne pouvez déployer une note depuis votre bloc-notes Studio que si elle répond aux critères suivants :
+ Les paragraphes doivent être classés dans l’ordre séquentiel. Lorsque vous déployez votre application, tous les paragraphes d'une note sont exécutés séquentiellement (left-to-right, top-to-bottom) tels qu'ils apparaissent dans votre note. Vous pouvez vérifier cet ordre en choisissant **Exécuter tous les paragraphes** dans votre note.
+ Votre code est une combinaison de Python et de SQL ou de Scala et de SQL. Nous ne prenons pas en charge Python et Scala ensemble pour le moment pour deploy-as-application.
+ Votre note ne doit avoir que les interprètes suivants :`%flink`, `%flink.ssql`, `%flink.pyflink`, `%flink.ipyflink`, `%md`.
+ L’utilisation de l’objet `z` du [Contexte Zeppelin](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html) n’est pas prise en charge. Les méthodes qui ne renvoient rien ne feront rien d’autre que de consigner un avertissement. D’autres méthodes déclencheront des exceptions Python ou échoueront à compiler dans Scala.
+ Une note doit aboutir à une seule tâche Apache Flink. 
+ Les notes contenant des [formulaires dynamiques](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html) ne sont pas prises en charge pour le déploiement en tant qu’application.
+ Les paragraphes %md ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) seront ignorés lors du déploiement en tant qu’application, car ils sont censés contenir une documentation lisible par l’homme qui ne convient pas à l’exécution dans le cadre de l’application résultante.
+ Les paragraphes désactivés pour être exécutés dans Zeppelin seront ignorés lors du déploiement en tant qu’application. Même si un paragraphe désactivé utilise un interprète incompatible, par exemple `%flink.ipyflink` dans une note comportant les interprètes `%flink` `and %flink.ssql`, il sera ignoré lors du déploiement de la note en tant qu’application et n’entraînera aucune erreur.
+ Pour que le déploiement de l'application réussisse, il doit y avoir au moins un paragraphe contenant le code source (Flink SQL PyFlink ou Flink Scala) activé pour fonctionner.
+ La définition du parallélisme dans la directive de l’interprète au sein d’un paragraphe (par exemple `%flink.ssql(parallelism=32)`) sera ignorée dans les applications déployées à partir d’une note. Vous pouvez plutôt mettre à jour l'application déployée via l' AWS Management Console AWS API AWS Command Line Interface ou pour modifier les paramètres du and/or ParallelismPer KPU de parallélisme en fonction du niveau de parallélisme requis par votre application, ou vous pouvez activer le dimensionnement automatique pour votre application déployée.
+ Si vous effectuez un déploiement en tant qu’application à état durable, votre VPC doit disposer d’un accès à Internet. Si votre VPC n’a pas accès à Internet, consultez [Déployez en tant qu'application à état durable dans un VPC sans accès à Internet](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet). 

## Critères Scala/Python
<a name="how-notebook-durable-scala"></a>
+ Dans votre code Scala ou Python, utilisez le [planificateur Blink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure) (`senv`, `stenv` pour Scala ; `s_env`, `st_env` pour Python) et non l’ancien planificateur « Flink » (`stenv_2` pour Scala, `st_env_2` pour Python). Le projet Apache Flink recommande l’utilisation du planificateur Blink pour les cas d’utilisation en production. Il s’agit du planificateur par défaut dans Zeppelin et dans Flink.
+ Vos paragraphes Python ne doivent pas utiliser d'[invocations/assignations shell utilisant des](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment) [commandes IPython magiques](https://ipython.readthedocs.io/en/stable/interactive/magics.html) telles que `!` `%timeit` ou contenues `%conda` dans des notes destinées à être déployées en tant qu'applications.
+ Vous ne pouvez pas utiliser les classes de cas Scala comme paramètres de fonctions transmises à des opérateurs de flux de données d’ordre supérieur tels que `map` et `filter`. Pour obtenir des informations sur les classes de cas Scala, consultez [CASE CLASSES](https://docs.scala-lang.org/overviews/scala-book/case-classes.html) dans la documentation Scala.

## Critères SQL
<a name="how-notebook-durable-sql"></a>
+ Les instructions SELECT simples ne sont pas autorisées, car il n’existe nulle part d’équivalent à la section de sortie d’un paragraphe dans laquelle les données peuvent être fournies.
+ Dans un paragraphe donné, les instructions DDL (`USE`, `CREATE`, `ALTER`, `DROP`, `SET`, `RESET`) doivent précéder les instructions DML (`INSERT`). Cela est dû au fait que les instructions DML d’un paragraphe doivent être soumises ensemble sous la forme d’une seule tâche Flink.
+ Il doit y avoir au maximum un paragraphe contenant des instructions DML. En effet, pour cette deploy-as-application fonctionnalité, nous ne prenons en charge que la soumission d'une seule tâche à Flink.

Pour plus d’informations et un exemple, consultez [Traduire, rédiger et analyser des données de streaming en utilisant les fonctions SQL avec le service géré Amazon pour Apache Flink, Amazon Translate et Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesisanalytics-MyApplicatioamazon-translate-and-amazon-comprehend/).

# Vérifiez les autorisations IAM pour les blocs-notes Studio
<a name="how-zeppelin-iam"></a>

Le service géré pour Apache Flink vous crée un rôle IAM lorsque vous créez un bloc-notes Studio via la AWS Management Console. Il associe également à ce rôle une politique qui autorise les accès suivants :


****  

| Service | Accès  | 
| --- | --- | 
| CloudWatch Journaux | List | 
| Amazon EC2 | List | 
| AWS Glue | Lire, écrire | 
| Service géré pour Apache Flink | Lecture | 
| Service géré pour Apache Flink V2 | Lecture | 
| Amazon S3 | Lire, écrire | 

# Utiliser les connecteurs et les dépendances
<a name="how-zeppelin-connectors"></a>

Les connecteurs vous permettent de lire et d’écrire des données à travers différentes technologies. Le service géré pour Apache Flink intègre trois connecteurs par défaut à votre bloc-notes Studio. Vous pouvez également utiliser des connecteurs personnalisés. Pour plus d’informations sur les connecteurs, consultez [Table & SQL Connectors](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/) dans la documentation Apache Flink.

## Connecteurs par défaut
<a name="zeppelin-default-connectors"></a>

Si vous utilisez le AWS Management Console pour créer votre bloc-notes Studio, Managed Service for Apache Flink inclut par défaut les connecteurs personnalisés suivants : `flink-sql-connector-kinesis` `flink-connector-kafka_2.12` et`aws-msk-iam-auth`. Pour créer un bloc-notes Studio via la console sans ces connecteurs personnalisés, choisissez l’option **Créer avec des paramètres personnalisés**. Ensuite, lorsque vous arrivez sur la page **Configurations**, décochez les cases à côté des deux connecteurs.

Si vous utilisez l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API pour créer votre bloc-notes Studio, les `flink-connector-kafka` connecteurs `flink-sql-connector-flink` et ne sont pas inclus par défaut. Pour les ajouter, spécifiez-les en tant que `MavenReference` dans le type de données `CustomArtifactsConfiguration`, comme indiqué dans les exemples suivants.

Le connecteur `aws-msk-iam-auth` est le connecteur à utiliser avec Amazon MSK qui inclut la fonctionnalité d’authentification automatique auprès d’IAM. 

**Note**  
Les versions de connecteur présentées dans l’exemple suivant sont les seules que nous prenons en charge.

```
For the Kinesis connector:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"

   }      
}]

For authenticating with AWS MSK through AWS IAM:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }      
}]
            
For the Apache Kafka connector:  

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"

   }      
}]
```

Pour ajouter ces connecteurs à un bloc-notes existant, utilisez l'opération [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API et spécifiez-les `MavenReference` en tant que type de `CustomArtifactsConfigurationUpdate` données.

**Note**  
Vous pouvez définir `failOnError` sur true pour le connecteur `flink-sql-connector-kinesis` dans l’API de table.

## Ajoutez des dépendances et des connecteurs personnalisés
<a name="zeppelin-custom-connectors"></a>

Pour utiliser le AWS Management Console pour ajouter une dépendance ou un connecteur personnalisé à votre bloc-notes Studio, procédez comme suit :

1. Chargez le fichier de votre connecteur personnalisé sur Amazon S3.

1. Dans le AWS Management Console, choisissez l'option de création **personnalisée pour créer** votre bloc-notes Studio.

1. Suivez le processus de création du bloc-notes Studio jusqu’à ce que vous arriviez à l’étape **Configurations**.

1. Dans la section **Connecteurs personnalisés**, choisissez **Ajouter un connecteur personnalisé**.

1. Spécifiez l’emplacement Amazon S3 de la dépendance ou du connecteur personnalisé.

1. Sélectionnez **Enregistrer les modifications**.

Pour ajouter un fichier JAR de dépendance ou un connecteur personnalisé lorsque vous créez un nouveau bloc-notes Studio à l'aide de l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API, spécifiez l'emplacement Amazon S3 du fichier JAR de dépendance ou du connecteur personnalisé dans le type de `CustomArtifactsConfiguration` données. Pour ajouter une dépendance ou un connecteur personnalisé à un bloc-notes Studio existant, appelez l'opération d'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API et spécifiez l'emplacement Amazon S3 du JAR de dépendance ou du connecteur personnalisé dans le type de `CustomArtifactsConfigurationUpdate` données.

**Note**  
Lorsque vous incluez une dépendance ou un connecteur personnalisé, vous devez également inclure toutes ses dépendances transitives qui ne sont pas regroupées en son sein.

# Mettre en œuvre des fonctions définies par l'utilisateur
<a name="how-zeppelin-udf"></a>

Les fonctions définies par l'utilisateur (UDFs) sont des points d'extension qui vous permettent d'appeler une logique fréquemment utilisée ou une logique personnalisée qui ne peut être exprimée autrement dans les requêtes. Vous pouvez utiliser Python ou un langage JVM tel que Java ou Scala pour implémenter vos paragraphes UDFs dans votre bloc-notes Studio. Vous pouvez également ajouter à votre bloc-notes Studio des fichiers JAR externes qui contiennent des éléments UDFs implémentés dans un langage JVM. 

Lorsque vous JARs implémentez ce registre des classes abstraites qui sous-classent `UserDefinedFunction` (ou vos propres classes abstraites), utilisez la portée fournie dans Apache Maven, les déclarations de `compileOnly` dépendance dans Gradle, la portée fournie dans SBT ou une directive équivalente dans la configuration de construction de votre projet UDF. Cela permet au code source UDF de se compiler par rapport au Flink APIs, mais les classes de l'API Flink ne sont pas elles-mêmes incluses dans les artefacts de construction. Reportez-vous à ce [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) tiré de l’exemple de fichier JAR UDF qui respecte ces prérequis dans un projet Maven. 

**Note**  
Pour un exemple de configuration, consultez [Traduire, rédiger et analyser des données de streaming en utilisant les fonctions SQL avec le service géré Amazon pour Apache Flink, Amazon Translate et Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/) dans le blog *AWS Machine Learning*.

Pour utiliser la console afin d’ajouter des fichiers JAR UDF à votre bloc-notes Studio, procédez comme suit :

1. Chargez vos fichiers JAR UDF sur Amazon S3.

1. Dans le AWS Management Console, choisissez l'option de création **personnalisée pour créer** votre bloc-notes Studio.

1. Suivez le processus de création du bloc-notes Studio jusqu’à ce que vous arriviez à l’étape **Configurations**.

1. Dans la section **Fonctions définies par l’utilisateur**, choisissez **Ajouter une fonction définie par l’utilisateur**.

1. Spécifiez l’emplacement Amazon S3 du fichier JAR ou du fichier ZIP contenant l’implémentation de votre UDF.

1. Sélectionnez **Enregistrer les modifications**.

Pour ajouter un JAR UDF lorsque vous créez un nouveau bloc-notes Studio à l'aide de l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API, spécifiez l'emplacement du JAR dans le type de `CustomArtifactConfiguration` données. Pour ajouter un fichier JAR UDF à un bloc-notes Studio existant, appelez l'opération d'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API et spécifiez l'emplacement du fichier JAR dans le type de `CustomArtifactsConfigurationUpdate` données. Vous pouvez également utiliser le AWS Management Console pour ajouter des fichiers JAR UDF à votre bloc-notes Studio.

## Considérations relatives aux fonctions définies par l’utilisateur
<a name="how-zeppelin-udf-considerations"></a>
+ Le service géré pour Apache Flink Studio utilise la [terminologie d’Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html) selon laquelle un bloc-notes est une instance Zeppelin pouvant contenir plusieurs notes. Chaque note peut ensuite contenir plusieurs paragraphes. Avec le service géré pour Apache Flink Studio, le processus d’interprétation est partagé entre toutes les notes du bloc-notes. Ainsi, si vous effectuez un enregistrement de fonction explicite à l'aide de [createTemporarySystemFunction](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-) dans une note, celle-ci peut être référencée telle quelle dans une autre note du même bloc-notes. 

  L’opération *Déployer en tant qu’application* fonctionne toutefois sur une note *individuelle* et non sur toutes les notes du bloc-notes. Lorsque vous effectuez un déploiement en tant qu’application, seul le contenu d’une note active est utilisé pour générer l’application. Tout enregistrement de fonction explicite effectué dans d’autres blocs-notes ne fait pas partie des dépendances d’application générées. En outre, lors de l’option Déployer en tant qu’application, un enregistrement de fonction implicite se produit en convertissant le nom de classe principal de JAR en une chaîne minuscule.

   Par exemple, si `TextAnalyticsUDF` est la classe principale pour le fichier JAR UDF, un enregistrement implicite donnera le nom de fonction `textanalyticsudf`. Ainsi, si un enregistrement de fonction explicite dans la note 1 de Studio se produit comme suit, toutes les autres notes de ce bloc-notes (disons la note 2) peuvent faire référence à la fonction par son nom `myNewFuncNameForClass` grâce à l’interprète partagé :

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   Toutefois, lors de l’opération de déploiement en tant qu’application mentionnée à la note 2, cet enregistrement explicite *ne sera pas inclus* dans les dépendances et, par conséquent, l’application déployée ne fonctionnera pas comme prévu. En raison de l’enregistrement implicite, par défaut, toutes les références à cette fonction sont supposées être avec `textanalyticsudf` et non `myNewFuncNameForClass`.

   S’il est nécessaire d’enregistrer un nom de fonction personnalisé, la note 2 elle-même devrait contenir un autre paragraphe pour effectuer un autre enregistrement explicite comme suit : 

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ Si votre fichier JAR UDF inclut Flink SDKs, configurez votre projet Java de manière à ce que le code source UDF puisse être compilé par rapport au Flink SDKs, mais que les classes du SDK Flink ne soient pas elles-mêmes incluses dans l'artefact de construction, par exemple le JAR. 

  Vous pouvez utiliser la portée `provided` dans Apache Maven, les instructions de dépendance `compileOnly` dans Gradle, la portée `provided` dans SBT ou une directive équivalente dans la configuration de construction de leur projet UDF. Reportez-vous à ce [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) tiré de l’exemple de fichier JAR UDF qui respecte ces prérequis dans un projet Maven. Pour un step-by-step didacticiel complet, consultez ce guide [Translate, redact et analysez les données de streaming à l'aide des fonctions SQL avec Amazon Managed Service pour Apache Flink, Amazon Translate et Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/).

# Activation de la création de points de contrôle
<a name="how-zeppelin-checkpoint"></a>

Vous activez le point de contrôle à l’aide des paramètres d’environnement. Pour obtenir des informations sur le point de contrôle, consultez [Tolérance aux pannes](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html) dans le [guide du développeur du service géré pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Définissez l'intervalle entre les points de contrôle
<a name="how-zeppelin-checkpoint-interval"></a>

L’exemple de code Scala suivant définit l’intervalle de point de contrôle de votre application à une minute :

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

L’exemple de code Python suivant définit l’intervalle de point de contrôle de votre application à une minute :

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## Définissez le type de point de contrôle
<a name="how-zeppelin-checkpoint-type"></a>

L’exemple de code Scala suivant définit le mode point de contrôle de votre application sur `EXACTLY_ONCE` (valeur par défaut) :

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

L’exemple de code Python suivant définit le mode point de contrôle de votre application sur `EXACTLY_ONCE` (valeur par défaut) :

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# Mettre à niveau Studio Runtime
<a name="upgrading-studio-runtime"></a>

Cette section contient des informations sur la mise à niveau de l'environnement d'exécution de votre bloc-notes Studio. Nous vous recommandons de toujours passer à la dernière version compatible de Studio Runtime.

## Mettez à niveau votre ordinateur portable vers un nouveau Studio Runtime
<a name="upgrading-notebook"></a>

Selon la façon dont vous utilisez Studio, les étapes de mise à niveau de votre environnement d'exécution diffèrent. Sélectionnez l'option qui correspond à votre cas d'utilisation.

### Requêtes SQL ou code Python sans dépendances externes
<a name="notebook-no-dependencies"></a>

Si vous utilisez SQL ou Python sans aucune dépendance externe, suivez le processus de mise à niveau de Runtime suivant. Nous vous recommandons de passer à la dernière version de Runtime. Le processus de mise à niveau est le même, quelle que soit la version d'exécution à partir de laquelle vous effectuez la mise à niveau. 

1. Créez un nouveau bloc-notes Studio à l'aide de la dernière version du Runtime.

1. Copiez et collez le code de chaque note de l'ancien bloc-notes vers le nouveau bloc-notes.

1. Dans le nouveau bloc-notes, ajustez le code pour le rendre compatible avec toutes les fonctionnalités d'Apache Flink modifiées par rapport à la version précédente.
   + Exécutez le nouveau bloc-notes. Ouvrez le bloc-notes, exécutez-le note par note, en séquence, et testez s'il fonctionne.
   + Apportez les modifications nécessaires au code.
   + Arrêtez le nouveau bloc-notes.

1. Si vous aviez déployé l'ancien bloc-notes en tant qu'application :
   + Déployez le nouveau bloc-notes en tant que nouvelle application distincte.
   + Arrêtez l'ancienne application.
   + Exécutez la nouvelle application sans capture instantanée.

1. Arrêtez l'ancien bloc-notes s'il fonctionne. Démarrez le nouveau bloc-notes, selon les besoins, pour une utilisation interactive.

**Flux de processus pour la mise à niveau sans dépendances externes**

![\[Le schéma suivant représente le flux de travail recommandé pour mettre à niveau votre bloc-notes sans dépendances externes.\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### Requêtes SQL ou code Python avec dépendances externes
<a name="notebook-dependencies"></a>

Suivez ce processus si vous utilisez SQL ou Python et si vous utilisez des dépendances externes telles que des connecteurs ou des artefacts personnalisés, tels que des fonctions définies par l'utilisateur implémentées en Python ou Java. Nous vous recommandons de passer à la dernière version du Runtime. Le processus est le même, quelle que soit la version d'exécution à partir de laquelle vous effectuez la mise à niveau.

1. Créez un nouveau bloc-notes Studio à l'aide de la dernière version du Runtime.

1. Copiez et collez le code de chaque note de l'ancien bloc-notes vers le nouveau bloc-notes.

1. Mettez à jour les dépendances externes et les artefacts personnalisés.
   + Recherchez de nouveaux connecteurs compatibles avec la version Apache Flink du nouveau Runtime. Reportez-vous à la section [Connecteurs Table & SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/) de la documentation d'Apache Flink pour trouver les connecteurs appropriés pour la version de Flink.
   + Mettez à jour le code des fonctions définies par l'utilisateur pour qu'il corresponde aux modifications apportées à l'API Apache Flink et à toutes les dépendances Python ou JAR utilisées par les fonctions définies par l'utilisateur. Reconditionnez votre artefact personnalisé mis à jour.
   + Ajoutez ces nouveaux connecteurs et artefacts au nouveau bloc-notes.

1. Dans le nouveau bloc-notes, ajustez le code pour le rendre compatible avec toutes les fonctionnalités d'Apache Flink modifiées par rapport à la version précédente.
   + Exécutez le nouveau bloc-notes. Ouvrez le bloc-notes, exécutez-le note par note, en séquence, et testez s'il fonctionne.
   + Apportez les modifications nécessaires au code.
   + Arrêtez le nouveau bloc-notes.

1. Si vous aviez déployé l'ancien bloc-notes en tant qu'application :
   + Déployez le nouveau bloc-notes en tant que nouvelle application distincte.
   + Arrêtez l'ancienne application.
   + Exécutez la nouvelle application sans capture instantanée.

1. Arrêtez l'ancien bloc-notes s'il fonctionne. Démarrez le nouveau bloc-notes, selon les besoins, pour une utilisation interactive.

**Flux de processus pour la mise à niveau avec des dépendances externes**

![\[Le schéma suivant représente le flux de travail recommandé pour mettre à niveau votre bloc-notes avec des dépendances externes.\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# Travaillez avec AWS Glue
<a name="how-zeppelin-glue"></a>

Votre bloc-notes Studio stocke et obtient des informations sur ses sources de données et ses récepteurs AWS Glue. Lorsque vous créez votre bloc-notes Studio, vous spécifiez la AWS Glue base de données qui contient vos informations de connexion. Lorsque vous accédez à vos sources de données et à vos récepteurs, vous spécifiez AWS Glue les tables contenues dans la base de données. Vos AWS Glue tables permettent d'accéder aux AWS Glue connexions qui définissent les emplacements, les schémas et les paramètres de vos sources de données et de vos destinations.

Les blocs-notes Studio utilisent les propriétés des tables pour stocker des données spécifiques aux applications. Pour de plus amples informations, veuillez consulter [Propriétés de tableau](how-zeppelin-glue-properties.md).

Pour un exemple de configuration d'une AWS Glue connexion, d'une base de données et d'une table à utiliser avec les blocs-notes Studio, consultez [Création d'une AWS Glue base de données](example-notebook.md#example-notebook-glue) le [Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink](example-notebook.md) didacticiel.

# Propriétés de tableau
<a name="how-zeppelin-glue-properties"></a>

Outre les champs de données, vos AWS Glue tables fournissent d'autres informations à votre bloc-notes Studio à l'aide des propriétés des tables. Le service géré pour Apache Flink utilise les propriétés de AWS Glue table suivantes :
+ [Définir les valeurs temporelles d'Apache Flink](#how-zeppelin-glue-timestamp) : ces propriétés définissent la manière dont le service géré pour Apache Flink émet les valeurs de temps de traitement des données internes d’Apache Flink.
+ [Utiliser le connecteur Flink et les propriétés de format](#how-zeppelin-glue-connector) : ces propriétés fournissent des informations sur vos flux de données.

Pour ajouter une propriété à une AWS Glue table, procédez comme suit :

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Dans la liste des tables, choisissez la table que votre application utilise pour stocker ses informations de connexion de données. Choisissez **Action**, puis **Modifier les détails de la table**.

1. Sous **Propriétés de la table**, saisissez **managed-flink.proctime** pour **la clé** et **user\$1action\$1time** pour **la valeur**.

## Définir les valeurs temporelles d'Apache Flink
<a name="how-zeppelin-glue-timestamp"></a>

Apache Flink fournit des valeurs temporelles qui décrivent le moment où les événements de traitement des flux se sont produits, tels que le [temps de traitement](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) et l’[heure de l’événement](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time). Pour inclure ces valeurs dans la sortie de votre application, vous définissez des propriétés sur votre AWS Glue table qui indiquent au runtime Managed Service for Apache Flink d'émettre ces valeurs dans les champs spécifiés. 

Les clés et les valeurs que vous utilisez dans les propriétés de votre table sont les suivantes :


| Type d’horodatage | Clé | Valeur | 
| --- |--- |--- |
| [Délai de traitement](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | managed-flink.proctime | Le nom de colonne qui AWS Glue sera utilisé pour exposer la valeur. Ce nom de colonne ne correspond pas à une colonne de table existante. | 
| [Heure de l'événement](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | managed-flink.rowtime | Le nom de colonne qui AWS Glue sera utilisé pour exposer la valeur. Ce nom de colonne correspond à une colonne de table existante. | 
| managed-flink.watermark. *column\$1name*.millisecondes | L'intervalle entre les filigranes en millisecondes | 

## Utiliser le connecteur Flink et les propriétés de format
<a name="how-zeppelin-glue-connector"></a>

Vous fournissez des informations sur vos sources de données aux connecteurs Flink de votre application à l’aide des propriétés de table AWS Glue . Voici quelques exemples des propriétés que le service géré pour Apache Flink utilise pour les connecteurs :


| Type de connecteur | Clé | Valeur | 
| --- |--- |--- |
| [Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | format | Le format utilisé pour désérialiser et sérialiser les messages Kafka, par exemple ou. json csv | 
| scan.startup.mode | Le mode de démarrage pour le consommateur de Kafka, par exemple earliest-offset outimestamp. | 
| [Kinésis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | format | Format utilisé pour désérialiser et sérialiser les enregistrements du flux de données Kinesis, par exemple ou. json csv | 
| aws.region |  AWS Région dans laquelle le flux est défini.  | 
| [S3 (système de fichiers)](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | format | Le format utilisé pour désérialiser et sérialiser les fichiers, par exemple ou. json csv | 
| path | Le chemin Amazon S3, par s3://mybucket/ ex. | 

Pour plus d’informations sur les autres connecteurs autres que Kinesis et Apache Kafka, consultez la documentation de votre connecteur.

# Exemples et didacticiels pour les blocs-notes Studio dans Managed Service for Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [

# Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink
](example-notebook.md)
+ [

# Tutoriel : Déployer un bloc-notes Studio en tant que service géré pour une application Apache Flink à état durable
](example-notebook-deploy.md)
+ [

# Afficher des exemples de requêtes pour analyser des données dans un bloc-notes Studio
](how-zeppelin-sql-examples.md)

# Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink
<a name="example-notebook"></a>

Le didacticiel suivant explique comment créer un bloc-notes Studio qui lit les données d'un flux de données Kinesis ou d'un cluster Amazon MSK.

**Topics**
+ [

## Remplir les conditions préalables
](#example-notebook-setup)
+ [

## Création d'une AWS Glue base de données
](#example-notebook-glue)
+ [

## Étapes suivantes : créer un bloc-notes Studio avec Kinesis Data Streams ou Amazon MSK
](#examples-notebook-nextsteps)
+ [

# Créer un bloc-notes Studio avec Kinesis Data Streams
](example-notebook-streams.md)
+ [

# Créer un bloc-notes Studio avec Amazon MSK
](example-notebook-msk.md)
+ [

# Nettoyez votre application et les ressources dépendantes
](example-notebook-cleanup.md)

## Remplir les conditions préalables
<a name="example-notebook-setup"></a>

Assurez-vous qu'il s' AWS CLI agit de la version 2 ou d'une version ultérieure. Pour installer la dernière version AWS CLI, voir [Installation, mise à jour et désinstallation de la AWS CLI version 2.](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)

## Création d'une AWS Glue base de données
<a name="example-notebook-glue"></a>

Votre bloc-notes Studio utilise une base de données [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) pour les métadonnées relatives à votre source de données Amazon MSK.

**Création d'une AWS Glue base de données**

1. Ouvrez la AWS Glue console à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Choisissez **Ajouter une base de données**. Dans la fenêtre **Ajouter une base de données**, saisissez **default** comme **nom de la base de données**. Choisissez **Créer**. 

## Étapes suivantes : créer un bloc-notes Studio avec Kinesis Data Streams ou Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Avec ce didacticiel, vous pouvez créer un bloc-notes Studio qui utilise Kinesis Data Streams ou Amazon MSK :
+ [Créer un bloc-notes Studio avec Kinesis Data Streams](example-notebook-streams.md) : avec Kinesis Data Streams, vous créez rapidement une application qui utilise un flux de données Kinesis comme source. Il vous suffit de créer un flux de données Kinesis en tant que ressource dépendante.
+ [Créer un bloc-notes Studio avec Amazon MSK](example-notebook-msk.md) : avec Amazon MSK, vous créez une application qui utilise un cluster Amazon MSK comme source. Vous devez créer un Amazon VPC, une instance client Amazon EC2 et un cluster Amazon MSK en tant que ressources dépendantes.

# Créer un bloc-notes Studio avec Kinesis Data Streams
<a name="example-notebook-streams"></a>

Ce didacticiel explique comment créer un bloc-notes Studio qui utilise un flux de données Kinesis comme source.

**Topics**
+ [

## Remplir les conditions préalables
](#example-notebook-streams-setup)
+ [

## Création d'une AWS Glue table
](#example-notebook-streams-glue)
+ [

## Créer un bloc-notes Studio avec Kinesis Data Streams
](#example-notebook-streams-create)
+ [

## Envoyer des données vers votre flux de données Kinesis
](#example-notebook-streams-send)
+ [

## Tester votre bloc-notes Studio
](#example-notebook-streams-test)

## Remplir les conditions préalables
<a name="example-notebook-streams-setup"></a>

Avant de créer un bloc-notes Studio, créez un flux de données Kinesis (`ExampleInputStream`). Votre application utilise ce flux comme source de l’application.

Vous pouvez créer ce flux à l’aide de la console Amazon Kinesis ou de la commande AWS CLI suivante. Pour obtenir des instructions sur la console, consultez [Création et mise à jour de flux de données](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) dans le *Guide du développeur Amazon Kinesis Data Streams*. Nommez le flux **ExampleInputStream** et définissez le **Nombre de partitions ouvertes** sur **1**.

Pour créer le stream (`ExampleInputStream`) à l'aide de AWS CLI, utilisez la commande Amazon Kinesis `create-stream` AWS CLI suivante.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Création d'une AWS Glue table
<a name="example-notebook-streams-glue"></a>

Votre bloc-notes Studio utilise une base de données [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) pour les métadonnées relatives à votre source de données Kinesis Data Streams.

**Note**  
Vous pouvez d’abord créer la base de données manuellement ou laisser le service géré pour Apache Flink la créer pour vous lors de la création du bloc-notes. De même, vous pouvez soit créer la table manuellement comme décrit dans cette section, soit utiliser le code du connecteur de création de table pour le service géré pour Apache Flink dans votre bloc-notes dans Apache Zeppelin pour créer votre table via une instruction DDL. Vous pouvez ensuite vous enregistrer AWS Glue pour vous assurer que la table a été correctement créée.

**Création d’une table**

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Si vous n'avez pas encore de AWS Glue base de données, choisissez **Bases de données** dans la barre de navigation de gauche. Choisissez **Ajouter une base de données**. Dans la fenêtre **Ajouter une base de données**, saisissez **default** comme **nom de la base de données**. Sélectionnez **Create** (Créer).

1. Dans la barre de navigation de gauche, choisissez **Tables**. Sur la page **Tables**, choisissez **Ajouter des tables**, **Ajouter une table manuellement**.

1. Sur la page **Configurer les propriétés de votre table**, saisissez **stock** pour **Nom de la table**. Assurez-vous de sélectionner la base de données que vous avez créée précédemment. Choisissez **Suivant**.

1. Sur la page **Ajouter un magasin de données**, choisissez **Kinesis**. Pour le **Nom du flux**, saisissez **ExampleInputStream**. Pour **URL source Kinesis**, saisissez **https://kinesis.us-east-1.amazonaws.com**. Si vous copiez et collez l’**URL source Kinesis**, veillez à supprimer les espaces de début ou de fin. Choisissez **Suivant**.

1. Sur la page **Classification**, choisissez **JSON**. Choisissez **Suivant**.

1. Sur la page **Définir un schéma**, choisissez Ajouter une colonne pour ajouter une colonne. Ajoutez des colonnes avec les propriétés suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-streams.html)

   Choisissez **Suivant**.

1. Sur la page suivante, vérifiez vos paramètres, puis choisissez **Terminer**.

1. Choisissez la table que vous venez de créer dans la liste des tables.

1. Choisissez **Modifier la table** et ajoutez une propriété avec la clé `managed-flink.proctime` et la valeur `proctime`.

1. Choisissez **Appliquer**.

## Créer un bloc-notes Studio avec Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Maintenant que vous avez créé les ressources utilisées par votre application, vous pouvez créer votre bloc-notes Studio. 

**Topics**
+ [

### Créez un bloc-notes Studio à l'aide du AWS Management Console
](#example-notebook-create-streams-console)
+ [

### Créez un bloc-notes Studio à l'aide du AWS CLI
](#example-notebook-msk-create-api)

### Créez un bloc-notes Studio à l'aide du AWS Management Console
<a name="example-notebook-create-streams-console"></a>

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord. 

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Studio**. Choisissez **Créer un bloc-notes Studio**.
**Note**  
Vous pouvez aussi créer un bloc-notes Studio à partir des consoles Amazon MSK ou Kinesis Data Streams en sélectionnant votre cluster Amazon MSK ou votre flux de données Kinesis d’entrée, puis en choisissant **Traiter les données en temps réel**.

1. Sur la page **Créer un bloc-notes Studio**, fournissez les informations suivantes :
   + Saisissez **MyNotebook** comme nom du bloc-notes.
   + Pour **Base de données AWS Glue**, choisissez **Par défaut**.

   Choisissez **Créer un bloc-notes Studio**.

1. Sur la **MyNotebook**page, choisissez **Exécuter**. Attendez que **État** indique **En cours d’exécution**. Des frais s’appliquent lorsque le bloc-notes fonctionne.

### Créez un bloc-notes Studio à l'aide du AWS CLI
<a name="example-notebook-msk-create-api"></a>

Pour créer votre bloc-notes Studio à l'aide du AWS CLI, procédez comme suit :

1. Vérifiez votre identifiant de compte. Vous avez besoin de cette valeur pour créer votre application.

1. Créez le rôle `arn:aws:iam::AccountID:role/ZeppelinRole` et ajoutez les autorisations suivantes au rôle créé automatiquement par la console.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Créez un fichier nommé `create.json` avec le contenu suivant. Remplacez les valeurs des espaces réservés par vos informations.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Pour créer votre application, exécutez la commande suivante.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Lorsque la commande est terminée, vous voyez une sortie contenant les détails de votre nouveau bloc-notes Studio. Voici un exemple de la sortie.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Pour lancer votre application, exécutez la commande suivante. Remplacez les exemples de valeur par l’identifiant de votre compte.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envoyer des données vers votre flux de données Kinesis
<a name="example-notebook-streams-send"></a>

Pour envoyer des données de test vers votre flux de données Kinesis, procédez comme suit :

1. Utilisez le [Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Choisissez **Créer un utilisateur Cognito** avec. CloudFormation

1. La CloudFormation console s'ouvre avec le modèle Kinesis Data Generator. Choisissez **Suivant**.

1. Sur la page **Spécifier les détails de la pile**, saisissez le nom d’utilisateur et le mot de passe de votre utilisateur Cognito. Choisissez **Suivant**.

1. Sur la page **Configurer les options de pile**, choisissez **Suivant**.

1. Sur la page **Review Kinesis-Data-Generator-Cognito -User**, sélectionnez l'option **J'accuse réception AWS CloudFormation susceptible de créer des ressources IAM**. case à cocher. Sélectionnez **Créer une pile**.

1. Attendez la fin de la création de la CloudFormation pile. **Une fois la pile terminée, ouvrez la pile **Kinesis-Data-Generator-Cognito-User** dans la console et choisissez l'onglet Sorties. CloudFormation ** Ouvrez l'URL répertoriée pour la valeur **KinesisDataGeneratorUrl**de sortie.

1. Sur la page **Amazon Kinesis Data Generator**, connectez-vous avec les informations d’identification que vous avez créées à l’étape 4.

1. Sur la page suivante, renseignez les valeurs suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-streams.html)

   Pour **Modèle d’enregistrement**, collez le code suivant :

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Choisissez **Envoyer les données**.

1. Le générateur enverra les données à votre flux de données Kinesis. 

   Laissez le générateur tourner pendant que vous terminez la section suivante.

## Tester votre bloc-notes Studio
<a name="example-notebook-streams-test"></a>

Dans cette section, vous utilisez votre bloc-notes Studio pour interroger les données de votre flux de données Kinesis.

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord.

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Bloc-notes Studio**. Sélectionnez **MyNotebook**.

1. Sur la **MyNotebook**page, choisissez **Ouvrir dans Apache Zeppelin**.

   L’interface Apache Zeppelin s’ouvre dans un nouvel onglet.

1. Sur la page **Bienvenue sur Zeppelin \$1**, choisissez **Note Zeppelin**.

1. Sur la page **Note Zeppelin**, entrez la requête suivante dans une nouvelle note :

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Choisissez l’icône d’exécution.

   Après un court instant, la note affiche les données du flux de données Kinesis.

Pour ouvrir le tableau de bord Apache Flink de votre application afin de visualiser les aspects opérationnels, choisissez **FLINK JOB**. Pour plus d’informations sur le tableau de bord Flink, consultez [Tableau de bord Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) dans le [guide du développeur du service géré pour Apache Flink](https://docs.aws.amazon.com/).

Pour d’autres exemples de requêtes SQL Flink Streaming, consultez la section [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) de la documentation [Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Créer un bloc-notes Studio avec Amazon MSK
<a name="example-notebook-msk"></a>

Ce didacticiel explique comment créer un bloc-notes Studio qui utilise un cluster Amazon MSK comme source.

**Topics**
+ [

## Configuration d'un cluster Amazon MSK
](#example-notebook-msk-setup)
+ [

## Ajoutez une passerelle NAT à votre VPC
](#example-notebook-msk-nat)
+ [

## Création d'une AWS Glue connexion et d'une table
](#example-notebook-msk-glue)
+ [

## Créer un bloc-notes Studio avec Amazon MSK
](#example-notebook-msk-create)
+ [

## Envoyer des données à votre cluster Amazon MSK
](#example-notebook-msk-send)
+ [

## Tester votre bloc-notes Studio
](#example-notebook-msk-test)

## Configuration d'un cluster Amazon MSK
<a name="example-notebook-msk-setup"></a>

Pour ce didacticiel, vous avez besoin d’un cluster Amazon MSK qui autorise l’accès en texte brut. Si vous n'avez pas encore configuré de cluster Amazon MSK, suivez le didacticiel [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) pour créer un Amazon VPC, un cluster Amazon MSK, une rubrique et une instance client Amazon. EC2 

Lorsque vous suivez le didacticiel, procédez comme suit :
+ Dans l’[étape 3 : créer un cluster Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), à l’étape 4, modifiez la valeur `ClientBroker` de `TLS` à **PLAINTEXT**.

## Ajoutez une passerelle NAT à votre VPC
<a name="example-notebook-msk-nat"></a>

Si vous avez créé un cluster Amazon MSK en suivant le didacticiel [Mise en route avec Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html), ou si votre VPC Amazon existant ne possède pas encore de passerelle NAT pour ses sous-réseaux privés, vous devez ajouter une passerelle NAT à votre VPC Amazon. Le schéma suivant illustre l’architecture. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/images/vpc_05.png)


Pour créer une passerelle NAT pour votre Amazon VPC, procédez comme suit :

1. Ouvrez la console Amazon VPC à l’adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Dans la barre de navigation de gauche, choisissez **Passerelles NAT**.

1. Sur la page **Passerelles NAT**, choisissez **Créer une passerelle NAT**.

1. Sur la page **Créer une passerelle NAT**, renseignez les valeurs suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-msk.html)

   Choisissez **Créer une passerelle NAT**.

1. Dans le volet de navigation de gauche, choisissez **Tables de routage**.

1. Choisissez **Créer une table de routage**.

1. Sur la page **Créer une table de routage**, fournissez les informations suivantes :
   + **Balise de nom** : **ZeppelinRouteTable**
   + **VPC** **: Choisissez votre VPC (par exemple, VPC).AWS KafkaTutorial**

   Sélectionnez **Create** (Créer).

1. Dans la liste des tables de routage, choisissez **ZeppelinRouteTable**. Choisissez l’onglet **Routes**, puis **Modifier les routes**.

1. Sur la page **Modifier les routes**, choisissez **Ajouter une route**.

1. Dans le **** Pour **Destination**, saisissez **0.0.0.0/0**. Pour **Target**, choisissez **NAT Gateway**, **ZeppelinGateway**. Choisissez **Enregistrer les routes**. Choisissez **Close** (Fermer).

1. Sur la page Tables de routage, lorsque cette option est **ZeppelinRouteTable**sélectionnée, choisissez l'onglet **Associations de sous-réseaux**. Choisissez **Modifier les associations de sous-réseaux**.

1. Sur la page **Modifier les associations de sous-réseaux**, choisissez **AWS KafkaTutorialSubnet2** et **AWS KafkaTutorialSubnet3**. Choisissez **Save** (Enregistrer).

## Création d'une AWS Glue connexion et d'une table
<a name="example-notebook-msk-glue"></a>

Votre bloc-notes Studio utilise une base de données [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) pour les métadonnées relatives à votre source de données Amazon MSK. Dans cette section, vous créez une AWS Glue connexion qui décrit comment accéder à votre cluster Amazon MSK et un AWS Glue tableau qui décrit comment présenter les données de votre source de données à des clients tels que votre bloc-notes Studio. 

**Créer une connexion**

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Si vous n'avez pas encore de AWS Glue base de données, choisissez **Bases de données** dans la barre de navigation de gauche. Choisissez **Ajouter une base de données**. Dans la fenêtre **Ajouter une base de données**, saisissez **default** comme **nom de la base de données**. Sélectionnez **Create** (Créer).

1. Dans le menu de navigation de gauche, sélectionnez **Connexions**. Choisissez **Ajouter une connexion**.

1. Dans la fenêtre **Ajouter une connexion**, indiquez les valeurs suivantes :
   + Pour **Nom de connexion**, saisissez **ZeppelinConnection**.
   + Pour **Type de connexion**, choisissez **Kafka**.
   + Pour le **serveur bootstrap Kafka URLs**, fournissez la chaîne de broker bootstrap de votre cluster. Vous pouvez obtenir les agents d’amorçage depuis la console MSK ou en saisissant la commande CLI suivante :

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Décochez la case **Exiger une connexion SSL**.

   Choisissez **Suivant**.

1. Sur la page **VPC**, renseignez les valeurs suivantes :
   + **Pour le **VPC**, choisissez le nom de votre VPC (par exemple, VPC.) AWS KafkaTutorial**
   + Pour **Sous-réseau**, choisissez **AWS KafkaTutorialSubnet2.**
   + Pour **Groupes de sécurité**, sélectionnez tous les groupes disponibles.

   Choisissez **Suivant**.

1. Sur la page **Propriétés de la connexion**/**Accès à la connexion**, choisissez **Terminer**.

**Création d’une table**
**Note**  
Vous pouvez soit créer la table manuellement comme décrit dans les étapes suivantes, soit utiliser le code du connecteur de création de table pour le service géré pour Apache Flink dans votre bloc-notes dans Apache Zeppelin pour créer votre table via une instruction DDL. Vous pouvez ensuite vous enregistrer AWS Glue pour vous assurer que la table a été correctement créée.

1. Dans la barre de navigation de gauche, choisissez **Tables**. Sur la page **Tables**, choisissez **Ajouter des tables**, **Ajouter une table manuellement**.

1. Sur la page **Configurer les propriétés de votre table**, saisissez **stock** pour **Nom de la table**. Assurez-vous de sélectionner la base de données que vous avez créée précédemment. Choisissez **Suivant**.

1. Sur la page **Ajouter un magasin de données**, choisissez **Kafka**. Pour le **nom du sujet**, entrez le nom de votre sujet (par exemple **AWS KafkaTutorialTopic**). Pour **Connection**, choisissez **ZeppelinConnection**.

1. Sur la page **Classification**, choisissez **JSON**. Choisissez **Suivant**.

1. Sur la page **Définir un schéma**, choisissez Ajouter une colonne pour ajouter une colonne. Ajoutez des colonnes avec les propriétés suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-msk.html)

   Choisissez **Suivant**.

1. Sur la page suivante, vérifiez vos paramètres, puis choisissez **Terminer**.

1. Choisissez la table que vous venez de créer dans la liste des tables.

1. Choisissez **Modifier le tableau** et ajoutez les propriétés suivantes :
   + clé :`managed-flink.proctime`, valeur : `proctime`
   + clé :`flink.properties.group.id`, valeur : `test-consumer-group`
   + clé :`flink.properties.auto.offset.reset`, valeur : `latest`
   + clé :`classification`, valeur : `json`

   Sans ces paires clé/valeur, le bloc-notes Flink rencontre une erreur. 

1. Choisissez **Appliquer**.

## Créer un bloc-notes Studio avec Amazon MSK
<a name="example-notebook-msk-create"></a>

Maintenant que vous avez créé les ressources utilisées par votre application, vous pouvez créer votre bloc-notes Studio. 

**Topics**
+ [

### Créez un bloc-notes Studio à l'aide du AWS Management Console
](#example-notebook-create-msk-console)
+ [

### Créez un bloc-notes Studio à l'aide du AWS CLI
](#example-notebook-msk-create-api)

**Note**  
Vous pouvez également créer un bloc-notes Studio à partir de la console Amazon MSK en choisissant un cluster existant, puis en choisissant **Traiter les données en temps réel**.

### Créez un bloc-notes Studio à l'aide du AWS Management Console
<a name="example-notebook-create-msk-console"></a>

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord.

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Studio**. Choisissez **Créer un bloc-notes Studio**.
**Note**  
Pour créer un bloc-notes Studio à partir des consoles Amazon MSK ou Kinesis Data Streams, sélectionnez votre cluster Amazon MSK ou votre flux de données Kinesis d’entrée, puis choisissez **Traiter les données en temps réel**.

1. Sur la page **Créer un bloc-notes Studio**, fournissez les informations suivantes :
   + Pour **Nom du bloc-notes Studio**, saisissez **MyNotebook**.
   + Pour **Base de données AWS Glue**, choisissez **Par défaut**.

   Choisissez **Créer un bloc-notes Studio**.

1. Sur la **MyNotebook**page, choisissez l'onglet **Configuration**. Dans la section **Mise en réseau**, choisissez **Modifier**.

1. Dans la MyNotebook page **Modifier le réseau pour**, choisissez la **configuration VPC basée sur le cluster Amazon MSK**. Choisissez votre cluster Amazon MSK pour **Cluster Amazon MSK**. Sélectionnez **Enregistrer les modifications**.

1. Sur la **MyNotebook**page, choisissez **Exécuter**. Attendez que **État** indique **En cours d’exécution**.

### Créez un bloc-notes Studio à l'aide du AWS CLI
<a name="example-notebook-msk-create-api"></a>

Pour créer votre bloc-notes Studio à l'aide du AWS CLI, procédez comme suit :

1. Assurez-vous de disposer des informations suivantes. Vous avez besoin de ces valeurs pour créer votre application.
   + Votre ID de compte.
   + ID de sous-réseau IDs et de groupe de sécurité pour l'Amazon VPC qui contient votre cluster Amazon MSK.

1. Créez un fichier nommé `create.json` avec le contenu suivant. Remplacez les valeurs des espaces réservés par vos informations.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Pour créer votre application, exécutez la commande suivante.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Lorsque la commande est terminée, vous devriez obtenir une sortie similaire à celle qui suit, montrant les détails de votre nouveau bloc-notes Studio :

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Pour lancer votre application, exécutez la commande suivante. Remplacez les exemples de valeur par l’identifiant de votre compte.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envoyer des données à votre cluster Amazon MSK
<a name="example-notebook-msk-send"></a>

Dans cette section, vous allez exécuter un script Python dans votre EC2 client Amazon pour envoyer des données à votre source de données Amazon MSK.

1. Connectez-vous à votre EC2 client Amazon.

1. Exécutez les commandes suivantes pour installer Python version 3, Pip et le package Kafka pour Python, puis confirmez les actions :

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Configurez le AWS CLI sur votre machine cliente en saisissant la commande suivante :

   ```
   aws configure
   ```

   Fournissez les informations d’identification de votre compte, et **us-east-1** pour la `region`.

1. Créez un fichier nommé `stock.py` avec le contenu suivant. Remplacez la valeur de l'échantillon par la chaîne Bootstrap Brokers de votre cluster Amazon MSK et mettez à jour le nom du sujet si celui-ci n'est pas : **AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Exécutez le script avec la commande suivante :

   ```
   $ python3 stock.py
   ```

1. Laissez le script s’exécuter pendant que vous complétez la section suivante.

## Tester votre bloc-notes Studio
<a name="example-notebook-msk-test"></a>

Dans cette section, vous utilisez votre bloc-notes Studio pour interroger les données de votre cluster Amazon MSK.

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord.

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Bloc-notes Studio**. Sélectionnez **MyNotebook**.

1. Sur la **MyNotebook**page, choisissez **Ouvrir dans Apache Zeppelin**.

   L’interface Apache Zeppelin s’ouvre dans un nouvel onglet.

1. Sur la page **Bienvenue sur Zeppelin \$1**, choisissez **Nouvelle note Zeppelin**.

1. Sur la page **Note Zeppelin**, entrez la requête suivante dans une nouvelle note :

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Choisissez l’icône d’exécution.

   L’application affiche les données du cluster Amazon MSK.

Pour ouvrir le tableau de bord Apache Flink de votre application afin de visualiser les aspects opérationnels, choisissez **FLINK JOB**. Pour plus d’informations sur le tableau de bord Flink, consultez [Tableau de bord Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) dans le [guide du développeur du service géré pour Apache Flink](https://docs.aws.amazon.com/).

Pour d’autres exemples de requêtes SQL Flink Streaming, consultez la section [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) de la documentation [Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Nettoyez votre application et les ressources dépendantes
<a name="example-notebook-cleanup"></a>

## Configurez votre bloc-notes Studio.
<a name="example-notebook-cleanup-app"></a>

1. Ouvrez la console du service géré pour Apache Flink.

1. Sélectionnez **MyNotebook**.

1. Choisissez **Actions**, puis **Supprimer**.

## Supprimer votre AWS Glue base de données et votre connexion
<a name="example-notebook-cleanup-glue"></a>

1. Ouvrez la AWS Glue console à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Dans la barre de navigation de gauche, choisissez **Bases de données**. Cochez la case à côté de **Défaut** pour le sélectionner. Choisissez **Action**, **Supprimer la base de données**. Confirmez votre sélection.

1. Dans le menu de navigation de gauche, sélectionnez **Connexions**. Cochez la case à côté **ZeppelinConnection**pour le sélectionner. Choisissez **Action**, puis **Supprimer la connexion**. Confirmez votre sélection.

## Pour supprimer la politique et le rôle IAM
<a name="example-notebook-msk-cleanup-iam"></a>

1. Ouvrez la console IAM à l’adresse [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Dans le menu de navigation de gauche, choisissez **Rôles**.

1. Utilisez la barre de recherche pour rechercher le **ZeppelinRole**rôle.

1. Choisissez le **ZeppelinRole**rôle. Choisissez **Supprimer le rôle**. Confirmez la suppression.

## Supprimer votre groupe de CloudWatch journaux
<a name="example-notebook-cleanup-cw"></a>

La console crée un groupe de CloudWatch journaux et un flux de journaux pour vous lorsque vous créez votre application à l'aide de la console. Vous n’avez pas de groupe ni flux de journaux si vous avez créé votre application à l’aide de l’interface AWS CLI.

1. Ouvrez la CloudWatch console à l'adresse [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Dans le menu de navigation de gauche, choisissez **Groupe de journaux**.

1. Choisissez le groupe**/AWS/KinesisAnalytics/MyNotebook**log.

1. Sélectionnez **Actions**, **Supprimer le ou les groupes de journaux**. Confirmez la suppression.

## Nettoyez les ressources de Kinesis Data Streams
<a name="example-notebook-cleanup-streams"></a>

Pour supprimer votre flux Kinesis, ouvrez la console Kinesis Data Streams, sélectionnez votre flux Kinesis, puis choisissez **Actions**, **Supprimer.**

## Nettoyage des ressources MSK
<a name="example-notebook-cleanup-msk"></a>

Suivez les étapes de cette section si vous avez créé un cluster Amazon MSK pour ce didacticiel. Cette section contient des instructions pour nettoyer votre instance client Amazon EC2, Amazon VPC et votre cluster Amazon MSK.

### Supprimer votre cluster Amazon MSK
<a name="example-notebook-msk-cleanup-msk"></a>

Suivez ces étapes si vous avez créé un cluster Amazon MSK pour ce didacticiel.

1. Vous voulez ouvrir la console Amazon MSK à la [https://console.aws.amazon.com/msk/maison ? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Sélectionnez **AWS KafkaTutorialCluster**. Sélectionnez **Delete (Supprimer)**. Saisissez **delete** dans la fenêtre qui apparaît et confirmez votre sélection.

### Résilier une instance client
<a name="example-notebook-msk-cleanup-client"></a>

Suivez ces étapes si vous avez créé une instance de client Amazon EC2 pour ce didacticiel.

1. Ouvrez la console Amazon EC2 à l’adresse [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Dans la barre de navigation de gauche, choisissez **Instances**.

1. Cochez la case à côté **ZeppelinClient**pour le sélectionner.

1. Choisissez **État de l’instance**, **Résilier l’instance**.

### Supprimer votre Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

Suivez ces étapes si vous avez créé un Amazon VPC pour ce didacticiel.

1. Ouvrez la console Amazon EC2 à l’adresse [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Choisissez **Interfaces réseau** dans la barre de navigation de gauche.

1. Saisissez l’identifiant de VPC dans la barre de recherche, puis appuyez sur Entrée.

1. Cochez la case dans l’en-tête du tableau pour sélectionner toutes les interfaces réseau affichées.

1. Sélectionnez **Actions**, **Détacher**. Dans la fenêtre qui apparaît, choisissez **Activer** sous **Forcer le détachement**. Choisissez **Détacher** et attendez que toutes les interfaces réseau atteignent l’état **Disponible**.

1. Cochez la case dans l’en-tête du tableau pour sélectionner à nouveau toutes les interfaces réseau affichées.

1. Sélectionnez **Actions**, **Supprimer**. Confirmez l’action.

1. Ouvrez la console Amazon VPC à l’adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Sélectionnez **AWS KafkaTutorialVPC**. Choisissez **Actions**, **Supprimer le VPC**. Saisissez **delete** et confirmez la suppression.

# Tutoriel : Déployer un bloc-notes Studio en tant que service géré pour une application Apache Flink à état durable
<a name="example-notebook-deploy"></a>

Le didacticiel suivant explique comment déployer un bloc-notes Studio en tant qu’application état durable de service géré pour Apache Flink.

**Topics**
+ [

## Exécuter les opérations prérequises
](#example-notebook-durable-setup)
+ [

## Déployez une application durable à l'aide du AWS Management Console
](#example-notebook-deploy-console)
+ [

## Déployez une application durable à l'aide du AWS CLI
](#example-notebook-deploy-cli)

## Exécuter les opérations prérequises
<a name="example-notebook-durable-setup"></a>

Créez un nouveau bloc-notes Studio en suivant [Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink](example-notebook.md), à l’aide de Kinesis Data Streams ou d’Amazon MSK. Nommez le bloc-notes Studio `ExampleTestDeploy`.

## Déployez une application durable à l'aide du AWS Management Console
<a name="example-notebook-deploy-console"></a>

1. Ajoutez un emplacement de compartiment S3 là où vous souhaitez que le code empaqueté soit stocké sous **Emplacement du code d’application (*facultatif***) dans la console. Cela permet de suivre les étapes de déploiement et d’exécution de votre application directement depuis le bloc-notes.

1. Ajoutez les autorisations requises au rôle d’application pour activer le rôle que vous utilisez pour lire et écrire dans un compartiment Amazon S3 et pour lancer une application de service géré pour Apache Flink :
   + Amazon S3 FullAccess
   + Amazon a géré- flinkFullAccess
   + Accès à vos sources, destinations et, VPCs le cas échéant. Pour de plus amples informations, veuillez consulter [Vérifiez les autorisations IAM pour les blocs-notes Studio](how-zeppelin-iam.md).

1. Utilisez l’exemple de code suivant :

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Avec le lancement de cette fonctionnalité, vous verrez une nouvelle liste déroulante dans le coin supérieur droit de chaque note de votre bloc-notes avec le nom du bloc-notes. Vous pouvez effectuer les opérations suivantes :
   + Consultez les paramètres du bloc-notes Studio dans la AWS Management Console.
   + Créez votre note Zeppelin et exportez-la sur Amazon S3. À ce stade, donnez un nom à votre application et choisissez **Générer et exporter**. Vous recevrez une notification lorsque l’exportation sera terminée.
   + Si nécessaire, vous pouvez consulter et exécuter des tests supplémentaires sur l’exécutable dans Amazon S3.
   + Une fois la compilation terminée, vous pourrez déployer votre code sous la forme d’une application de streaming Kinesis dotée d’un état durable et de l’autoscaling.
   + Utilisez le menu déroulant et choisissez **Déployer la note Zeppelin en tant qu’application de streaming Kinesis**. Vérifiez le nom de l'application et choisissez **Déployer via AWS la console**.
   + Cela vous mènera à la AWS Management Console page de création d'un service géré pour l'application Apache Flink. Notez que le nom de l’application, le parallélisme, l’emplacement du code, Glue DB par défaut, le VPC (le cas échéant) et les rôles IAM ont été préremplis. Vérifiez que les rôles IAM disposent des autorisations requises pour accéder à vos sources et destinations. Les instantanés sont activés par défaut pour la gestion des applications à état durable.
   + Choisissez **Créer une application**.
   + Vous pouvez choisir de **configurer** et de modifier tous les paramètres, puis choisir **Exécuter** pour démarrer votre application de streaming.

## Déployez une application durable à l'aide du AWS CLI
<a name="example-notebook-deploy-cli"></a>

Pour déployer une application à l'aide de AWS CLI, vous devez mettre à jour votre modèle de service AWS CLI afin d'utiliser le modèle de service fourni avec vos informations sur la version bêta 2. Pour obtenir des informations sur l’utilisation du modèle de service mis à jour, consultez [Remplir les conditions préalablesExécuter les opérations prérequises](example-notebook.md#example-notebook-setup).

L’exemple suivant permet de créer un nouveau bloc-notes Studio :

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

L’exemple suivant permet de lancer un bloc-notes Studio :

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

Le code suivant renvoie l’URL de la page du bloc-notes Apache Zeppelin d’une application :

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Afficher des exemples de requêtes pour analyser des données dans un bloc-notes Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [

## Création de tableaux avec Amazon MSK/Apache Kafka
](#how-zeppelin-examples-creating-tables)
+ [

## Création de tables avec Kinesis
](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [

## Interrogez une fenêtre qui clignote
](#how-zeppelin-examples-tumbling)
+ [

## Interroger une fenêtre coulissante
](#how-zeppelin-examples-sliding)
+ [

## Utiliser du SQL interactif
](#how-zeppelin-examples-interactive-sql)
+ [

## Utiliser le connecteur BlackHole SQL
](#how-zeppelin-examples-blackhole-connector-sql)
+ [

## Utiliser Scala pour générer des exemples de données
](#notebook-example-data-generator)
+ [

## Utilisez Scala interactif
](#notebook-example-interactive-scala)
+ [

## Utiliser du Python interactif
](#notebook-example-interactive-python)
+ [

## Utilisez une combinaison de Python, SQL et Scala interactifs
](#notebook-example-interactive-pythonsqlscala)
+ [

## Utiliser un flux de données Kinesis entre comptes
](#notebook-example-crossaccount-kds)

Pour obtenir des informations sur les paramètres de requête SQL d’Apache Flink, consultez [Flink on Zeppelin Notebooks for Interactive Data Analysis](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

Pour afficher votre application dans le tableau de bord Apache Flink, choisissez **FLINK JOB** sur la page **Note Zeppelin** de votre application.

Pour plus d’informations sur les requêtes de fenêtre, consultez [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) dans la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Pour d’autres exemples de requêtes SQL Apache Flink Streaming, consultez la section [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) de la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Création de tableaux avec Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Vous pouvez utiliser le connecteur Amazon MSK Flink avec le service géré pour Apache Flink Studio afin d’authentifier votre connexion à l’aide de l’authentification en texte brut, SSL ou IAM. Créez vos tables en utilisant les propriétés spécifiques selon vos besoins.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Vous pouvez les combiner avec d’autres propriétés sur le [connecteur SQL Apache Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/).

## Création de tables avec Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

Dans l’exemple suivant, vous créez une table à l’aide de Kinesis :

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Pour plus d’informations sur les autres propriétés que vous pouvez utiliser, consultez [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Interrogez une fenêtre qui clignote
<a name="how-zeppelin-examples-tumbling"></a>

La requête SQL Flink Streaming suivante sélectionne le prix le plus élevé pour chaque fenêtre bascule de cinq secondes dans la table `ZeppelinTopic` :

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Interroger une fenêtre coulissante
<a name="how-zeppelin-examples-sliding"></a>

La requête SQL Apache Flink Streaming suivante sélectionne le prix le plus élevé pour chaque fenêtre défilante de cinq secondes dans la table `ZeppelinTopic` :

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Utiliser du SQL interactif
<a name="how-zeppelin-examples-interactive-sql"></a>

Cet exemple imprime la durée maximale de l’événement et le temps de traitement, ainsi que la somme des valeurs de la table des valeurs clés. Assurez-vous que vous disposez de l’exemple de script de génération de données de l’exécution [Utiliser Scala pour générer des exemples de données](#notebook-example-data-generator). Pour essayer d’autres requêtes SQL telles que le filtrage et les jointures dans votre bloc-notes Studio, consultez [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) dans la documentation Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Utiliser le connecteur BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

Le connecteur BlackHole SQL ne vous oblige pas à créer un flux de données Kinesis ou un cluster Amazon MSK pour tester vos requêtes. Pour plus d'informations sur le connecteur BlackHole SQL, consultez la section [Connecteur BlackHole SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) dans la documentation d'Apache Flink. Dans cet exemple, le catalogue par défaut est un catalogue en mémoire.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Utiliser Scala pour générer des exemples de données
<a name="notebook-example-data-generator"></a>

Cet exemple utilise Scala pour générer des exemples de données. Vous pouvez utiliser ces exemples de données pour tester différentes requêtes. Utilisez l’instruction create table pour créer la table des valeurs clés.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Utilisez Scala interactif
<a name="notebook-example-interactive-scala"></a>

Il s’agit de la traduction Scala de [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql). Pour d’autres exemples de Scala, consultez [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) dans la documentation d’Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Utiliser du Python interactif
<a name="notebook-example-interactive-python"></a>

Il s’agit de la traduction Python de [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql). Pour d’autres exemples de Python, consultez [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) dans la documentation d’Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Utilisez une combinaison de Python, SQL et Scala interactifs
<a name="notebook-example-interactive-pythonsqlscala"></a>

Vous pouvez utiliser n’importe quelle combinaison de SQL, Python et Scala dans votre bloc-notes pour une analyse interactive. Dans un bloc-notes Studio que vous envisagez de déployer en tant qu’application à état durable, vous pouvez utiliser une combinaison de SQL et de Scala. Cet exemple montre les sections qui sont ignorées et celles qui sont déployées dans l’application à état durable.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Utiliser un flux de données Kinesis entre comptes
<a name="notebook-example-crossaccount-kds"></a>

Pour utiliser un flux de données Kinesis se trouvant dans un compte autre que le compte associé à votre bloc-notes Studio, créez un rôle d’exécution de service dans le compte sur lequel votre bloc-notes Studio est exécuté et une politique d’approbation des rôles dans le compte contenant le flux de données. Utilisez `aws.credentials.provider`, `aws.credentials.role.arn` et `aws.credentials.role.sessionName` dans le connecteur Kinesis dans votre instruction DDL de création de table pour créer une table par rapport au flux de données.

Utilisez le rôle d’exécution de service suivant pour le compte de bloc-notes Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Utilisez la politique `AmazonKinesisFullAccess` et la politique d’approbation des rôles suivante pour le compte de flux de données.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Utilisez le paragraphe suivant pour l’instruction de création de table.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Résoudre les problèmes liés aux blocs-notes Studio pour Managed Service pour Apache Flink
<a name="how-zeppelin-troubleshooting"></a>

Cette section contient des informations de dépannage pour les blocs-notes Studio.

## Arrêter une application bloquée
<a name="how-zeppelin-troubleshooting-stopping"></a>

Pour arrêter une application bloquée dans un état transitoire, appelez l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)action avec le `Force` paramètre défini sur. `true` Pour plus d’informations, consultez [Application en cours d’exécution](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html) dans le [Guide du développeur du service géré pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Déployez en tant qu'application à état durable dans un VPC sans accès à Internet
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

La deploy-as-application fonction Managed Service for Apache Flink Studio ne prend pas en charge les applications VPC sans accès à Internet. Nous vous recommandons de créer votre application dans Studio, puis d’utiliser le service géré pour Apache Flink pour créer manuellement une application Flink et sélectionner le fichier zip que vous avez créé dans votre bloc-notes.

La procédure suivante montre comment procéder : 

1. Créez et exportez votre application Studio vers Amazon S3. Il doit s’agir d’un fichier zip. 

1. Créez manuellement une application de service géré pour Apache Flink avec un chemin de code faisant référence à l’emplacement du fichier zip dans Amazon S3. En outre, vous devrez configurer l’application avec les variables `env` suivantes (2 `groupID`, 3 `var` au total) : 

1. kinesis.analytics.flink.run.options

   1. python : source/note.py

   1. fichier jar : PythonApplicationDependencies lib/ .jar

1. managed.deploy\$1as\$1app.options

   1. DatabasEarn : *<glue database ARN (Amazon Resource Name)>*

1. Vous devrez peut-être accorder des autorisations aux rôles IAM du service géré pour Apache Flink Studio et du service géré pour Apache Flink pour les services utilisés par votre application. Vous pouvez utiliser le même rôle IAM pour les deux applications.

## Deploy-as-app réduction de la taille et du temps de fabrication
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

Les applications Studio deploy-as-app for Python regroupent tout ce qui est disponible dans l'environnement Python, car nous ne pouvons pas déterminer les bibliothèques dont vous avez besoin. Cela peut entraîner une taille plus grande que nécessaire deploy-as-app. La procédure suivante montre comment réduire la taille de l'application deploy-as-app Python en désinstallant les dépendances.

Si vous créez une application Python avec des deploy-as-app fonctionnalités de Studio, vous pouvez envisager de supprimer les packages Python préinstallés du système si vos applications n'en dépendent pas. Cela permettra non seulement de réduire la taille finale de l'artefact afin d'éviter de dépasser la limite de service pour la taille de l'application, mais également d'améliorer le temps de création des applications dotées de cette fonctionnalité. deploy-as-app

Vous pouvez exécuter la commande suivante pour répertorier tous les packages Python installés avec leur taille d’installation respective et supprimer de manière sélective les packages volumineux.

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**Note**  
`apache-beam` est requis pour que Flink Python fonctionne. Vous ne devez jamais supprimer ce package et ses dépendances.

Voici la liste des packages Python préinstallés dans Studio V2 dont la suppression peut être envisagée :

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**Pour supprimer un package Python du bloc-notes Zeppelin :**

1. Vérifiez si votre application dépend du package, ou de l’un de ses packages consommateurs, avant de le supprimer. Vous pouvez identifier les dépendances d’un package à l’aide de [pipdeptree](https://pypi.org/project/pipdeptree/).

1. Exécution de la commande suivante pour supprimer un package :

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. Si vous devez récupérer un package que vous avez supprimé par erreur, exécutez la commande suivante :

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example Exemple : supprimez le `scipy` package avant de déployer votre application Python avec deploy-as-app une fonctionnalité.**  

1. Utilisez `pipdeptree` pour découvrir tous les consommateurs `scipy` et vérifier si vous pouvez supprimer `scipy` en toute sécurité.
   + Installez l’outil via un bloc-notes :

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + Obtenez l’arbre de dépendance inversé de `scipy` en exécutant :

     ```
     %flink.pyflink
     !pip -r -p scipy
     ```

     Vous devriez voir des résultats similaires à ce qui suit (condensés par souci de brièveté) :

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. Inspectez soigneusement l’utilisation de `seaborn`, `statsmodels` et `plotnine` dans vos applications. Si vos applications ne dépendent d’aucun des packages `scipy`, `seaborn`, `statemodels` ou `plotnine`, vous pouvez tous les supprimer ou uniquement ceux dont vos applications n’ont pas besoin.

1. Supprimez le package en exécutant :

   ```
   !pip uninstall -y scipy plotnine seaborn statemodels
   ```

## Annulation de tâches
<a name="how-notbook-canceling-jobs"></a>

Cette section explique comment annuler des tâches Apache Flink auxquelles vous ne pouvez pas accéder depuis Apache Zeppelin. Si vous souhaitez annuler une telle tâche, rendez-vous sur le tableau de bord Apache Flink, copiez l’ID de la tâche, puis utilisez-le dans l’un des exemples suivants.

Pour annuler une seule tâche :

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

Pour annuler toutes les tâches en cours :

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

Pour annuler toutes les tâches :

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## Redémarrez l'interpréteur Apache Flink
<a name="how-notbook-restarting-interpreter"></a>

Pour redémarrer l’interprète Apache Flink dans votre bloc-notes Studio

1. Choisissez **Configuration** dans le coin supérieur droit de l’écran.

1. Choisissez **Interprète**.

1. Choisissez **Redémarrer**, puis **OK**.

# Création de politiques IAM personnalisées pour le service géré pour les ordinateurs portables Apache Flink Studio
<a name="how-zeppelin-appendix-iam"></a>

Vous utilisez normalement des politiques IAM gérées pour permettre à votre application d’accéder à des ressources dépendantes. Si vous avez besoin d’un contrôle plus précis des autorisations de votre application, vous pouvez utiliser une politique IAM personnalisée. Cette section contient des exemples de politiques IAM personnalisées.

**Note**  
Dans les exemples de politique suivants, remplacez le texte de l’espace réservé par les valeurs de votre application.

**Topics**
+ [

## AWS Glue
](#how-zeppelin-iam-glue)
+ [

## CloudWatch Journaux
](#how-zeppelin-iam-cw)
+ [

## Flux Kinesis
](#how-zeppelin-iam-streams)
+ [

## Clusters Amazon MSK
](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

L'exemple de politique suivant accorde des autorisations pour accéder à une AWS Glue base de données.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```

------

## CloudWatch Journaux
<a name="how-zeppelin-iam-cw"></a>

La politique suivante accorde des autorisations d'accès aux CloudWatch journaux :

```
{
      "Sid": "ListCloudwatchLogGroups",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<accountId>:log-group:*"
      ]
    },
    {
      "Sid": "ListCloudwatchLogStreams",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "<logGroupArn>:log-stream:*"
      ]
    },
    {
      "Sid": "PutCloudwatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "<logStreamArn>"
      ]
    }
```

**Note**  
Si vous créez votre application à l'aide de la console, celle-ci ajoute les politiques nécessaires pour accéder aux CloudWatch journaux à votre rôle d'application.

## Flux Kinesis
<a name="how-zeppelin-iam-streams"></a>

Votre application peut utiliser un flux Kinesis pour une source ou une destination. Votre application a besoin d’autorisations de lecture pour lire à partir d’un flux source et d’autorisations d’écriture pour écrire dans un flux de destination.

La politique suivante accorde des autorisations de lecture à partir d’un flux Kinesis utilisé comme source :

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```

------

La politique suivante accorde des autorisations d’écriture dans un flux Kinesis utilisé comme destination :

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```

------

Si votre application accède à un flux Kinesis crypté, vous devez accorder des autorisations supplémentaires pour accéder au flux et à la clé de chiffrement du flux. 

La politique suivante accorde des autorisations pour accéder à un flux source chiffré et à la clé de chiffrement du flux :

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
    ,
```

La politique suivante accorde des autorisations pour accéder à un flux de destination chiffré et à la clé de chiffrement du flux :

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Clusters Amazon MSK
<a name="how-zeppelin-iam-msk"></a>

Pour accorder l’accès à un cluster Amazon MSK, vous accordez l’accès au VPC du cluster. Pour des exemples de stratégies d’accès à un VPC Amazon, consultez la section [Autorisations d’application VPC](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html).