Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Création d'une application à l'aide d'Apache Beam
Dans cet exercice, vous allez créer une application de service géré pour Apache Flink qui transforme les données à l’aide d’Apache Beam
Note
Pour configurer les prérequis requis pour cet exercice, commencez par terminer l’exercice Tutoriel : Commencez à utiliser le DataStream API service géré pour Apache Flink.
Cette rubrique contient les sections suivantes :
- Création de ressources dépendantes
- Écrire des exemples d'enregistrements dans le flux d'entrée
- Téléchargez et examinez le code de l'application
- Compilez le code de l'application
- Téléchargez le code Java de streaming Apache Flink
- Création et exécution du service géré pour l'application Apache Flink
- Nettoyer les AWS ressources
- Étapes suivantes
Création de ressources dépendantes
Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, vous commencez par créer les ressources dépendantes suivantes :
Deux flux de données Kinesis (
ExampleInputStream
etExampleOutputStream
)Un compartiment Amazon S3 pour stocker le code de l’application (
ka-app-code-
)<username>
Vous pouvez créer les flux Kinesis et un compartiment S3 à l’aide de la console. Pour obtenir des instructions sur la création de ces ressources, consultez les rubriques suivantes :
Création et mise à jour de flux de données dans le Guide du développeur Amazon Kinesis Data Streams. Nommez vos flux de données
ExampleInputStream
etExampleOutputStream
.Comment créer un compartiment S3 ? dans le Guide de l’utilisateur de la console Amazon Simple Storage Service. Donnez au compartiment Amazon S3 un nom unique au monde en ajoutant votre nom de connexion, tel que
ka-app-code-
.<username>
Écrire des exemples d'enregistrements dans le flux d'entrée
Dans cette section, vous utilisez un script Python pour écrire des chaînes aléatoires dans le flux pour que l’application les traite.
Note
Cette section nécessite le kit AWS SDK for Python (Boto)
-
Créez un fichier nommé
ping.py
avec le contenu suivant :import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
Exécutez le script
ping.py
:$ python ping.py
Maintenez le script en cours d’exécution pendant que vous terminez le reste du didacticiel.
Téléchargez et examinez le code de l'application
Le code de l'application Java pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :
Installez le client Git si vous ne l’avez pas déjà fait. Pour plus d’informations, consultez Installation de Git
. Cloner le référentiel distant à l’aide de la commande suivante :
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Accédez au répertoire
amazon-kinesis-data-analytics-java-examples/Beam
.
Le code d’application est situé dans le fichier BasicBeamStreamingJob.java
. Notez les informations suivantes à propos du code d’application :
L'application utilise Apache Beam ParDo
pour traiter les enregistrements entrants en invoquant une fonction de transformation personnalisée appelée PingPongFn
.Le code pour invoquer la fonction
PingPongFn
est le suivant :.apply("Pong transform", ParDo.of(new PingPongFn())
Les applications de service géré pour Apache Flink qui utilisent Apache Beam requièrent les composants suivants. Si vous n’incluez pas ces composants et versions dans votre fichier
pom.xml
, votre application charge des versions incorrectes à partir des dépendances de l’environnement, et comme les versions ne correspondent pas, votre application se bloque au moment de l’exécution.<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
La fonction de transformation
PingPongFn
transmet les données d’entrée dans le flux de sortie, sauf si les données d’entrée sont un ping, auquel cas elle émet la chaîne pong\n vers le flux de sortie.Le code de la fonction de transformation est le suivant :
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
Compilez le code de l'application
Pour compiler l’application, procédez comme suit :
Installez Java et Maven si ce n’est pas déjà fait. Pour plus d’informations, consultez Remplissez les prérequis requis dans le didacticiel Tutoriel : Commencez à utiliser le DataStream API service géré pour Apache Flink.
Compilez l’application à l’aide de la commande suivante :
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
Note
Le code source fourni repose sur les bibliothèques de Java 11.
La compilation de l'application crée le JAR fichier de l'application (target/basic-beam-app-1.0.jar
).
Téléchargez le code Java de streaming Apache Flink
Dans cette section, vous allez charger votre code d’application dans le compartiment Amazon S3 que vous avez créé dans la section Création de ressources dépendantes.
-
Dans la console Amazon S3, choisissez le ka-app-code -
<username>
bucket, puis choisissez Upload. -
À l’étape Sélectionner les fichiers, choisissez Ajouter des fichiers. Accédez au fichier
basic-beam-app-1.0.jar
que vous avez créé à l’étape précédente. Vous n’avez pas besoin de modifier les paramètres de l’objet, donc choisissez Charger.
Votre code d’application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.
Création et exécution du service géré pour l'application Apache Flink
Suivez ces étapes pour créer, configurer, mettre à jour et exécuter l’application à l’aide de la console.
Pour créer l’application
Ouvrez le service géré pour la console Apache Flink à https://console.aws.amazon.com l'adresse /flink
-
Dans le tableau de bord du service géré pour Apache Flink, choisissez Créer une application d’analyse.
-
Sur la page Service géré pour Apache Flink - Créer une application, fournissez les détails de l’application comme suit :
-
Pour Nom de l’application, saisissez
MyApplication
. -
Pour Exécution, choisissez Apache Flink.
Note
Apache Beam n'est actuellement pas compatible avec Apache Flink version 1.19 ou ultérieure.
Sélectionnez Apache Flink version 1.15 dans le menu déroulant des versions.
-
-
Pour les autorisations d'accès, choisissez Create/update IAM role
kinesis-analytics-MyApplication-us-west-2
. -
Choisissez Créer une application.
Note
Lorsque vous créez un service géré pour une application Apache Flink à l'aide de la console, vous avez la possibilité de créer un IAM rôle et une politique pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces IAM ressources sont nommées à l'aide du nom de votre application et de votre région, comme suit :
-
Stratégie :
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rôle :
kinesis-analytics-MyApplication-
us-west-2
Modifier la IAM politique
Modifiez la IAM politique pour ajouter des autorisations d'accès aux flux de données Kinesis.
Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/
. -
Choisissez Stratégies. Choisissez la politique
kinesis-analytics-service-MyApplication-us-west-2
créée pour vous par la console dans la section précédente. -
Sur la page Récapitulatif, choisissez Modifier la politique. Choisissez l'JSONonglet.
-
Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez le compte d'exemple IDs (
012345678901
) avec votre identifiant de compte.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Configuration de l'application
-
Sur la MyApplicationpage, choisissez Configurer.
-
Sur la page Configurer l’application, indiquez l’emplacement du code:
-
Pour le compartiment Amazon S3, saisissez
ka-app-code-
.<username>
-
Pour le chemin de l'objet Amazon S3, saisissez
basic-beam-app-1.0.jar
.
-
-
Sous Accès aux ressources de l'application, pour les autorisations d'accès, choisissez Create/update IAM role
kinesis-analytics-MyApplication-us-west-2
. -
Saisissez :
ID du groupe Clé Valeur BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
Sous Surveillance, assurez-vous que Surveillance du niveau des métriques est défini sur Application.
-
Pour la CloudWatch journalisation, cochez la case Activer.
-
Choisissez Mettre à jour.
Note
Lorsque vous choisissez d'activer la CloudWatch journalisation, le service géré pour Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :
-
Groupe de journaux :
/aws/kinesis-analytics/MyApplication
-
Flux de journaux :
kinesis-analytics-log-stream
Ce flux de journaux est utilisé pour surveiller l’application. Il ne s’agit pas du même flux de journaux que celui utilisé par l’application pour envoyer les résultats.
Exécutez l'application
Le graphique des tâches Flink peut être visualisé en exécutant l’application, en ouvrant le tableau de bord Apache Flink et en choisissant la tâche Flink souhaitée.
Vous pouvez vérifier les métriques du service géré pour Apache Flink sur la CloudWatch console pour vérifier que l'application fonctionne.
Nettoyer les AWS ressources
Cette section inclut les procédures de nettoyage AWS des ressources créées dans le didacticiel Tumbling Window.
Cette rubrique contient les sections suivantes :
Supprimer votre application Managed Service for Apache Flink
Ouvrez le service géré pour la console Apache Flink à https://console.aws.amazon.com l'adresse /flink
dans le panneau Managed Service for Apache Flink, sélectionnez MyApplication.
Sur la page de l’application, choisissez Supprimer, puis confirmez la suppression.
Supprimer vos flux de données Kinesis
Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com
Dans le panneau Kinesis Data Streams, ExampleInputStreamsélectionnez.
Sur la ExampleInputStreampage, choisissez Supprimer Kinesis Stream, puis confirmez la suppression.
Sur la page Kinesis Streams, choisissez le ExampleOutputStream, choisissez Actions, choisissez Supprimer, puis confirmez la suppression.
Supprimer votre objet et votre compartiment Amazon S3
Ouvrez la console Amazon S3 à l'adresse https://console.aws.amazon.com/s3/
. Choisissez le ka-app-code -
<username>
seau.Choisissez Supprimer, puis saisissez le nombre du compartiment pour confirmer la suppression.
Supprimer vos IAM ressources
Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/
. Dans la barre de navigation, choisissez Stratégies.
Dans le contrôle du filtre, saisissez kinesis.
Choisissez la politique kinesis-analytics-service- MyApplication -us-west-2.
Choisissez Actions de stratégie, puis Supprimer.
Dans la barre de navigation, choisissez Rôles.
Choisissez le rôle kinesis-analytics- MyApplication -us-west-2.
Choisissez Supprimer le rôle, puis confirmez la suppression.
Supprimer vos CloudWatch ressources
Ouvrez la CloudWatch console à l'adresse https://console.aws.amazon.com/cloudwatch/
. Dans la barre de navigation, choisissez Journaux.
Choisissez le groupe de journaux MyApplication/aws/kinesis-analytics/.
Choisissez Supprimer le groupe de journaux, puis confirmez la suppression.
Étapes suivantes
Maintenant que vous avez créé et exécuté une application basique de service géré pour Apache Flink qui transforme les données à l’aide d’Apache Beam, consultez l’application suivante pour un exemple de solution plus avancée de service géré pour Apache Flink.
Atelier de streaming Beam sur le service géré pour Apache Flink
: dans cet atelier, nous explorons un exemple de bout en bout qui combine les aspects de lots et de streaming dans un pipeline Apache Beam uniforme.