Créez un bloc-notes Studio avec Amazon MSK - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Créez un bloc-notes Studio avec Amazon MSK

Ce didacticiel explique comment créer un bloc-notes Studio qui utilise un MSK cluster Amazon comme source.

Configuration d'un MSK cluster Amazon

Pour ce didacticiel, vous avez besoin d'un MSK cluster Amazon qui autorise l'accès en texte brut. Si vous n'avez pas encore configuré de MSK cluster Amazon, suivez le MSK didacticiel Getting Started Using Amazon pour créer un AmazonVPC, un MSK cluster Amazon, un sujet et une instance EC2 client Amazon.

Lorsque vous suivez le didacticiel, procédez comme suit :

Ajoutez une NAT passerelle à votre VPC

Si vous avez créé un MSK cluster Amazon en suivant le MSK didacticiel Getting Started Using Amazon, ou si votre Amazon actuel VPC ne possède pas encore de NAT passerelle pour ses sous-réseaux privés, vous devez ajouter une NAT passerelle à votre AmazonVPC. Le schéma suivant illustre l’architecture.

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

Pour créer une NAT passerelle pour votre AmazonVPC, procédez comme suit :

  1. Ouvrez la VPC console Amazon à l'adresse https://console.aws.amazon.com/vpc/.

  2. Choisissez NATGateways dans la barre de navigation de gauche.

  3. Sur la page NATPasserelles, choisissez Create NAT Gateway.

  4. Sur la page Create NAT Gateway, indiquez les valeurs suivantes :

    Nom - facultatif ZeppelinGateway
    Sous-réseau AWS KafkaTutorialSubnet1
    ID d'allocation IP élastique Choisissez une adresse IP élastique disponible. Si aucun Elastic n'IPsest disponible, choisissez Allocation Elastic IP, puis choisissez l'IP Elasic créée par la console.

    Choisissez Create NAT Gateway.

  5. Dans le volet de navigation de gauche, choisissez Tables de routage.

  6. Choisissez Créer une table de routage.

  7. Sur la page Créer une table de routage, fournissez les informations suivantes :

    • Balise de nom : ZeppelinRouteTable

    • VPC: Choisissez votre VPC (par exemple AWS KafkaTutorialVPC).

    Sélectionnez Create (Créer).

  8. Dans la liste des tables de routage, choisissez ZeppelinRouteTable. Choisissez l’onglet Routes, puis Modifier les routes.

  9. Sur la page Modifier les routes, choisissez Ajouter une route.

  10. Dans le Pour Destination, saisissez 0.0.0.0/0. Pour Target, choisissez NATGateway, ZeppelinGateway. Choisissez Enregistrer les routes. Choisissez Close (Fermer).

  11. Sur la page Tables de routage, ZeppelinRouteTablesélectionnez l'onglet Associations de sous-réseaux. Choisissez Modifier les associations de sous-réseaux.

  12. Sur la page Modifier les associations de sous-réseaux, choisissez AWS KafkaTutorialSubnet2 et AWS KafkaTutorialSubnet3. Choisissez Save (Enregistrer).

Création d'une AWS Glue connexion et d'une table

Votre bloc-notes Studio utilise une AWS Gluebase de données pour les métadonnées relatives à votre source de MSK données Amazon. Dans cette section, vous créez une AWS Glue connexion qui décrit comment accéder à votre MSK cluster Amazon et un AWS Glue tableau qui décrit comment présenter les données de votre source de données à des clients tels que votre bloc-notes Studio.

