Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Création et exécution d'un service géré pour l'application Apache Flink pour Python
Dans cette section, vous allez créer un service géré pour une application Apache Flink pour Python avec un flux Kinesis comme source et comme récepteur.
Cette section contient les étapes suivantes.
- Création de ressources dépendantes
- Configuration de votre environnement de développement local
- Téléchargez et examinez le code Python de streaming d'Apache Flink
- Gérer les JAR dépendances
- Écrire des exemples d'enregistrements dans le flux d'entrée
- Exécutez votre application localement
- Observez les données d'entrée et de sortie dans les flux Kinesis
- Arrêtez l'exécution locale de votre application
- Package du code de votre application
- Téléchargez le package d'application dans un compartiment Amazon S3
- Création et configuration du service géré pour l'application Apache Flink
- Étape suivante
Création de ressources dépendantes
Avant de créer un service géré pour Apache Flink dans le cadre de cet exercice, vous commencez par créer les ressources dépendantes suivantes :
-
Deux flux Kinesis pour l’entrée et la sortie.
-
Un compartiment Amazon S3 pour stocker le code de l'application.
Note
Ce didacticiel part du principe que vous déployez votre application dans la région us-east-1. Si vous utilisez une autre région, vous devez adapter toutes les étapes en conséquence.
Création de deux flux Kinesis
Avant de créer une application Managed Service for Apache Flink pour cet exercice, créez deux flux de données Kinesis ExampleInputStream
(ExampleOutputStream
et) dans la même région que vous utiliserez pour déployer votre application (us-east-1 dans cet exemple). Votre application utilise ces flux pour les flux source et de destination de l’application.
Vous pouvez créer ces flux à l’aide de la console Amazon Kinesis ou de la commande AWS CLI suivante. Pour obtenir des instructions sur la console, consultez Création et mise à jour de flux de données dans le Guide du développeur Amazon Kinesis Data Streams.
Pour créer les flux de données (AWS CLI)
-
Pour créer le premier flux (
ExampleInputStream
), utilisez la commande Amazon Kinesiscreate-stream
AWS CLI suivante.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
-
Pour créer le second flux utilisé par l’application pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
Créer un compartiment Amazon S3
Vous pouvez créer un compartiment Amazon S3 à l’aide de la console. Pour obtenir les instructions relatives à la création de cette ressource, consultez les rubriques suivantes :
-
Comment créer un compartiment S3 ? dans le Guide de l’utilisateur de la console Amazon Simple Storage Service. Donnez au compartiment Amazon S3 un nom unique au monde, par exemple en ajoutant votre nom de connexion.
Note
Assurez-vous de créer le compartiment S3 dans la région que vous utilisez pour ce didacticiel (us-east-1).
Autres ressources
Lorsque vous créez votre application, Managed Service for Apache Flink crée les CloudWatch ressources Amazon suivantes si elles n'existent pas déjà :
-
Un groupe de journaux appelé
/AWS/KinesisAnalytics-java/<my-application>
. -
Un flux de journaux appelé
kinesis-analytics-log-stream
Configuration de votre environnement de développement local
Pour le développement et le débogage, vous pouvez exécuter l'application Python Flink sur votre machine. Vous pouvez démarrer l'application depuis la ligne de commande avec python
main.py
ou dans le Python IDE de votre choix.
Note
Python 3.10 ou 3.11, Java 11, Apache Maven et Git doivent être installés sur votre machine de développement. Nous vous recommandons d'utiliser un IDE tel que PyCharm
Installation de la PyFlink bibliothèque
Pour développer votre application et l'exécuter localement, vous devez installer la bibliothèque Python Flink.
-
Créez un environnement Python autonome à l'aide VirtualEnv de Conda ou de tout autre outil Python similaire.
-
Installez la PyFlink bibliothèque dans cet environnement. Utilisez la même version d'exécution d'Apache Flink que celle que vous utiliserez dans Amazon Managed Service pour Apache Flink. Actuellement, le temps d'exécution recommandé est 1.19.1.
$ pip install apache-flink==1.19.1
-
Assurez-vous que l'environnement est actif lorsque vous exécutez votre application. Si vous exécutez l'application dans leIDE, assurez-vous qu'IDEil utilise l'environnement comme environnement d'exécution. Le processus dépend de celui IDE que vous utilisez.
Note
Il vous suffit d'installer la PyFlink bibliothèque. Il n'est pas nécessaire d'installer un cluster Apache Flink sur votre machine.
Authentifiez votre session AWS
L'application utilise les flux de données Kinesis pour publier des données. Lors de l'exécution locale, vous devez disposer d'une session AWS authentifiée valide avec les autorisations nécessaires pour écrire dans le flux de données Kinesis. Procédez comme suit pour authentifier votre session :
-
Si vous n'avez pas configuré le profil AWS CLI et un profil nommé avec des informations d'identification valides, consultezConfigurez le AWS Command Line Interface (AWS CLI).
-
Vérifiez que vous êtes correctement AWS CLI configuré et que vos utilisateurs sont autorisés à écrire dans le flux de données Kinesis en publiant l'enregistrement de test suivant :
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Si vous IDE avez un plugin à intégrer AWS, vous pouvez l'utiliser pour transmettre les informations d'identification à l'application qui s'exécute dans leIDE. Pour plus d'informations, consultez AWS Toolkit for PyCharm
, AWS Toolkit for Visual Studio Code et AWS Toolkit for IntelliJ IDEA .
Téléchargez et examinez le code Python de streaming d'Apache Flink
Le code de l'application Python pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :
-
Cloner le référentiel distant à l’aide de la commande suivante :
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Accédez au répertoire
./python/GettingStarted
.
Vérifiez les composants de l'application
Le code de l'application se trouve dansmain.py
. Nous utilisons SQL Embedded in Python pour définir le flux de l'application.
Note
Pour une expérience de développement optimisée, l'application est conçue pour s'exécuter sans aucune modification de code à la fois sur Amazon Managed Service pour Apache Flink et localement, pour le développement sur votre machine. L'application utilise la variable IS_LOCAL =
true
d'environnement pour détecter si elle est exécutée localement. Vous devez définir la variable d'environnement IS_LOCAL = true
soit sur votre shell, soit dans la configuration d'exécution de votreIDE.
-
L'application configure l'environnement d'exécution et lit la configuration d'exécution. Pour fonctionner à la fois sur Amazon Managed Service pour Apache Flink et localement, l'application vérifie la
IS_LOCAL
variable.-
Voici le comportement par défaut lorsque l'application s'exécute dans Amazon Managed Service pour Apache Flink :
-
Chargez les dépendances fournies avec l'application. Pour plus d'informations, voir (lien)
-
Chargez la configuration à partir des propriétés d'exécution que vous définissez dans l'application Amazon Managed Service for Apache Flink. Pour plus d'informations, voir (lien)
-
-
Lorsque l'application détecte
IS_LOCAL = true
que vous l'exécutez localement :-
Charge les dépendances externes depuis le projet.
-
Charge la configuration à partir du
application_properties.json
fichier inclus dans le projet.... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
L'application définit une table source avec une
CREATE TABLE
instruction, à l'aide du connecteur Kinesis. Ce tableau lit les données du flux Kinesis en entrée. L'application prend le nom du flux, la région et la position initiale à partir de la configuration d'exécution. table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
-
Dans cet exemple, l'application définit également une table réceptrice à l'aide du connecteur Kinesis
. Ce conte envoie des données au flux Kinesis de sortie. table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
-
Enfin, l'application exécute une table SQL réceptrice
INSERT INTO...
à partir de la table source. Dans une application plus complexe, vous devez probablement effectuer des étapes supplémentaires pour transformer les données avant de les écrire dans le récepteur.table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
-
Vous devez ajouter une autre étape à la fin de la
main()
fonction pour exécuter l'application localement :if is_local: table_result.wait()
Sans cette instruction, l'application s'arrête immédiatement lorsque vous l'exécutez localement. Vous ne devez pas exécuter cette instruction lorsque vous exécutez votre application dans Amazon Managed Service pour Apache Flink.
Gérer les JAR dépendances
Une PyFlink application nécessite généralement un ou plusieurs connecteurs. L'application présentée dans ce didacticiel utilise le connecteur Kinesis
Dans cet exemple, nous montrons comment utiliser Apache Maven pour récupérer les dépendances et empaqueter l'application à exécuter sur le service géré pour Apache Flink.
Note
Il existe d'autres méthodes pour récupérer et empaqueter les dépendances. Cet exemple illustre une méthode qui fonctionne correctement avec un ou plusieurs connecteurs. Il vous permet également d'exécuter l'application localement, à des fins de développement et sur le service géré pour Apache Flink sans modifier le code.
Utilisez le fichier pom.xml
Apache Maven utilise le pom.xml
fichier pour contrôler les dépendances et le packaging des applications.
Toutes JAR les dépendances sont spécifiées dans le pom.xml
fichier du <dependencies>...</dependencies>
bloc.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
Pour trouver l'artefact et la version du connecteur appropriés à utiliser, voirUtiliser les connecteurs Apache Flink avec le service géré pour Apache Flink. Assurez-vous de vous référer à la version d'Apache Flink que vous utilisez. Pour cet exemple, nous utilisons le connecteur Kinesis. Pour Apache Flink 1.19, la version du connecteur est. 4.3.0-1.19
Note
Si vous utilisez Apache Flink 1.19, aucune version de connecteur n'a été publiée spécifiquement pour cette version. Utilisez les connecteurs publiés pour la version 1.18.
Dépendances relatives aux téléchargements et aux packages
Utilisez Maven pour télécharger les dépendances définies dans le pom.xml
fichier et les empaqueter pour l'application Python Flink.
-
Accédez au répertoire qui contient le projet Python Getting Started appelé
python/GettingStarted
. -
Exécutez la commande suivante :
$ mvn package
Maven crée un nouveau fichier appelé./target/pyflink-dependencies.jar
. Lorsque vous développez localement sur votre machine, l'application Python recherche ce fichier.
Note
Si vous oubliez d'exécuter cette commande, lorsque vous essayez d'exécuter votre application, elle échouera avec le message d'erreur suivant : Impossible de trouver une usine pour l'identifiant « kinesis ».
Écrire des exemples d'enregistrements dans le flux d'entrée
Dans cette section, vous allez envoyer des exemples d'enregistrements au flux pour que la demande soit traitée. Deux options s'offrent à vous pour générer des exemples de données, soit à l'aide d'un script Python, soit à l'aide du Kinesis Data
Générer des exemples de données à l'aide d'un script Python
Vous pouvez utiliser un script Python pour envoyer des exemples d'enregistrements au flux.
Note
Pour exécuter ce script Python, vous devez utiliser Python 3.x et installer la bibliothèque AWS SDKfor Python (Boto)
Pour commencer à envoyer des données de test vers le flux d'entrée Kinesis, procédez comme suit :
-
Téléchargez le script
stock.py
Python du générateur de données depuis le GitHub référentiel du générateur de données. -
Exécutez le script
stock.py
:$ python stock.py
Continuez à exécuter le script pendant que vous terminez le reste du didacticiel. Vous pouvez désormais exécuter votre application Apache Flink.
Génération d'échantillons de données à l'aide de Kinesis Data Generator
Au lieu d'utiliser le script Python, vous pouvez utiliser Kinesis Data Generator
Pour configurer et exécuter Kinesis Data Generator, procédez comme suit :
-
Suivez les instructions de la documentation de Kinesis Data Generator
pour configurer l'accès à l'outil. Vous allez exécuter un AWS CloudFormation modèle qui définit un utilisateur et un mot de passe. -
Accédez à Kinesis Data Generator via le modèle URL généré par le CloudFormation modèle. Vous pouvez les trouver URL dans l'onglet Sortie une fois le CloudFormation modèle terminé.
-
Configurez le générateur de données :
-
Région : Sélectionnez la région que vous utilisez pour ce didacticiel : us-east-1
-
Stream/flux de diffusion : sélectionnez le flux d'entrée que l'application utilisera :
ExampleInputStream
-
Enregistrements par seconde : 100
-
Modèle d'enregistrement : Copiez et collez le modèle suivant :
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Testez le modèle : choisissez le modèle de test et vérifiez que l'enregistrement généré est similaire au suivant :
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Démarrez le générateur de données : Choisissez Sélectionner envoyer les données.
Kinesis Data Generator envoie désormais des données au. ExampleInputStream
Exécutez votre application localement
Vous pouvez tester l'application localement, en l'exécutant depuis la ligne de commande avec python main.py
ou depuis votreIDE.
Pour exécuter votre application localement, la version correcte de la PyFlink bibliothèque doit être installée, comme décrit dans la section précédente. Pour plus d'informations, voir (lien)
Note
Avant de continuer, vérifiez que les flux d'entrée et de sortie sont disponibles. Consultez Créez deux flux de données Amazon Kinesis. Vérifiez également que vous êtes autorisé à lire et à écrire à partir des deux flux. Consultez Authentifiez votre session AWS.
Importez le projet Python dans votre IDE
Pour commencer à travailler sur l'application dans votre IDE ordinateur, vous devez l'importer en tant que projet Python.
Le référentiel que vous avez cloné contient plusieurs exemples. Chaque exemple est un projet distinct. Pour ce didacticiel, importez le contenu du ./python/GettingStarted
sous-répertoire dans votreIDE.
Importez le code en tant que projet Python existant.
Note
Le processus exact pour importer un nouveau projet Python varie en fonction de celui IDE que vous utilisez.
Vérifiez la configuration de l'application locale
Lorsqu'elle est exécutée localement, l'application utilise la configuration contenue dans le application_properties.json
fichier situé dans le dossier des ressources du projet situé sous./src/main/resources
. Vous pouvez modifier ce fichier pour utiliser différents noms de flux Kinesis ou différentes régions.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Exécutez votre application Python localement
Vous pouvez exécuter votre application localement, soit à partir de la ligne de commande en tant que script Python normal, soit à partir duIDE.
Pour exécuter votre application à partir de la ligne de commande
-
Assurez-vous que l'environnement Python autonome tel que Conda ou celui dans VirtualEnv lequel vous avez installé la bibliothèque Python Flink est actuellement actif.
-
Assurez-vous d'avoir couru
mvn package
au moins une fois. -
Définissez la variable d'environnement :
IS_LOCAL = true
$ export IS_LOCAL=true
-
Exécutez l'application sous la forme d'un script Python normal.
$python main.py
Pour exécuter l'application depuis le IDE
-
Configurez votre ordinateur IDE pour exécuter le
main.py
script avec la configuration suivante :-
Utilisez l'environnement Python autonome tel que Conda ou celui dans VirtualEnv lequel vous avez installé la PyFlink bibliothèque.
-
Utilisez les AWS informations d'identification pour accéder aux flux de données Kinesis en entrée et en sortie.
-
Configurez
IS_LOCAL = true
.
-
-
Le processus exact pour définir la configuration d'exécution dépend de vous IDE et varie.
-
Lorsque vous avez configuré votreIDE, exécutez le script Python et utilisez les outils IDE que vous avez fournis pendant l'exécution de l'application.
Inspectez les journaux des applications localement
Lorsqu'elle est exécutée localement, l'application n'affiche aucun journal dans la console, à l'exception de quelques lignes imprimées et affichées au démarrage de l'application. PyFlink écrit les journaux dans un fichier du répertoire où la bibliothèque Python Flink est installée. L'application imprime l'emplacement des journaux au démarrage. Vous pouvez également exécuter la commande suivante pour trouver les journaux :
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
Répertoriez les fichiers dans le répertoire de journalisation. Vous ne trouvez généralement qu'un seul
.log
fichier. -
Suivez le fichier pendant que l'application est en cours d'exécution :
tail -f <log-path>/<log-file>.log
.
Observez les données d'entrée et de sortie dans les flux Kinesis
Vous pouvez observer les enregistrements envoyés au flux d'entrée par le (générateur d'un exemple de Python) ou le générateur de données Kinesis (lien) à l'aide du visualiseur de données de la console Amazon Kinesis.
Pour observer les enregistrements :
Arrêtez l'exécution locale de votre application
Arrêtez l'exécution de l'application dans votreIDE. Il fournit IDE généralement une option « d'arrêt ». L'emplacement exact et la méthode dépendent duIDE.
Package du code de votre application
Dans cette section, vous allez utiliser Apache Maven pour empaqueter le code de l'application et toutes les dépendances requises dans un fichier .zip.
Exécutez à nouveau la commande du package Maven :
$ mvn package
Cette commande génère le fichiertarget/managed-flink-pyflink-getting-started-1.0.0.zip
.
Téléchargez le package d'application dans un compartiment Amazon S3
Dans cette section, vous allez charger le fichier .zip que vous avez créé dans la section précédente dans le bucket Amazon Simple Storage Service (Amazon S3) que vous avez créé au début de ce didacticiel. Si vous n'avez pas terminé cette étape, consultez (lien).
Pour télécharger le JAR fichier de code de l'application
Ouvrez la console Amazon S3 à l'adresse https://console.aws.amazon.com/s3/
. -
Choisissez le bucket que vous avez créé précédemment pour le code de l'application.
-
Sélectionnez Charger.
-
Choisissez Add files.
-
Accédez au fichier .zip généré à l'étape précédente :
target/managed-flink-pyflink-getting-started-1.0.0.zip
. -
Choisissez Télécharger sans modifier les autres paramètres.
Création et configuration du service géré pour l'application Apache Flink
Vous pouvez créer et configurer un service géré pour l'application Apache Flink à l'aide de la console ou du AWS CLI. Pour ce tutoriel, nous allons utiliser la console.
Pour créer l’application
Ouvrez le service géré pour la console Apache Flink à https://console.aws.amazon.com l'adresse /flink
-
Vérifiez que la bonne région est sélectionnée : USA Est (Virginie du Nord) us-east-1.
-
Ouvrez le menu de droite et choisissez Applications Apache Flink, puis Créer une application de streaming. Vous pouvez également choisir Créer une application de streaming dans la section Commencer de la page initiale.
-
Sur la page Créer des applications de streaming :
-
Pour Choisir une méthode pour configurer l'application de traitement des flux, choisissez Créer à partir de zéro.
-
Pour la configuration d'Apache Flink, version de l'application Flink, choisissez Apache Flink 1.19.
-
Pour la configuration de l'application :
-
Pour Nom de l’application, saisissez
MyApplication
. -
Pour Description, saisissez
My Python test app
. -
Dans Accès aux ressources de l'application, choisissez Create/update IAM role kinesis-analytics- MyApplication -us-east-1 avec les politiques requises.
-
-
Pour les paramètres du modèle pour les applications :
-
Dans Modèles, sélectionnez Développement.
-
-
Choisissez Créer une application de streaming.
-
Note
Lorsque vous créez un service géré pour une application Apache Flink à l'aide de la console, vous avez la possibilité de créer un IAM rôle et une politique pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces IAM ressources sont nommées en utilisant le nom de votre application et votre région comme suit :
-
Stratégie :
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rôle :
kinesisanalytics-
MyApplication
-us-west-2
Amazon Managed Service pour Apache Flink était auparavant connu sous le nom de Kinesis Data Analytics. Le nom des ressources générées automatiquement est préfixé par un préfixe kinesis-analytics
pour des raisons de rétrocompatibilité.
Modifier la IAM politique
Modifiez la IAM politique pour ajouter des autorisations d'accès au compartiment Amazon S3.
Pour modifier la IAM politique afin d'ajouter des autorisations au compartiment S3
Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/
. -
Choisissez Stratégies. Choisissez la politique
kinesis-analytics-service-MyApplication-us-east-1
créée pour vous par la console dans la section précédente. -
Choisissez Modifier, puis choisissez l'JSONonglet.
-
Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez le compte d'exemple IDs (
012345678901
) avec votre identifiant de compte.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Choisissez Suivant, puis Enregistrer les modifications.
Configuration de l'application
Modifiez la configuration de l'application pour définir l'artefact du code de l'application.
Pour configurer l’application
-
Sur la MyApplicationpage, choisissez Configurer.
-
Dans la section Emplacement du code de l'application :
-
Pour le compartiment Amazon S3, sélectionnez le compartiment que vous avez créé précédemment pour le code de l'application. Choisissez Parcourir et sélectionnez le compartiment approprié, puis choisissez Choisir. Ne sélectionnez pas le nom du bucket.
-
Pour le chemin de l'objet Amazon S3, saisissez
managed-flink-pyflink-getting-started-1.0.0.zip
.
-
-
Pour les autorisations d'accès, choisissez Créer/mettre à jour IAM le rôle
kinesis-analytics-MyApplication-us-east-1
avec les politiques requises. -
Accédez aux propriétés d'exécution et conservez les valeurs par défaut pour tous les autres paramètres.
-
Choisissez Ajouter un nouvel article et ajoutez chacun des paramètres suivants :
ID du groupe Clé Valeur InputStream0
stream.name
ExampleInputStream
InputStream0
flink.stream.initpos
LATEST
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
kinesis.analytics.flink.run.options
python
main.py
kinesis.analytics.flink.run.options
jarfile
lib/pyflink-dependencies.jar
-
Ne modifiez aucune des autres sections et choisissez Enregistrer les modifications.
Note
Lorsque vous choisissez d'activer la CloudWatch journalisation Amazon, Managed Service for Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :
-
Groupe de journaux :
/aws/kinesis-analytics/MyApplication
-
Flux de journaux :
kinesis-analytics-log-stream
Exécutez l'application
L'application est maintenant configurée et prête à être exécutée.
Pour exécuter l’application
-
Sur la console d'Amazon Managed Service pour Apache Flink, choisissez My Application, puis Run.
-
Sur la page suivante, page de configuration de la restauration de l'application, choisissez Exécuter avec le dernier instantané, puis sélectionnez Exécuter.
Le statut dans l'application détaille les transitions entre
Ready
leStarting
et leRunning
moment où l'application a démarré.
Lorsque l'application est en Running
état, vous pouvez désormais ouvrir le tableau de bord Flink.
Pour ouvrir le tableau de bord d'
-
Choisissez Ouvrir le tableau de bord Apache Flink. Le tableau de bord s'ouvre sur une nouvelle page.
-
Dans la liste des tâches en cours, choisissez la tâche unique que vous pouvez voir.
Note
Si vous définissez les propriétés d'exécution ou si vous modifiez les IAM politiques de manière incorrecte, le statut de l'application peut devenir
Running
, mais le tableau de bord Flink indique que le travail redémarre continuellement. Il s'agit d'un scénario d'échec courant si l'application est mal configurée ou n'est pas autorisée à accéder aux ressources externes.Dans ce cas, consultez l'onglet Exceptions du tableau de bord Flink pour connaître la cause du problème.
Observez les métriques de l'application en cours d'exécution
Sur la MyApplicationpage, dans la section CloudWatch des métriques Amazon, vous pouvez voir certaines des mesures fondamentales de l'application en cours d'exécution.
Pour consulter les statistiques
-
À côté du bouton Actualiser, sélectionnez 10 secondes dans la liste déroulante.
-
Lorsque l'application est en cours d'exécution et fonctionne correctement, vous pouvez constater une augmentation continue de la métrique de disponibilité.
-
La métrique de redémarrage complet doit être égale à zéro. S'il augmente, la configuration peut présenter des problèmes. Pour étudier le problème, consultez l'onglet Exceptions du tableau de bord Flink.
-
La métrique du nombre de points de contrôle ayant échoué doit être égale à zéro dans une application saine.
Note
Ce tableau de bord affiche un ensemble fixe de mesures avec une granularité de 5 minutes. Vous pouvez créer un tableau de bord d'application personnalisé avec tous les indicateurs du CloudWatch tableau de bord.
Observez les données de sortie dans les flux Kinesis
Assurez-vous que vous publiez toujours les données en entrée, à l'aide du script Python ou du Kinesis Data Generator.
Vous pouvez désormais observer le résultat de l'application exécutée sur le service géré pour Apache Flink en utilisant le visualiseur de données dans le https://console.aws.amazon.com/kinesis/
Pour afficher le résultat
Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com
-
Vérifiez que la région est la même que celle que vous utilisez pour exécuter ce didacticiel. Par défaut, il s'agit de US-East-1US East (Virginie du Nord). Modifiez la région si nécessaire.
-
Choisissez Data Streams.
-
Sélectionnez le flux que vous souhaitez observer. Dans le cadre de ce tutoriel, utilisez
ExampleOutputStream
. -
Choisissez l'onglet Visionneuse de données.
-
Sélectionnez n'importe quelle partition, conservez Dernière comme position de départ, puis choisissez Obtenir des enregistrements. Le message d'erreur « aucun enregistrement trouvé pour cette demande » peut s'afficher. Si tel est le cas, choisissez Réessayer d'obtenir des enregistrements. Les derniers enregistrements publiés sur le stream s'affichent.
-
Sélectionnez la valeur dans la colonne Données pour inspecter le contenu de l'enregistrement au JSON format.
Arrêtez l'application
Pour arrêter l'application, rendez-vous sur la page de console de l'application Managed Service for Apache Flink nommée. MyApplication
Pour arrêter l’application
-
Dans la liste déroulante Action, choisissez Stop.
-
Le statut de l'application détaille les transitions entre
Running
le et leReady
moment où l'application est complètement arrêtée.Stopping
Note
N'oubliez pas d'arrêter également d'envoyer des données au flux d'entrée à partir du script Python ou du Kinesis Data Generator.