Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Dans cet exercice, vous allez créer une application de service géré pour Apache Flink avec des flux de données comme source et comme récepteur.
Cette section contient les étapes suivantes :
- Création de deux flux de données Amazon Kinesis
- Écriture d’exemples d’enregistrements dans le flux d’entrée
- Téléchargez et examinez le code Java de streaming d'Apache Flink
- Compilez le code de l'application
- Téléchargez le code Java de streaming Apache Flink
- Création et exécution du service géré pour l'application Apache Flink
Création de deux flux de données Amazon Kinesis
Avant de créer un Amazon Managed Service pour Apache Flink dans le cadre de cet exercice, créez deux flux de données Kinesis ExampleInputStream
(ExampleOutputStream
et). Votre application utilise ces flux pour les flux source et de destination de l’application.
Vous pouvez créer ces flux à l'aide de la console Amazon Kinesis ou de ce qui suit AWS CLI commande. Pour de plus amples informations, veuillez consulter Création et mise à jour des flux de données.
Pour créer les flux de données (AWS CLI)
-
Pour créer le premier flux (
ExampleInputStream
), utilisez le Amazon Kinesis suivantcreate-stream
AWS CLI commande.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
Pour créer le second flux utilisé par l’application pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Écriture d’exemples d’enregistrements dans le flux d’entrée
Dans cette section, vous utilisez un script Python pour écrire les exemples d’enregistrements dans le flux pour que l’application les traite.
Note
Cette section nécessite AWS SDK for Python (Boto)
-
Créez un fichier nommé
stock.py
avec le contenu suivant :import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
-
Plus loin dans ce didacticiel, vous exécutez le script
stock.py
pour envoyer des données à l’application.$ python stock.py
Téléchargez et examinez le code Java de streaming d'Apache Flink
Le code de l'application Java pour ces exemples est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :
-
Cloner le référentiel distant à l’aide de la commande suivante :
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
-
Accédez au répertoire
GettingStarted
.
Le code d'application est situé dans les fichiers CustomSinkStreamingJob.java
et CloudWatchLogSink.java
. Notez les informations suivantes à propos du code d’application :
-
L'application utilise une source Kinesis pour lire à partir du flux source. L'extrait de code suivant crée le récepteur Kinesis :
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
Compilez le code de l'application
Dans cette section, vous allez utiliser le compilateur Apache Maven pour créer le code Java pour l’application. Pour plus d'informations sur l'installation d'Apache Maven et du kit de développement Java (JDK), consultezConditions préalables pour terminer les exercices.
Votre application Java nécessite les composants suivants :
-
Un fichier de modèle d'objet du projet (pom.xml)
. Ce fichier contient des informations sur la configuration et les dépendances de l'application, y compris le service Amazon Managed Service pour les bibliothèques Apache Flink. -
Une méthode
main
qui contient la logique de l'application.
Note
Pour utiliser le connecteur Kinesis pour l'application suivante, vous devez télécharger le code source du connecteur et le créer comme décrit dans la documentation Apache Flink
Pour créer et compiler le code d'application
-
Créez une application Java/Maven dans votre environnement de développement. Pour de plus amples informations sur la création d'une application, veuillez consultez la documentation relative à votre environnement de développement :
-
Création de votre premier projet Java (Eclipse Java Neon)
(français non garanti) -
Création, exécution et mise en package de votre première application Java (IntelliJ Idea)
(français non garanti)
-
-
Utilisez le code suivant dans un fichier nommé
StreamingJob.java
.package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
Notez les informations suivantes à propos de l'exemple de code précédent :
-
Ce fichier contient la méthode
main
qui définit la fonctionnalité de l'application. -
Votre application crée les connecteurs source et récepteur pour accéder aux ressources externes à l’aide d’un objet
StreamExecutionEnvironment
. -
L’application crée les connecteurs source et récepteur à l’aide de propriétés statiques. Pour utiliser les propriétés de l’application dynamique, utilisez les méthodes
createSourceFromApplicationProperties
etcreateSinkFromApplicationProperties
pour créer les connecteurs. Ces méthodes lisent les propriétés de l’application pour configurer les connecteurs.
-
-
Pour utiliser le code de votre application, vous devez le compiler et le regrouper dans un JAR fichier. Vous pouvez compiler et intégrer votre code de deux manières :
-
À l'aide de l'outil de ligne de commande Maven. Créez votre JAR fichier en exécutant la commande suivante dans le répertoire qui contient le
pom.xml
fichier :mvn package
-
À l’aide de votre environnement de développement. Consultez la documentation de votre environnement de développement pour plus de détails.
Vous pouvez soit télécharger votre package sous forme de JAR fichier, soit compresser votre package et le télécharger sous forme de ZIP fichier. Si vous créez votre application à l'aide du AWS CLI, vous spécifiez le type de contenu de votre code (JARouZIP).
-
-
En cas d’erreur lors de la compilation, vérifiez que votre variable d’environnement
JAVA_HOME
est correctement définie.
Si la compilation de l’application aboutit, le fichier suivant est créé :
target/java-getting-started-1.0.jar
Téléchargez le code Java de streaming Apache Flink
Dans cette section, vous allez créer un compartiment Amazon Simple Storage Service (Amazon S3) et charger votre code d'application.
Pour charger le code d’application
Ouvrez la console Amazon S3 à l'adresse https://console.aws.amazon.com/s3/
. -
Choisissez Créer un compartiment.
-
Saisissez
ka-app-code-
dans le champ Nom du compartiment. Ajoutez un suffixe au nom du compartiment, par exemple votre nom d’utilisateur, pour qu’il soit unique. Choisissez Suivant.<username>
-
À l’étape Configurer les options, conservez les paramètres, puis choisissez Suivant.
-
À l’étape Définir des autorisations, conservez les paramètres, puis choisissez Suivant.
-
Choisissez Créer un compartiment.
-
Dans la console Amazon S3, choisissez le ka-app-code -
<username>
bucket, puis choisissez Upload. -
À l’étape Sélectionner les fichiers, choisissez Ajouter des fichiers. Accédez au fichier
java-getting-started-1.0.jar
que vous avez créé à l’étape précédente. Choisissez Suivant. -
À l'étape Définir des autorisations, conservez les paramètres. Choisissez Suivant.
-
À l'étape Définir les propriétés, conservez les paramètres. Sélectionnez Charger.
Votre code d'application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.
Création et exécution du service géré pour l'application Apache Flink
Vous pouvez créer et exécuter un service géré pour l'application Apache Flink à l'aide de la console ou du AWS CLI.
Note
Lorsque vous créez l'application à l'aide de la console, votre AWS Identity and Access Management (IAM) et les ressources Amazon CloudWatch Logs sont créées pour vous. Lorsque vous créez l'application à l'aide du AWS CLI, vous créez ces ressources séparément.
Création et exécution de l'application (console)
Suivez ces étapes pour créer, configurer, mettre à jour et exécuter l’application à l’aide de la console.
Pour créer l’application
Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com
-
Dans le tableau de bord Amazon Kinesis, choisissez Création d'une application d'analyse.
-
Sur la page Kinesis Analytics - Créer une application, fournissez les détails de l'application comme suit :
-
Pour Nom de l’application, saisissez
MyApplication
. -
Pour Description, saisissez
My java test app
. -
Pour Runtime (Exécution), choisissez Apache Flink 1.6.
-
-
Pour les autorisations d'accès, choisissez Create/update IAM role
kinesis-analytics-MyApplication-us-west-2
. -
Choisissez Créer une application.
Note
Lorsque vous créez une application Amazon Managed Service pour Apache Flink à l'aide de la console, vous avez la possibilité de créer un IAM rôle et une politique pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces IAM ressources sont nommées à l'aide du nom de votre application et de votre région, comme suit :
-
Stratégie :
kinesis-analytics-service-
MyApplication
-us-west-2
-
Rôle :
kinesis-analytics-
MyApplication
-us-west-2
Modifier la IAM politique
Modifiez la IAM politique pour ajouter des autorisations d'accès aux flux de données Kinesis.
Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/
. -
Choisissez Stratégies. Choisissez la politique
kinesis-analytics-service-MyApplication-us-west-2
créée pour vous par la console dans la section précédente. -
Sur la page Récapitulatif, choisissez Modifier la politique. Choisissez l'JSONonglet.
-
Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez le compte d'exemple IDs (
012345678901
) avec votre identifiant de compte.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }
] }
Configuration de l'application
-
Sur la MyApplicationpage, choisissez Configurer.
-
Sur la page Configurer l’application, indiquez l’emplacement du code:
-
Pour le compartiment Amazon S3, saisissez
ka-app-code-
.<username>
-
Pour le chemin de l'objet Amazon S3, saisissez
java-getting-started-1.0.jar
.
-
-
Sous Accès aux ressources de l'application, pour les autorisations d'accès, choisissez Create/update IAM role
kinesis-analytics-MyApplication-us-west-2
. -
Sous Propriétés, pour ID de groupe, saisissez
ProducerConfigProperties
. -
Entrez les valeurs et propriétés d’application suivantes :
Clé Valeur flink.inputstream.initpos
LATEST
aws:region
us-west-2
AggregationEnabled
false
-
Sous Surveillance, assurez-vous que Surveillance du niveau des métriques est défini sur Application.
-
Pour la CloudWatch journalisation, cochez la case Activer.
-
Choisissez Mettre à jour.
Note
Lorsque vous choisissez d'activer la CloudWatch journalisation, le service géré pour Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :
-
Groupe de journaux :
/aws/kinesis-analytics/MyApplication
-
Flux de journaux :
kinesis-analytics-log-stream
Exécutez l'application
-
Sur la MyApplicationpage, choisissez Exécuter. Confirmez l’action.
-
Lorsque l’application est en cours d’exécution, actualisez la page. La console affiche le graphique de l’application.
Arrêtez l'application
Sur la MyApplicationpage, choisissez Stop. Confirmez l’action.
Mise à jour de l'application
À l'aide de la console, vous pouvez mettre à jour les paramètres de l'application, tels que les propriétés de l'application, les paramètres de surveillance, ainsi que l'emplacement ou le nom de fichier de l'applicationJAR. Vous pouvez également recharger l'application JAR depuis le compartiment Amazon S3 si vous devez mettre à jour le code de l'application.
Sur la MyApplicationpage, choisissez Configurer. Mettez à jour les paramètres de l’application, puis choisissez Mettre à jour.
Créez et exécutez l'application (AWS CLI)
Dans cette section, vous utilisez AWS CLI pour créer et exécuter le service géré pour l'application Apache Flink. Le service géré pour Apache Flink utilise kinesisanalyticsv2
AWS CLI commande permettant de créer et d'interagir avec le service géré pour les applications Apache Flink.
Créer une stratégie d’autorisations
Vous commencez par créer une stratégie d’autorisations avec deux instructions : une qui accorde des autorisations pour l’action read
sur le flux source et une autre qui accorde des autorisations pour les actions write
sur le flux récepteur. Vous associez ensuite la politique à un IAM rôle (que vous créez dans la section suivante). Ainsi, lorsque le service géré pour Apache Flink assume le rôle, le service dispose des autorisations nécessaires pour lire à partir du flux source et écrire dans le flux récepteur.
Utilisez le code suivant pour créer la politique d’autorisations KAReadSourceStreamWriteSinkStream
. Remplacez
par le nom d’utilisateur que vous avez utilisé pour créer le compartiment Amazon S3 pour stocker le code d’application. Remplacez l'ID de compte dans Amazon Resource Names (ARNs) (username
) par votre identifiant de compte.012345678901
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion"
],
"Resource": ["arn:aws:s3:::ka-app-code-username
",
"arn:aws:s3:::ka-app-code-username
/*"
]
},
{
"Sid": "ReadInputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream"
},
{
"Sid": "WriteOutputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream"
}
]
}
Pour step-by-step obtenir des instructions sur la création d'une politique d'autorisations, voir Tutoriel : créer et joindre votre première politique gérée par le client dans le guide de IAM l'utilisateur.
Note
Pour accéder à d'autres AWS services, vous pouvez utiliser le AWS SDK for Java. Le service géré pour Apache Flink définit automatiquement les informations d'identification requises par le SDK en fonction du IAM rôle d'exécution du service associé à votre application. Aucune étape supplémentaire n’est nécessaire.
Créez un rôle IAM.
Dans cette section, vous allez créer un IAM rôle que Managed Service for Apache Flink peut assumer pour lire un flux source et écrire dans le flux récepteur.
Le service géré pour Apache Flink ne peut pas accéder à votre flux sans autorisation. Vous accordez ces autorisations via un IAM rôle. Deux politiques sont associées à chaque IAM rôle. La politique d’approbation accorde au service géré pour Apache Flink l’autorisation d’assumer le rôle, et la politique d’autorisation détermine ce que le service géré pour Apache Flink peut faire après avoir assumé le rôle.
Vous attachez la politique d’autorisations que vous avez créée dans la section précédente à ce rôle.
Pour créer un rôle IAM
Ouvrez la IAM console à l'adresse https://console.aws.amazon.com/iam/
. -
Dans le volet de navigation, choisissez Rôles, puis Créer un rôle.
-
Sous Sélectionner le type d'identité fiable, choisissez AWS Un service. Sous Choisir le service qui utilisera ce rôle, choisissez EC2. Sous Sélectionner votre cas d’utilisation, choisissez Kinesis Analytics.
Sélectionnez Next: Permissions (Étape suivante : autorisations).
-
Dans la page Attacher des stratégies d’autorisations, choisissez Suivant : vérification. Vous attachez des stratégies d’autorisations après avoir créé le rôle.
-
Sur la page Créer un rôle, saisissez
KA-stream-rw-role
pour le Nom du rôle. Sélectionnez Créer un rôle.Vous avez maintenant créé un nouveau IAM rôle appelé
KA-stream-rw-role
. Ensuite, vous mettez à jour les stratégies d’approbation et d’autorisation pour le rôle. -
Attachez la politique d’autorisation au rôle.
Note
Dans le cadre de cet exercice, le service géré pour Apache Flink assume ce rôle à la fois pour la lecture des données à partir d’un flux de données Kinesis (source) et pour l’écriture des résultats dans un autre flux de données Kinesis. Vous attachez donc la politique que vous avez créée à l’étape précédente, Créer une stratégie d’autorisations.
-
Sur la page Récapitulatif, choisissez l’onglet Autorisations.
-
Choisissez Attacher des stratégies.
-
Dans la zone de recherche, saisissez
KAReadSourceStreamWriteSinkStream
(la politique que vous avez créée dans la section précédente). -
Choisissez la KAReadInputStreamWriteOutputStreampolitique, puis choisissez Attacher la politique.
-
Vous avez maintenant créé le rôle d’exécution de service que votre application utilise pour accéder aux ressources. Prenez note ARN du nouveau rôle.
Pour step-by-step obtenir des instructions sur la création d'un rôle, consultez la section Création d'un IAM rôle (console) dans le guide de IAM l'utilisateur.
Création de l’application de service géré pour Apache Flink
-
Enregistrez le JSON code suivant dans un fichier nommé
create_request.json
. Remplacez le rôle ARN d'exemple par le rôle que vous avez créé précédemment. ARN Remplacez le ARN suffixe du bucket (
) par le suffixe que vous avez choisi dans la section précédente. Remplacez l’exemple d’ID de compte (username
) dans le rôle d’exécution de service par votre ID de compte.012345678901
{ "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::
012345678901
:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username
", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
Exécutez l’action
CreateApplication
avec la demande précédente pour créer l’application :aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
L’application est maintenant créée. Vous démarrez l’application dans l’étape suivante.
Démarrage de l’application
Dans cette section, vous utilisez l’action StartApplication
pour démarrer l’application.
Pour démarrer l’application
-
Enregistrez le JSON code suivant dans un fichier nommé
start_request.json
.{ "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
-
Exécutez l’action
StartApplication
avec la demande précédente pour démarrer l’application :aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
L’application est maintenant en cours d’exécution. Vous pouvez consulter les métriques du service géré pour Apache Flink sur la CloudWatch console Amazon pour vérifier que l'application fonctionne.
Arrêt de l’application
Dans cette section, vous allez utiliser l’action StopApplication
pour arrêter l’application.
Pour arrêter l’application
-
Enregistrez le JSON code suivant dans un fichier nommé
stop_request.json
.{"ApplicationName": "test" }
-
Exécutez l’action
StopApplication
avec la demande suivante pour arrêter l’application :aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
L’application est maintenant arrêtée.