Créer une connexion
  1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse https://console.aws.amazon.com/glue/.

  2. Si vous n'avez pas encore de AWS Glue base de données, choisissez Bases de données dans la barre de navigation de gauche. Choisissez Ajouter une base de données. Dans la fenêtre Ajouter une base de données, saisissez default comme nom de la base de données. Sélectionnez Create (Créer).

  3. Dans le menu de navigation de gauche, sélectionnez Connexions. Choisissez Ajouter une connexion.

  4. Dans la fenêtre Ajouter une connexion, indiquez les valeurs suivantes :

    • Pour Nom de connexion, saisissez ZeppelinConnection.

    • Pour Type de connexion, choisissez Kafka.

    • Pour le serveur bootstrap Kafka URLs, fournissez la chaîne de broker bootstrap de votre cluster. Vous pouvez obtenir les courtiers bootstrap depuis la MSK console ou en saisissant la CLI commande suivante :

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Décochez la case Exiger SSL une connexion.

    Choisissez Suivant.

  5. Dans la VPCpage, indiquez les valeurs suivantes :

    • Pour VPC, choisissez le nom de votre VPC (par exemple AWS KafkaTutorialVPC)

    • Pour Sous-réseau, choisissez AWS KafkaTutorialSubnet2.

    • Pour Groupes de sécurité, sélectionnez tous les groupes disponibles.

    Choisissez Suivant.

  6. Sur la page Propriétés de la connexion/Accès à la connexion, choisissez Terminer.

Création d’une table
Note

Vous pouvez soit créer la table manuellement comme décrit dans les étapes suivantes, soit utiliser le code du connecteur de création de table pour Managed Service for Apache Flink dans votre bloc-notes dans Apache Zeppelin pour créer votre table via une instruction. DDL Vous pouvez ensuite vous enregistrer AWS Glue pour vous assurer que la table a été correctement créée.

  1. Dans la barre de navigation de gauche, choisissez Tables. Sur la page Tables, choisissez Ajouter des tables, Ajouter une table manuellement.

  2. Sur la page Configurer les propriétés de votre table, saisissez stock pour Nom de la table. Assurez-vous de sélectionner la base de données que vous avez créée précédemment. Choisissez Suivant.

  3. Sur la page Ajouter un magasin de données, choisissez Kafka. Pour le nom du sujet, entrez le nom de votre sujet (par exemple AWS KafkaTutorialTopic). Pour Connection, choisissez ZeppelinConnection.

  4. Sur la page Classification, sélectionnez JSON. Choisissez Suivant.

  5. Sur la page Définir un schéma, choisissez Ajouter une colonne pour ajouter une colonne. Ajoutez des colonnes avec les propriétés suivantes :

    Nom de la colonne Type de données
    ticker string
    price double

    Choisissez Suivant.

  6. Sur la page suivante, vérifiez vos paramètres, puis choisissez Terminer.

  7. Choisissez la table que vous venez de créer dans la liste des tables.

  8. Choisissez Modifier le tableau et ajoutez les propriétés suivantes :

    • clé :managed-flink.proctime, valeur : proctime

    • clé :flink.properties.group.id, valeur : test-consumer-group

    • clé :flink.properties.auto.offset.reset, valeur : latest

    • clé :classification, valeur : json

    Sans ces paires clé/valeur, le bloc-notes Flink rencontre une erreur.

  9. Choisissez Appliquer.

Créez un bloc-notes Studio avec Amazon MSK

Maintenant que vous avez créé les ressources utilisées par votre application, vous pouvez créer votre bloc-notes Studio.

Vous pouvez créer votre application à l'aide du AWS Management Console ou du AWS CLI.
Note

Vous pouvez également créer un bloc-notes Studio depuis la MSK console Amazon en choisissant un cluster existant, puis en choisissant Traiter les données en temps réel.

