Création d'une application à l'aide d'Apache Beam - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Création d'une application à l'aide d'Apache Beam

Dans cet exercice, vous allez créer une application de service géré pour Apache Flink qui transforme les données à l’aide d’Apache Beam. Apache Beam est un modèle de programmation pour le traitement des données de streaming. Pour obtenir des informations sur l’utilisation d’Apache Beam avec le service géré pour Apache Flink, consultez Utiliser Apache Beam avec un service géré pour les applications Apache Flink.

Note

Pour configurer les prérequis requis pour cet exercice, commencez par terminer l’exercice Tutoriel : Commencez à utiliser le DataStream API service géré pour Apache Flink.

Création de ressources dépendantes

Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, vous commencez par créer les ressources dépendantes suivantes :

  • Deux flux de données Kinesis (ExampleInputStream et ExampleOutputStream)

  • Un compartiment Amazon S3 pour stocker le code de l’application (ka-app-code-<username>)

Vous pouvez créer les flux Kinesis et un compartiment S3 à l’aide de la console. Pour obtenir des instructions sur la création de ces ressources, consultez les rubriques suivantes :

  • Création et mise à jour de flux de données dans le Guide du développeur Amazon Kinesis Data Streams. Nommez vos flux de données ExampleInputStream et ExampleOutputStream.

  • Comment créer un compartiment S3 ? dans le Guide de l’utilisateur de la console Amazon Simple Storage Service. Donnez au compartiment Amazon S3 un nom unique au monde en ajoutant votre nom de connexion, tel que ka-app-code-<username>.

Écrire des exemples d'enregistrements dans le flux d'entrée

Dans cette section, vous utilisez un script Python pour écrire des chaînes aléatoires dans le flux pour que l’application les traite.

Note

Cette section nécessite le kit AWS SDK for Python (Boto).

  1. Créez un fichier nommé ping.py avec le contenu suivant :

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Exécutez le script ping.py :

    $ python ping.py

    Maintenez le script en cours d’exécution pendant que vous terminez le reste du didacticiel.

Téléchargez et examinez le code de l'application

Le code de l'application Java pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :

  1. Installez le client Git si vous ne l’avez pas déjà fait. Pour plus d’informations, consultez Installation de Git.

  2. Cloner le référentiel distant à l’aide de la commande suivante :

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Accédez au répertoire amazon-kinesis-data-analytics-java-examples/Beam.

Le code d’application est situé dans le fichier BasicBeamStreamingJob.java. Notez les informations suivantes à propos du code d’application :

  • L'application utilise Apache Beam ParDopour traiter les enregistrements entrants en invoquant une fonction de transformation personnalisée appeléePingPongFn.

    Le code pour invoquer la fonction PingPongFn est le suivant :

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Les applications de service géré pour Apache Flink qui utilisent Apache Beam requièrent les composants suivants. Si vous n’incluez pas ces composants et versions dans votre fichier pom.xml, votre application charge des versions incorrectes à partir des dépendances de l’environnement, et comme les versions ne correspondent pas, votre application se bloque au moment de l’exécution.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • La fonction de transformation PingPongFn transmet les données d’entrée dans le flux de sortie, sauf si les données d’entrée sont un ping, auquel cas elle émet la chaîne pong\n vers le flux de sortie.

    Le code de la fonction de transformation est le suivant :

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Compilez le code de l'application

Pour compiler l’application, procédez comme suit :

  1. Installez Java et Maven si ce n’est pas déjà fait. Pour plus d’informations, consultez Remplissez les prérequis requis dans le didacticiel Tutoriel : Commencez à utiliser le DataStream API service géré pour Apache Flink.

  2. Compilez l’application à l’aide de la commande suivante :

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    Note

    Le code source fourni repose sur les bibliothèques de Java 11.

La compilation de l'application crée le JAR fichier de l'application (target/basic-beam-app-1.0.jar).

Téléchargez le code Java de streaming Apache Flink

Dans cette section, vous allez charger votre code d’application dans le compartiment Amazon S3 que vous avez créé dans la section Création de ressources dépendantes.

  1. Dans la console Amazon S3, choisissez le ka-app-code -<username>bucket, puis choisissez Upload.

  2. À l’étape Sélectionner les fichiers, choisissez Ajouter des fichiers. Accédez au fichier basic-beam-app-1.0.jar que vous avez créé à l’étape précédente.

  3. Vous n’avez pas besoin de modifier les paramètres de l’objet, donc choisissez Charger.

Votre code d’application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.

Création et exécution du service géré pour l'application Apache Flink

Suivez ces étapes pour créer, configurer, mettre à jour et exécuter l’application à l’aide de la console.

Pour créer l’application

  1. Ouvrez le service géré pour la console Apache Flink à https://console.aws.amazon.com l'adresse /flink

  2. Dans le tableau de bord du service géré pour Apache Flink, choisissez Créer une application d’analyse.

  3. Sur la page Service géré pour Apache Flink - Créer une application, fournissez les détails de l’application comme suit :

    • Pour Nom de l’application, saisissez MyApplication.

    • Pour Exécution, choisissez Apache Flink.

      Note

      Apache Beam n'est actuellement pas compatible avec Apache Flink version 1.19 ou ultérieure.

    • Sélectionnez Apache Flink version 1.15 dans le menu déroulant des versions.

  4. Pour les autorisations d'accès, choisissez Create/update IAM role kinesis-analytics-MyApplication-us-west-2.

  5. Choisissez Créer une application.

