

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Création d'une application à l'aide d'Apache Beam
<a name="examples-beam"></a>

Dans cet exercice, vous allez créer une application de service géré pour Apache Flink qui transforme les données à l’aide d’[Apache Beam](https://beam.apache.org/). Apache Beam est un modèle de programmation pour le traitement des données de streaming. Pour obtenir des informations sur l’utilisation d’Apache Beam avec le service géré pour Apache Flink, consultez [Utiliser Apache Beam avec un service géré pour les applications Apache Flink](how-creating-apps-beam.md).

**Note**  
Pour configurer les prérequis requis pour cet exercice, commencez par terminer l’exercice [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md).

**Topics**
+ [Création de ressources dépendantes](#examples-beam-resources)
+ [Écrire des exemples d'enregistrements dans le flux d'entrée](#examples-beam-write)
+ [Téléchargez et examinez le code de l'application](#examples-beam-download)
+ [Compilez le code de l'application](#examples-beam-compile)
+ [Téléchargez le code Java de streaming Apache Flink](#examples-beam-upload)
+ [Création et exécution du service géré pour l'application Apache Flink](#examples-beam-create-run)
+ [Nettoyer les AWS ressources](#examples-beam-cleanup)
+ [Étapes suivantes](#examples-beam-nextsteps)

## Création de ressources dépendantes
<a name="examples-beam-resources"></a>

Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, vous commencez par créer les ressources dépendantes suivantes : 
+ Deux flux de données Kinesis (`ExampleInputStream` et `ExampleOutputStream`)
+ Un compartiment Amazon S3 pour stocker le code de l’application (`ka-app-code-<username>`) 

Vous pouvez créer les flux Kinesis et un compartiment S3 à l’aide de la console. Pour obtenir des instructions sur la création de ces ressources, consultez les rubriques suivantes :
+ [Création et mise à jour de flux de données](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) dans le *Guide du développeur Amazon Kinesis Data Streams*. Nommez vos flux de données **ExampleInputStream** et **ExampleOutputStream**.
+ [Comment créer un compartiment S3 ?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) dans le *Guide de l’utilisateur de la console Amazon Simple Storage Service*. Donnez au compartiment Amazon S3 un nom unique au monde en ajoutant votre nom de connexion, tel que **ka-app-code-*<username>***.

## Écrire des exemples d'enregistrements dans le flux d'entrée
<a name="examples-beam-write"></a>

Dans cette section, vous utilisez un script Python pour écrire des chaînes aléatoires dans le flux pour que l’application les traite.

**Note**  
Cette section nécessite le kit [AWS SDK pour Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Créez un fichier nommé `ping.py` avec le contenu suivant :

   ```
   import json
   import boto3
   import random
   
   kinesis = boto3.client('kinesis')
   
   while True:
           data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat'])
           print(data)
           kinesis.put_record(
                   StreamName="ExampleInputStream",
                   Data=data,
                   PartitionKey="partitionkey")
   ```

1. Exécutez le script `ping.py` : 

   ```
   $ python ping.py
   ```

   Maintenez le script en cours d’exécution pendant que vous terminez le reste du didacticiel.

## Téléchargez et examinez le code de l'application
<a name="examples-beam-download"></a>

Le code de l'application Java pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :

1. Installez le client Git si vous ne l’avez pas déjà fait. Pour plus d’informations, consultez [Installation de Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). 

1. Cloner le référentiel distant à l’aide de la commande suivante :

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. Accédez au répertoire `amazon-kinesis-data-analytics-java-examples/Beam`.

Le code d’application est situé dans le fichier `BasicBeamStreamingJob.java`. Notez les informations suivantes à propos du code d’application :
+ L'application utilise Apache Beam [ParDo](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html)pour traiter les enregistrements entrants en invoquant une fonction de transformation personnalisée appelée`PingPongFn`.

  Le code pour invoquer la fonction `PingPongFn` est le suivant :

  ```
  .apply("Pong transform",
      ParDo.of(new PingPongFn())
  ```
+ Les applications de service géré pour Apache Flink qui utilisent Apache Beam requièrent les composants suivants. Si vous n’incluez pas ces composants et versions dans votre fichier `pom.xml`, votre application charge des versions incorrectes à partir des dépendances de l’environnement, et comme les versions ne correspondent pas, votre application se bloque au moment de l’exécution.

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```
+ La fonction de transformation `PingPongFn` transmet les données d’entrée dans le flux de sortie, sauf si les données d’entrée sont un **ping**, auquel cas elle émet la chaîne **pong\$1n** vers le flux de sortie. 

  Le code de la fonction de transformation est le suivant :

  ```
      private static class PingPongFn extends DoFn<KinesisRecord, byte[]> {
      private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class);
      
      @ProcessElement
      public void processElement(ProcessContext c) {
          String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
          if (content.trim().equalsIgnoreCase("ping")) {
              LOG.info("Ponged!");
              c.output("pong\n".getBytes(StandardCharsets.UTF_8));
          } else {
              LOG.info("No action for: " + content);
              c.output(c.element().getDataAsBytes());
          }
      }
  }
  ```

## Compilez le code de l'application
<a name="examples-beam-compile"></a>

Pour compiler l’application, procédez comme suit :

1. Installez Java et Maven si ce n’est pas déjà fait. Pour plus d’informations, consultez [Complétez les prérequis requis](getting-started.md#setting-up-prerequisites) dans le didacticiel [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md).

1. Compilez l’application à l’aide de la commande suivante : 

   ```
   mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
   ```
**Note**  
Le code source fourni repose sur les bibliothèques de Java 11. 

La compilation de l’application crée le fichier JAR de l’application (`target/basic-beam-app-1.0.jar`).

## Téléchargez le code Java de streaming Apache Flink
<a name="examples-beam-upload"></a>

Dans cette section, vous allez charger votre code d’application dans le compartiment Amazon S3 que vous avez créé dans la section [Création de ressources dépendantes](#examples-beam-resources).

1. Dans la console Amazon S3, choisissez le *<username>* compartiment **ka-app-code-**, puis **Upload**.

1. À l’étape **Sélectionner les fichiers**, choisissez **Ajouter des fichiers**. Accédez au fichier `basic-beam-app-1.0.jar` que vous avez créé à l’étape précédente. 

1. Vous n’avez pas besoin de modifier les paramètres de l’objet, donc choisissez **Charger**.

Votre code d’application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.

## Création et exécution du service géré pour l'application Apache Flink
<a name="examples-beam-create-run"></a>

Suivez ces étapes pour créer, configurer, mettre à jour et exécuter l’application à l’aide de la console.

### Pour créer l’application
<a name="examples-beam-create"></a>

1. Connectez-vous à la AWS Management Console console Amazon MSF et ouvrez-la à https://console.aws.amazon.com l'adresse /flink.

1. Dans le tableau de bord du service géré pour Apache Flink, choisissez **Créer une application d’analyse**.

1. Sur la page **Service géré pour Apache Flink - Créer une application**, fournissez les détails de l’application comme suit :
   + Pour **Nom de l’application**, saisissez **MyApplication**.
   + Pour **Exécution**, choisissez **Apache Flink**.
**Note**  
Apache Beam n'est actuellement pas compatible avec Apache Flink version 1.19 ou ultérieure.
   + Sélectionnez **Apache Flink version 1.15 dans le menu** déroulant des versions.

1. Pour **Autorisations d’accès**, choisissez **Créer/mettre à jour un rôle IAM) `kinesis-analytics-MyApplication-us-west-2`**.

1. Choisissez **Créer une application**.

**Note**  
Lorsque vous créez une application de service géré pour Apache Flink à l’aide de la console, vous avez la possibilité de créer un rôle et une politique IAM pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces ressources IAM sont nommées en utilisant le nom de votre application et la région, comme suit :  
Stratégie : `kinesis-analytics-service-MyApplication-us-west-2`
Rôle : `kinesis-analytics-MyApplication-us-west-2`

### Modifier la politique IAM
<a name="get-started-exercise-7-console-iam"></a>

Modifiez la politique IAM pour ajouter des autorisations afin d'accéder aux flux de données Kinesis.

1. Ouvrez la console IAM à l’adresse [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Choisissez **Stratégies**. Choisissez la politique **`kinesis-analytics-service-MyApplication-us-west-2`** créée pour vous par la console dans la section précédente. 

1. Sur la page **Récapitulatif**, choisissez **Modifier la politique**. Sélectionnez l’onglet **JSON**.

1. Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez le compte d'exemple IDs (*012345678901*) par votre identifiant de compte.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "logs:DescribeLogGroups",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*",
                   "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar"
               ]
           },
           {
               "Sid": "DescribeLogStreams",
               "Effect": "Allow",
               "Action": "logs:DescribeLogStreams",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
           },
           {
               "Sid": "PutLogEvents",
               "Effect": "Allow",
               "Action": "logs:PutLogEvents",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

### Configuration de l'application
<a name="examples-beam-configure"></a>

1. Sur la **MyApplication**page, choisissez **Configurer**.

1. Sur la page **Configurer l’application**, indiquez l’**emplacement du code**:
   + Pour le compartiment **Amazon S3**, saisissez **ka-app-code-*<username>***.
   + Pour le **chemin de l'objet Amazon S3**, saisissez **basic-beam-app-1.0.jar**.

1. Sous **Accéder aux ressources de l’application**, pour **Autorisations d’accès**, choisissez **Créer/mettre à jour un rôle IAM `kinesis-analytics-MyApplication-us-west-2`**.

1. Saisissez :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/examples-beam.html)

1. Sous **Surveillance**, assurez-vous que **Surveillance du niveau des métriques** est défini sur **Application**.

1. Pour la **CloudWatch journalisation**, cochez la case **Activer**.

1. Choisissez **Mettre à jour**.

**Note**  
Lorsque vous choisissez d'activer la CloudWatch journalisation, le service géré pour Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :   
Groupe de journaux : `/aws/kinesis-analytics/MyApplication`
Flux de journaux : `kinesis-analytics-log-stream`
Ce flux de journaux est utilisé pour surveiller l’application. Il ne s’agit pas du même flux de journaux que celui utilisé par l’application pour envoyer les résultats.

### Exécutez l'application
<a name="examples-beam-run"></a>

Le graphique des tâches Flink peut être visualisé en exécutant l’application, en ouvrant le tableau de bord Apache Flink et en choisissant la tâche Flink souhaitée.

Vous pouvez vérifier les métriques du service géré pour Apache Flink sur la CloudWatch console pour vérifier que l'application fonctionne. 

## Nettoyer les AWS ressources
<a name="examples-beam-cleanup"></a>

Cette section inclut les procédures de nettoyage AWS des ressources créées dans le didacticiel Tumbling Window.

**Topics**
+ [Supprimer votre application Managed Service for Apache Flink](#examples-beam-cleanup-app)
+ [Supprimer vos flux de données Kinesis](#examples-beam-cleanup-stream)
+ [Supprimer votre objet et votre compartiment Amazon S3](#examples-beam-cleanup-s3)
+ [Supprimer vos ressources IAM](#examples-beam-cleanup-iam)
+ [Supprimer vos CloudWatch ressources](#examples-beam-cleanup-cw)

### Supprimer votre application Managed Service for Apache Flink
<a name="examples-beam-cleanup-app"></a>

1. Connectez-vous à la AWS Management Console console Amazon MSF et ouvrez-la à https://console.aws.amazon.com l'adresse /flink.

1. dans le panneau Managed Service for Apache Flink, sélectionnez **MyApplication**.

1. Sur la page de l’application, choisissez **Supprimer**, puis confirmez la suppression.

### Supprimer vos flux de données Kinesis
<a name="examples-beam-cleanup-stream"></a>

1. [Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Dans le panneau Kinesis Data Streams, **ExampleInputStream**sélectionnez.

1. Sur la **ExampleInputStream**page, choisissez **Supprimer Kinesis Stream**, puis confirmez la suppression.

1. Sur la page **Kinesis Streams**, choisissez le **ExampleOutputStream**, choisissez **Actions**, choisissez **Supprimer**, puis confirmez la suppression.

### Supprimer votre objet et votre compartiment Amazon S3
<a name="examples-beam-cleanup-s3"></a>

1. Ouvrez la console Amazon S3 à l'adresse [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Choisissez le ***<username>*compartiment ka-app-code -.**

1. Choisissez **Supprimer**, puis saisissez le nombre du compartiment pour confirmer la suppression.

### Supprimer vos ressources IAM
<a name="examples-beam-cleanup-iam"></a>

1. Ouvrez la console IAM à l’adresse [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Dans la barre de navigation, choisissez **Stratégies**.

1. Dans le contrôle du filtre, saisissez **kinesis**.

1. Choisissez la politique **kinesis-analytics-service- MyApplication -us-west-2**.

1. Choisissez **Actions de stratégie**, puis **Supprimer**.

1. Dans la barre de navigation, choisissez **Rôles**.

1. Choisissez le rôle **kinesis-analytics- MyApplication** -us-west-2.

1. Choisissez **Supprimer le rôle**, puis confirmez la suppression.

### Supprimer vos CloudWatch ressources
<a name="examples-beam-cleanup-cw"></a>

1. Ouvrez la CloudWatch console à l'adresse [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Dans la barre de navigation, choisissez **Journaux**.

1. Choisissez le groupe**/aws/kinesis-analytics/MyApplication**log.

1. Choisissez **Supprimer le groupe de journaux**, puis confirmez la suppression.

## Étapes suivantes
<a name="examples-beam-nextsteps"></a>

Maintenant que vous avez créé et exécuté une application basique de service géré pour Apache Flink qui transforme les données à l’aide d’Apache Beam, consultez l’application suivante pour un exemple de solution plus avancée de service géré pour Apache Flink.
+ ** [Atelier de streaming Beam sur le service géré pour Apache Flink](https://streaming-analytics.workshop.aws/beam-on-kda/)** : dans cet atelier, nous explorons un exemple de bout en bout qui combine les aspects de lots et de streaming dans un pipeline Apache Beam uniforme. 