Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Création et exécution d'un service géré pour l'application Apache Flink
Au cours de cette étape, vous allez créer un service géré pour l'application Apache Flink avec les flux de données Kinesis comme source et récepteur.
Cette section contient les étapes suivantes :
- Création de ressources dépendantes
- Configuration de votre environnement de développement local
- Téléchargez et examinez le code Java de streaming d'Apache Flink
- Écrire des exemples d'enregistrements dans le flux d'entrée
- Exécutez votre application localement
- Observez les données d'entrée et de sortie dans les flux Kinesis
- Arrêtez l'exécution locale de votre application
- Compilez et empaquetez le code de votre application
- Téléchargez le JAR fichier de code de l'application
- Création et configuration du service géré pour l'application Apache Flink
- Étape suivante
Création de ressources dépendantes
Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, vous commencez par créer les ressources dépendantes suivantes :
-
Deux flux de données Kinesis pour l'entrée et la sortie
-
Un compartiment Amazon S3 pour stocker le code de l'application
Note
Ce didacticiel part du principe que vous déployez votre application dans la région us-east-1 USA Est (Virginie du Nord). Si vous utilisez une autre région, adaptez toutes les étapes en conséquence.
Créez deux flux de données Amazon Kinesis
Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, commencez par créer deux flux de données Kinesis (ExampleInputStream
et ExampleOutputStream
). Votre application utilise ces flux pour les flux source et de destination de l’application.
Vous pouvez créer ces flux à l'aide de la console Amazon Kinesis ou de la commande suivante AWS CLI . Pour obtenir des instructions sur la console, consultez Création et mise à jour de flux de données dans le Guide du développeur Amazon Kinesis Data Streams. Pour créer les flux à l'aide de AWS CLI, utilisez les commandes suivantes, en vous adaptant à la région que vous utilisez pour votre application.
Pour créer les flux de données (AWS CLI)
-
Pour créer le premier flux (
ExampleInputStream
), utilisez la commande Amazon Kinesiscreate-stream
AWS CLI suivante :$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
-
Pour créer le deuxième flux que l'application utilise pour écrire la sortie, exécutez la même commande, en changeant le nom du flux en
ExampleOutputStream
:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
Création d'un compartiment Amazon S3 pour le code de l'application
Vous pouvez créer un compartiment Amazon S3 à l’aide de la console. Pour savoir comment créer un compartiment Amazon S3 à l'aide de la console, consultez la section Création d'un compartiment dans le guide de l'utilisateur Amazon S3. Nommez le compartiment Amazon S3 à l'aide d'un nom unique global, par exemple en ajoutant votre nom de connexion.
Note
Assurez-vous de créer le bucket dans la région que vous utilisez pour ce didacticiel (us-east-1).
Autres ressources
Lorsque vous créez votre application, Managed Service for Apache Flink crée automatiquement les CloudWatch ressources Amazon suivantes si elles n'existent pas déjà :
-
Un groupe de journaux appelé
/AWS/KinesisAnalytics-java/<my-application>
-
Un flux de journaux appelé
kinesis-analytics-log-stream
Configuration de votre environnement de développement local
Pour le développement et le débogage, vous pouvez exécuter l'application Apache Flink sur votre machine directement depuis celle IDE de votre choix. Toutes les dépendances d'Apache Flink sont gérées comme des dépendances Java classiques à l'aide d'Apache Maven.
Note
Java JDK 11, Maven et Git doivent être installés sur votre machine de développement. Nous vous recommandons d'utiliser un environnement de développement tel qu'Eclipse, Java Neon ou IntelliJ IDEA
Authentifiez votre session AWS
L'application utilise les flux de données Kinesis pour publier des données. Lors de l'exécution locale, vous devez disposer d'une session AWS authentifiée valide avec les autorisations nécessaires pour écrire dans le flux de données Kinesis. Procédez comme suit pour authentifier votre session :
-
Si vous n'avez pas configuré le profil AWS CLI et un profil nommé avec des informations d'identification valides, consultezConfigurez le AWS Command Line Interface (AWS CLI).
-
Vérifiez que vous êtes correctement AWS CLI configuré et que vos utilisateurs sont autorisés à écrire dans le flux de données Kinesis en publiant l'enregistrement de test suivant :
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Si vous IDE avez un plugin à intégrer AWS, vous pouvez l'utiliser pour transmettre les informations d'identification à l'application qui s'exécute dans leIDE. Pour plus d'informations, consultez AWS Toolkit for IntelliJ IDEA
et AWS Toolkit for Eclipse.
Téléchargez et examinez le code Java de streaming d'Apache Flink
Le code de l'application Java pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :
-
Cloner le référentiel distant à l’aide de la commande suivante :
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Accédez au répertoire
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted
.
Vérifiez les composants de l'application
L'application est entièrement implémentée dans la com.amazonaws.services.msf.BasicStreamingJob
classe. La main()
méthode définit le flux de données pour traiter les données de streaming et les exécuter.
Note
Pour une expérience de développement optimisée, l'application est conçue pour s'exécuter sans aucune modification de code, à la fois sur Amazon Managed Service pour Apache Flink et localement, pour le développement dans votre IDE environnement.
-
Pour lire la configuration d'exécution afin qu'elle fonctionne lors de son exécution dans Amazon Managed Service pour Apache Flink et dans votreIDE, l'application détecte automatiquement si elle s'exécute de manière autonome localement dans le. IDE Dans ce cas, l'application charge la configuration d'exécution différemment :
-
Lorsque l'application détecte qu'elle s'exécute en mode autonome dans votreIDE, créez le
application_properties.json
fichier inclus dans le dossier de ressources du projet. Le contenu du fichier suit. -
Lorsque l'application s'exécute dans Amazon Managed Service pour Apache Flink, le comportement par défaut charge la configuration de l'application à partir des propriétés d'exécution que vous allez définir dans l'application Amazon Managed Service pour Apache Flink. Consultez Création et configuration du service géré pour l'application Apache Flink.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
La
main()
méthode définit le flux de données de l'application et l'exécute.-
Initialise les environnements de streaming par défaut. Dans cet exemple, nous montrons comment créer
StreamExecutionEnvironment
à la fois le DataSteam APIStreamTableEnvironment
à utiliser avec SQL et le tableauAPI. Les deux objets d'environnement sont deux références distinctes au même environnement d'exécution, à utiliser différemmentAPIs.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
Chargez les paramètres de configuration de l'application. Cela les chargera automatiquement depuis le bon endroit, en fonction de l'endroit où l'application est exécutée :
Map<String, Properties> applicationParameters = loadApplicationProperties(env);
-
L'application définit une source à l'aide du connecteur Kinesis Consumer
pour lire les données du flux d'entrée. La configuration du flux d'entrée est définie dans le PropertyGroupId
=InputStream0
. Le nom et la région du flux figurentaws.region
respectivement dans les propriétés nomméesstream.name
et. Pour des raisons de simplicité, cette source lit les enregistrements sous forme de chaîne.private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
-
L'application définit ensuite un récepteur à l'aide du connecteur Kinesis Streams Sink
pour envoyer des données au flux de sortie. Le nom du flux de sortie et la région sont définis dans le PropertyGroupId
=OutputStream0
, de la même manière que le flux d'entrée. Le récepteur est connecté directement à l'interface interneDataStream
qui reçoit les données de la source. Dans une application réelle, vous avez une certaine transformation entre la source et le récepteur.private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
-
Enfin, vous exécutez le flux de données que vous venez de définir. Il doit s'agir de la dernière instruction de la
main()
méthode, une fois que vous avez défini tous les opérateurs requis par le flux de données :env.execute("Flink streaming Java API skeleton");
-
Utilisez le fichier pom.xml
Le fichier pom.xml définit toutes les dépendances requises par l'application et configure le plugin Maven Shade pour créer le fat-jar qui contient toutes les dépendances requises par Flink.
-
Certaines dépendances ont une
provided
portée. Ces dépendances sont automatiquement disponibles lorsque l'application s'exécute dans Amazon Managed Service pour Apache Flink. Ils sont nécessaires pour compiler l'application ou pour exécuter l'application localement dans votreIDE. Pour de plus amples informations, veuillez consulter Exécutez votre application localement. Assurez-vous que vous utilisez la même version de Flink que celle du moteur d'exécution que vous utiliserez dans Amazon Managed Service pour Apache Flink.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Vous devez ajouter des dépendances Apache Flink supplémentaires au pom avec la portée par défaut, comme le connecteur Kinesis
utilisé par cette application. Pour de plus amples informations, veuillez consulter Utiliser les connecteurs Apache Flink avec le service géré pour Apache Flink. Vous pouvez également ajouter toute dépendance Java supplémentaire requise par votre application. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Le plugin Maven Java Compiler s'assure que le code est compilé avec Java 11, la JDK version actuellement prise en charge par Apache Flink.
-
Le plugin Maven Shade empaquète le fat-jar, à l'exception de certaines bibliothèques fournies par le moteur d'exécution. Il spécifie également deux transformateurs :
ServicesResourceTransformer
et.ManifestResourceTransformer
Ce dernier configure la classe contenant lamain
méthode de démarrage de l'application. Si vous renommez la classe principale, n'oubliez pas de mettre à jour ce transformateur. -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
Écrire des exemples d'enregistrements dans le flux d'entrée
Dans cette section, vous allez envoyer des exemples d'enregistrements au flux pour que la demande soit traitée. Deux options s'offrent à vous pour générer des exemples de données, soit à l'aide d'un script Python, soit à l'aide du Kinesis Data
Générer des exemples de données à l'aide d'un script Python
Vous pouvez utiliser un script Python pour envoyer des exemples d'enregistrements au flux.
Note
Pour exécuter ce script Python, vous devez utiliser Python 3.x et installer la bibliothèque AWS SDKfor Python (Boto)
Pour commencer à envoyer des données de test vers le flux d'entrée Kinesis, procédez comme suit :
-
Téléchargez le script
stock.py
Python du générateur de données depuis le GitHub référentiel du générateur de données. -
Exécutez le script
stock.py
:$ python stock.py
Continuez à exécuter le script pendant que vous terminez le reste du didacticiel. Vous pouvez désormais exécuter votre application Apache Flink.
Génération d'échantillons de données à l'aide de Kinesis Data Generator
Au lieu d'utiliser le script Python, vous pouvez utiliser Kinesis Data Generator
Pour configurer et exécuter Kinesis Data Generator, procédez comme suit :
-
Suivez les instructions de la documentation de Kinesis Data Generator
pour configurer l'accès à l'outil. Vous allez exécuter un AWS CloudFormation modèle qui définit un utilisateur et un mot de passe. -
Accédez à Kinesis Data Generator via le modèle URL généré par le CloudFormation modèle. Vous pouvez les trouver URL dans l'onglet Sortie une fois le CloudFormation modèle terminé.
-
Configurez le générateur de données :
-
Région : Sélectionnez la région que vous utilisez pour ce didacticiel : us-east-1
-
Stream/flux de diffusion : sélectionnez le flux d'entrée que l'application utilisera :
ExampleInputStream
-
Enregistrements par seconde : 100
-
Modèle d'enregistrement : Copiez et collez le modèle suivant :
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Testez le modèle : choisissez le modèle de test et vérifiez que l'enregistrement généré est similaire au suivant :
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Démarrez le générateur de données : Choisissez Sélectionner envoyer les données.
Kinesis Data Generator envoie désormais des données au. ExampleInputStream
Exécutez votre application localement
Vous pouvez exécuter et déboguer votre application Flink localement dans votre. IDE
Note
Avant de continuer, vérifiez que les flux d'entrée et de sortie sont disponibles. Consultez Créez deux flux de données Amazon Kinesis. Vérifiez également que vous êtes autorisé à lire et à écrire à partir des deux flux. Consultez Authentifiez votre session AWS.
La configuration de l'environnement de développement local nécessite Java 11JDK, Apache Maven et IDE pour le développement Java. Vérifiez que vous remplissez les conditions requises. Consultez Remplir les conditions préalables pour terminer les exercices.
Importez le projet Java dans votre IDE
Pour commencer à travailler sur l'application dans votre IDE ordinateur, vous devez l'importer en tant que projet Java.
Le référentiel que vous avez cloné contient plusieurs exemples. Chaque exemple est un projet distinct. Pour ce didacticiel, importez le contenu du ./java/GettingStarted
sous-répertoire dans votreIDE.
Insérez le code en tant que projet Java existant à l'aide de Maven.
Note
Le processus exact d'importation d'un nouveau projet Java varie en fonction du projet IDE que vous utilisez.
Vérifiez la configuration de l'application locale
Lorsqu'elle est exécutée localement, l'application utilise la configuration contenue dans le application_properties.json
fichier situé dans le dossier des ressources du projet situé sous./src/main/resources
. Vous pouvez modifier ce fichier pour utiliser différents noms de flux Kinesis ou différentes régions.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Configurez votre configuration d'IDEexécution
Vous pouvez exécuter et déboguer l'application Flink IDE directement depuis votre ordinateur en exécutant la classe principalecom.amazonaws.services.msf.BasicStreamingJob
, comme vous le feriez pour n'importe quelle application Java. Avant d'exécuter l'application, vous devez configurer la configuration Exécuter. La configuration dépend de celle IDE que vous utilisez. Par exemple, voir les configurations Run/Debug
-
Ajoutez les
provided
dépendances au chemin de classe. Cela est nécessaire pour s'assurer que les dépendancesprovided
étendues sont transmises à l'application lors de l'exécution locale. Sans cette configuration, l'application affiche immédiatement uneclass not found
erreur. -
Transmettez les AWS informations d'identification pour accéder aux flux Kinesis à l'application. Le moyen le plus rapide est d'utiliser AWS Toolkit pour IntelliJ IDEA
. En utilisant ce IDE plugin dans la configuration Run, vous pouvez sélectionner un AWS profil spécifique. AWS l'authentification se fait à l'aide de ce profil. Vous n'avez pas besoin de transmettre directement les AWS informations d'identification. -
Vérifiez que l'application IDE est exécutée à l'aide de JDK11.
Exécutez l'application dans votre IDE
Après avoir configuré la configuration Run pour leBasicStreamingJob
, vous pouvez l'exécuter ou le déboguer comme une application Java classique.
Note
Vous ne pouvez pas exécuter le fat-jar généré par Maven directement java -jar
...
depuis la ligne de commande. Ce fichier jar ne contient pas les dépendances principales de Flink requises pour exécuter l'application de manière autonome.
Lorsque l'application démarre correctement, elle enregistre certaines informations concernant le minicluster autonome et l'initialisation des connecteurs. Viennent ensuite un certain nombre INFO de WARN journaux que Flink émet normalement au démarrage de l'application.
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
Une fois l'initialisation terminée, l'application n'émet aucune autre entrée de journal. Pendant le flux de données, aucun journal n'est émis.
Pour vérifier si l'application traite correctement les données, vous pouvez inspecter les flux Kinesis en entrée et en sortie, comme décrit dans la section suivante.
Note
L'absence d'enregistrement des flux de données est le comportement normal d'une application Flink. L'émission de journaux sur chaque enregistrement peut être pratique pour le débogage, mais elle peut entraîner une surcharge considérable lors de l'exécution en production.
Observez les données d'entrée et de sortie dans les flux Kinesis
Vous pouvez observer les enregistrements envoyés au flux d'entrée par le (générateur d'un exemple de Python) ou le générateur de données Kinesis (lien) à l'aide du visualiseur de données de la console Amazon Kinesis.
Pour observer les records
Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com
-
Vérifiez que la région est la même que celle dans laquelle vous exécutez ce didacticiel, à savoir us-east-1 USA East (Virginie du Nord) par défaut. Modifiez la région si elle ne correspond pas.
-
Choisissez Data Streams.
-
Sélectionnez le flux que vous souhaitez observer,
ExampleInputStream
soitExampleOutputStream.
-
Choisissez l'onglet Visionneuse de données.
-
Choisissez n'importe quel Shard, conservez Latest comme position de départ, puis choisissez Get records. Le message d'erreur « Aucun enregistrement trouvé pour cette demande » peut s'afficher. Si tel est le cas, choisissez Réessayer d'obtenir des enregistrements. Les derniers enregistrements publiés sur le stream s'affichent.
-
Choisissez la valeur dans la colonne Données pour inspecter le contenu de l'enregistrement au JSON format.
Arrêtez l'exécution locale de votre application
Arrêtez l'exécution de l'application dans votreIDE. Il fournit IDE généralement une option « d'arrêt ». L'emplacement exact et la méthode dépendent de celui que IDE vous utilisez.
Compilez et empaquetez le code de votre application
Dans cette section, vous allez utiliser Apache Maven pour compiler le code Java et l'empaqueter dans un JAR fichier. Vous pouvez compiler et empaqueter votre code à l'aide de l'outil de ligne de commande Maven ou de votreIDE.
Pour compiler et empaqueter à l'aide de la ligne de commande Maven :
Accédez au répertoire contenant le GettingStarted projet Java et exécutez la commande suivante :
$ mvn package
Pour compiler et empaqueter à l'aide de votre IDE :
Exécutez mvn package
à partir de votre intégration IDE Maven.
Dans les deux cas, le JAR fichier suivant est créé : target/amazon-msf-java-stream-app-1.0.jar
Note
L'exécution d'un « projet de construction » à partir de vous IDE risque de ne pas créer le JAR fichier.
Téléchargez le JAR fichier de code de l'application
Dans cette section, vous allez charger le JAR fichier que vous avez créé dans la section précédente dans le bucket Amazon Simple Storage Service (Amazon S3) que vous avez créé au début de ce didacticiel. Si vous n'avez pas terminé cette étape, consultez (lien).
Pour télécharger le JAR fichier de code de l'application
Ouvrez la console Amazon S3 à l'adresse https://console.aws.amazon.com/s3/
. -
Choisissez le bucket que vous avez créé précédemment pour le code de l'application.
-
Sélectionnez Charger.
-
Choisissez Add files.
-
Accédez au JAR fichier généré à l'étape précédente :
target/amazon-msf-java-stream-app-1.0.jar
. -
Choisissez Upload sans modifier les autres paramètres.
Avertissement
Assurez-vous de sélectionner le bon JAR fichier dans<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar
.
Le target
répertoire contient également d'autres JAR fichiers que vous n'avez pas besoin de télécharger.
Création et configuration du service géré pour l'application Apache Flink
Vous pouvez créer et exécuter une application de service géré pour Apache Flink à l’aide de la console ou de l’interface AWS CLI. Pour ce didacticiel, vous allez utiliser la console.
Note
Lorsque vous créez l'application à l'aide de la console, vos ressources AWS Identity and Access Management (IAM) et Amazon CloudWatch Logs sont créées pour vous. Lorsque vous créez l'application à l'aide du AWS CLI, vous créez ces ressources séparément.
Rubriques
Pour créer l’application
Pour créer l'application
Ouvrez le service géré pour la console Apache Flink à https://console.aws.amazon.com l'adresse /flink
-
Vérifiez que la bonne région est sélectionnée : us-east-1 US East (Virginie du Nord)
-
Ouvrez le menu de droite et choisissez Applications Apache Flink, puis Créer une application de streaming. Vous pouvez également choisir Créer une application de streaming dans le conteneur Get started de la page initiale.
-
Sur la page Créer une application de streaming :
-
Choisissez une méthode pour configurer l'application de traitement des flux : choisissez Create from scratch.
-
Configuration d'Apache Flink, version de l'application Flink : choisissez Apache Flink 1.19.
-
-
Configurez votre application
-
Nom de l'application : entrez
MyApplication
. -
Description : entrez
My java test app
. -
Accès aux ressources de l'application : choisissez Créer/mettre à jour IAM le rôle
kinesis-analytics-MyApplication-us-east-1
avec les politiques requises.
-
-
Configurez votre modèle pour les paramètres de l'application
-
Modèles : choisissez Développement.
-
-
Choisissez Créer une application de streaming au bas de la page.
Note
Lorsque vous créez un service géré pour une application Apache Flink à l'aide de la console, vous avez la possibilité de créer un IAM rôle et une politique pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces IAM ressources sont nommées en utilisant le nom de votre application et votre région comme suit :
-
Stratégie :
kinesis-analytics-service-
MyApplication
-us-east-1
-
Rôle :
kinesisanalytics-
MyApplication
-us-east-1
Amazon Managed Service pour Apache Flink était auparavant connu sous le nom de Kinesis Data Analytics. Le nom des ressources créées automatiquement est préfixé par un préfixe kinesis-analytics-
pour des raisons de rétrocompatibilité.
Modifier la IAM politique
Modifiez la IAM politique pour ajouter des autorisations d'accès aux flux de données Kinesis.
Pour modifier la politique
Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/
. -
Choisissez Stratégies. Choisissez la politique
kinesis-analytics-service-MyApplication-us-east-1
créée pour vous par la console dans la section précédente. -
Choisissez Modifier, puis choisissez l'JSONonglet.
-
Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez le compte d'exemple IDs (
012345678901
) avec votre identifiant de compte.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Choisissez Suivant au bas de la page, puis cliquez sur Enregistrer les modifications.
Configuration de l'application
Modifiez la configuration de l'application pour définir l'artefact du code de l'application.
Pour modifier la configuration
-
Sur la MyApplicationpage, choisissez Configurer.
-
Dans la section Emplacement du code de l'application :
-
Pour le compartiment Amazon S3, sélectionnez le compartiment que vous avez créé précédemment pour le code de l'application. Choisissez Parcourir et sélectionnez le compartiment approprié, puis sélectionnez Choisir. Ne cliquez pas sur le nom du bucket.
-
Pour le chemin de l'objet Amazon S3, saisissez
amazon-msf-java-stream-app-1.0.jar
.
-
-
Pour les autorisations d'accès, choisissez Créer/mettre à jour IAM le rôle
kinesis-analytics-MyApplication-us-east-1
avec les politiques requises. -
Dans la section Propriétés d'exécution, ajoutez les propriétés suivantes.
-
Choisissez Ajouter un nouvel article et ajoutez chacun des paramètres suivants :
ID du groupe Clé Valeur InputStream0
stream.name
ExampleInputStream
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
-
Ne modifiez aucune des autres sections.
-
Sélectionnez Enregistrer les modifications.
Note
Lorsque vous choisissez d'activer la CloudWatch journalisation Amazon, Managed Service for Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :
-
Groupe de journaux :
/aws/kinesis-analytics/MyApplication
-
Flux de journaux :
kinesis-analytics-log-stream
Exécutez l'application
L'application est maintenant configurée et prête à être exécutée.
Pour exécuter l’application
-
Sur la console d'Amazon Managed Service pour Apache Flink, choisissez My Application, puis Run.
-
Sur la page suivante, page de configuration de la restauration de l'application, choisissez Exécuter avec le dernier instantané, puis sélectionnez Exécuter.
Le statut dans l'application détaille les transitions entre
Ready
leStarting
et leRunning
moment où l'application a démarré.
Lorsque l'application est en Running
état, vous pouvez désormais ouvrir le tableau de bord Flink.
Pour ouvrir le tableau de bord d'
-
Choisissez Ouvrir le tableau de bord Apache Flink. Le tableau de bord s'ouvre sur une nouvelle page.
-
Dans la liste des tâches en cours, choisissez la tâche unique que vous pouvez voir.
Note
Si vous définissez les propriétés d'exécution ou si vous modifiez les IAM politiques de manière incorrecte, le statut de l'application peut devenir
Running
, mais le tableau de bord Flink indique que le travail redémarre continuellement. Il s'agit d'un scénario d'échec courant si l'application est mal configurée ou n'est pas autorisée à accéder aux ressources externes.Dans ce cas, consultez l'onglet Exceptions du tableau de bord Flink pour connaître la cause du problème.
Observez les métriques de l'application en cours d'exécution
Sur la MyApplicationpage, dans la section CloudWatch des métriques Amazon, vous pouvez voir certaines des mesures fondamentales de l'application en cours d'exécution.
Pour consulter les statistiques
-
À côté du bouton Actualiser, sélectionnez 10 secondes dans la liste déroulante.
-
Lorsque l'application est en cours d'exécution et saine, vous pouvez constater une augmentation continue de la métrique de disponibilité.
-
La métrique de redémarrage complet doit être égale à zéro. S'il augmente, la configuration peut présenter des problèmes. Pour étudier le problème, consultez l'onglet Exceptions du tableau de bord Flink.
-
La métrique du nombre de points de contrôle ayant échoué doit être égale à zéro dans une application saine.
Note
Ce tableau de bord affiche un ensemble fixe de mesures avec une granularité de 5 minutes. Vous pouvez créer un tableau de bord d'application personnalisé avec tous les indicateurs du CloudWatch tableau de bord.
Observez les données de sortie dans les flux Kinesis
Assurez-vous que vous publiez toujours les données en entrée, à l'aide du script Python ou du Kinesis Data Generator.
Vous pouvez désormais observer le résultat de l'application exécutée sur le service géré pour Apache Flink en utilisant le visualiseur de données dans le https://console.aws.amazon.com/kinesis/
Pour afficher le résultat
Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com
-
Vérifiez que la région est la même que celle que vous utilisez pour exécuter ce didacticiel. Par défaut, il s'agit de US-East-1US East (Virginie du Nord). Modifiez la région si nécessaire.
-
Choisissez Data Streams.
-
Sélectionnez le flux que vous souhaitez observer. Dans le cadre de ce tutoriel, utilisez
ExampleOutputStream
. -
Choisissez l'onglet Visionneuse de données.
-
Sélectionnez n'importe quelle partition, conservez Dernière comme position de départ, puis choisissez Obtenir des enregistrements. Le message d'erreur « aucun enregistrement trouvé pour cette demande » peut s'afficher. Si tel est le cas, choisissez Réessayer d'obtenir des enregistrements. Les derniers enregistrements publiés sur le stream s'affichent.
-
Sélectionnez la valeur dans la colonne Données pour inspecter le contenu de l'enregistrement au JSON format.
Arrêtez l'application
Pour arrêter l'application, rendez-vous sur la page de console de l'application Managed Service for Apache Flink nommée. MyApplication
Pour arrêter l’application
-
Dans la liste déroulante Action, choisissez Stop.
-
Le statut de l'application détaille les transitions entre
Running
le et leReady
moment où l'application est complètement arrêtée.Stopping
Note
N'oubliez pas d'arrêter également d'envoyer des données au flux d'entrée à partir du script Python ou du Kinesis Data Generator.