Note

Lorsque vous créez un service géré pour une application Apache Flink à l'aide de la console, vous avez la possibilité de créer un IAM rôle et une politique pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces IAM ressources sont nommées à l'aide du nom de votre application et de votre région, comme suit :

  • Stratégie : kinesis-analytics-service-MyApplication-us-west-2

  • Rôle : kinesis-analytics-MyApplication-us-west-2

Modifier la IAM politique

Modifiez la IAM politique pour ajouter des autorisations d'accès aux flux de données Kinesis.

  1. Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/.

  2. Choisissez Stratégies. Choisissez la politique kinesis-analytics-service-MyApplication-us-west-2 créée pour vous par la console dans la section précédente.

  3. Sur la page Récapitulatif, choisissez Modifier la politique. Choisissez l'JSONonglet.

  4. Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez le compte d'exemple IDs (012345678901) avec votre identifiant de compte.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configuration de l'application

  1. Sur la MyApplicationpage, choisissez Configurer.

  2. Sur la page Configurer l’application, indiquez l’emplacement du code:

    • Pour le compartiment Amazon S3, saisissez ka-app-code-<username>.

    • Pour le chemin de l'objet Amazon S3, saisissez basic-beam-app-1.0.jar.

  3. Sous Accès aux ressources de l'application, pour les autorisations d'accès, choisissez Create/update IAM role kinesis-analytics-MyApplication-us-west-2.

  4. Saisissez :

    ID du groupe Clé Valeur
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. Sous Surveillance, assurez-vous que Surveillance du niveau des métriques est défini sur Application.

  6. Pour la CloudWatch journalisation, cochez la case Activer.

  7. Choisissez Mettre à jour.

Note

Lorsque vous choisissez d'activer la CloudWatch journalisation, le service géré pour Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :

  • Groupe de journaux : /aws/kinesis-analytics/MyApplication

  • Flux de journaux : kinesis-analytics-log-stream

Ce flux de journaux est utilisé pour surveiller l’application. Il ne s’agit pas du même flux de journaux que celui utilisé par l’application pour envoyer les résultats.

Exécutez l'application

Le graphique des tâches Flink peut être visualisé en exécutant l’application, en ouvrant le tableau de bord Apache Flink et en choisissant la tâche Flink souhaitée.

Vous pouvez vérifier les métriques du service géré pour Apache Flink sur la CloudWatch console pour vérifier que l'application fonctionne.

Nettoyer les AWS ressources

Cette section inclut les procédures de nettoyage AWS des ressources créées dans le didacticiel Tumbling Window.

Supprimer votre application Managed Service for Apache Flink

  1. Ouvrez le service géré pour la console Apache Flink à https://console.aws.amazon.com l'adresse /flink

  2. dans le panneau Managed Service for Apache Flink, sélectionnez MyApplication.

  3. Sur la page de l’application, choisissez Supprimer, puis confirmez la suppression.

Supprimer vos flux de données Kinesis

  1. Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com

  2. Dans le panneau Kinesis Data Streams, ExampleInputStreamsélectionnez.

  3. Sur la ExampleInputStreampage, choisissez Supprimer Kinesis Stream, puis confirmez la suppression.

  4. Sur la page Kinesis Streams, choisissez le ExampleOutputStream, choisissez Actions, choisissez Supprimer, puis confirmez la suppression.

Supprimer votre objet et votre compartiment Amazon S3

  1. Ouvrez la console Amazon S3 à l'adresse https://console.aws.amazon.com/s3/.

  2. Choisissez le ka-app-code -<username> seau.

  3. Choisissez Supprimer, puis saisissez le nombre du compartiment pour confirmer la suppression.

Supprimer vos IAM ressources

  1. Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/.

  2. Dans la barre de navigation, choisissez Stratégies.

  3. Dans le contrôle du filtre, saisissez kinesis.

  4. Choisissez la politique kinesis-analytics-service- MyApplication -us-west-2.

  5. Choisissez Actions de stratégie, puis Supprimer.

  6. Dans la barre de navigation, choisissez Rôles.

  7. Choisissez le rôle kinesis-analytics- MyApplication -us-west-2.

  8. Choisissez Supprimer le rôle, puis confirmez la suppression.

Supprimer vos CloudWatch ressources

  1. Ouvrez la CloudWatch console à l'adresse https://console.aws.amazon.com/cloudwatch/.

  2. Dans la barre de navigation, choisissez Journaux.

  3. Choisissez le groupe de journaux MyApplication/aws/kinesis-analytics/.

  4. Choisissez Supprimer le groupe de journaux, puis confirmez la suppression.

Étapes suivantes

Maintenant que vous avez créé et exécuté une application basique de service géré pour Apache Flink qui transforme les données à l’aide d’Apache Beam, consultez l’application suivante pour un exemple de solution plus avancée de service géré pour Apache Flink.