Créez un bloc-notes Studio à l'aide du AWS Management Console

  1. Ouvrez le service géré pour la console Apache Flink https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1#/applications/tableau de bord.

  2. Sur la page Applications de service géré pour Apache Flink, choisissez l’onglet Studio. Choisissez Créer un bloc-notes Studio.

    Note

    Pour créer un bloc-notes Studio à partir des consoles Amazon MSK ou Kinesis Data Streams, sélectionnez votre cluster MSK Amazon ou votre flux de données Kinesis d'entrée, puis choisissez Traiter les données en temps réel.

  3. Sur la page Créer un bloc-notes Studio, fournissez les informations suivantes :

    • Pour Nom du bloc-notes Studio, saisissez MyNotebook.

    • Pour Base de données AWS Glue, choisissez Par défaut.

    Choisissez Créer un bloc-notes Studio.

  4. Sur la MyNotebookpage, choisissez l'onglet Configuration. Dans la section Mise en réseau, choisissez Modifier.

  5. Dans la MyNotebook page Modifier le réseau pour, choisissez VPCla configuration basée sur le MSK cluster Amazon. Choisissez votre MSK cluster Amazon pour Amazon MSK Cluster. Sélectionnez Enregistrer les modifications.

  6. Sur la MyNotebookpage, choisissez Exécuter. Attendez que État indique En cours d’exécution.

Créez un bloc-notes Studio à l'aide du AWS CLI

Pour créer votre bloc-notes Studio à l'aide du AWS CLI, procédez comme suit :

  1. Assurez-vous de disposer des informations suivantes. Vous avez besoin de ces valeurs pour créer votre application.

    • Votre ID de compte.

    • L'ID du sous-réseau IDs et du groupe de sécurité pour l'Amazon VPC qui contient votre MSK cluster Amazon.

  2. Créez un fichier nommé create.json avec le contenu suivant. Remplacez les valeurs des espaces réservés par vos informations.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Pour créer votre application, exécutez la commande suivante.

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Lorsque la commande est terminée, vous devriez obtenir une sortie similaire à celle qui suit, montrant les détails de votre nouveau bloc-notes Studio :

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Pour lancer votre application, exécutez la commande suivante. Remplacez les exemples de valeur par l’identifiant de votre compte.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Envoyez des données à votre MSK cluster Amazon

Dans cette section, vous allez exécuter un script Python dans votre EC2 client Amazon pour envoyer des données à votre source de MSK données Amazon.

  1. Connectez-vous à votre EC2 client Amazon.

  2. Exécutez les commandes suivantes pour installer Python version 3, Pip et le package Kafka pour Python, puis confirmez les actions :

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Configurez le AWS CLI sur votre machine cliente en saisissant la commande suivante :

    aws configure

    Fournissez les informations d’identification de votre compte, et us-east-1 pour la region.

  4. Créez un fichier nommé stock.py avec le contenu suivant. Remplacez la valeur d'échantillon par la chaîne Bootstrap Brokers de votre MSK cluster Amazon et mettez à jour le nom du sujet si ce n'est pas AWS KafkaTutorialTopicle cas :

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Exécutez le script avec la commande suivante :

    $ python3 stock.py
  6. Laissez le script s’exécuter pendant que vous complétez la section suivante.

Tester votre bloc-notes Studio

Dans cette section, vous utilisez votre bloc-notes Studio pour interroger les données de votre MSK cluster Amazon.

  1. Ouvrez le service géré pour la console Apache Flink https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1#/applications/tableau de bord.

  2. Sur la page Applications de service géré pour Apache Flink, choisissez l’onglet Bloc-notes Studio. Choisissez MyNotebook.

  3. Sur la MyNotebookpage, choisissez Ouvrir dans Apache Zeppelin.

    L’interface Apache Zeppelin s’ouvre dans un nouvel onglet.

  4. Sur la page Bienvenue sur Zeppelin !, choisissez Nouvelle note Zeppelin.

  5. Sur la page Note Zeppelin, entrez la requête suivante dans une nouvelle note :

    %flink.ssql(type=update) select * from stock

    Choisissez l’icône d’exécution.

    L'application affiche les données du MSK cluster Amazon.

Pour ouvrir le tableau de bord Apache Flink permettant à votre application de visualiser les aspects opérationnels, choisissez FLINKJOB. Pour plus d’informations sur le tableau de bord Flink, consultez Tableau de bord Apache Flink dans le guide du développeur du service géré pour Apache Flink.

Pour d'autres exemples de SQL requêtes Flink Streaming, consultez la section Requêtes de la documentation Apache Flink.