Étape 1 : Créer les flux d'entrée et de sortie - Manuel du développeur des applications Amazon Kinesis Data Analytics pour SQL

Pour les nouveaux projets, nous vous recommandons d’utiliser le nouveau service géré pour Apache Flink Studio plutôt que les applications Kinesis Data Analytics pour SQL. Le service géré pour Apache Flink Studio allie facilité d’utilisation et capacités analytiques avancées, ce qui vous permet de créer des applications sophistiquées de traitement des flux en quelques minutes.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Étape 1 : Créer les flux d'entrée et de sortie

Avant de créer une application Amazon Kinesis Data Analytics pour l’exemple pour les points chauds, vous créez deux flux de données Kinesis. Configurez l’un des flux en tant que source de streaming pour votre application et l’autre flux en tant que destination où Kinesis Data Analytics conserve la sortie de votre application.

Étape 1.1 : Création des flux de données Kinesis

Dans cette section, vous créez deux flux de données Kinesis : ExampleInputStream et ExampleOutputStream.

Créez ces flux de données à l'aide de la console ou de l'AWS CLI.

  • Pour créer les flux de données à l'aide de la console :

    1. Connectez-vous à la AWS Management Console et ouvrez la console Kinesis à partir de l'adresse https://console.aws.amazon.com/kinesis.

    2. Choisissez Data Streams (Flux de données) dans le volet de navigation.

    3. Choisissez Create Kinesis stream (Créer un flux Kinesis), puis créez un flux avec une partition nommée ExampleInputStream.

    4. Répétez l'étape précédente, en créant un flux avec une seule partition nommée ExampleOutputStream.

  • Pour créer des flux de données à l'aide de l'AWS CLI :

    • Créez des flux (ExampleInputStream et ExampleOutputStream) à l’aide de la commande AWS CLI create-stream Kinesis suivante. Pour créer le deuxième flux que l'application utilisera pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par ExampleOutputStream.

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée

Dans cette étape, vous exécutez du code Python pour générer en continu des exemples d'enregistrements et les écrire dans le flux ExampleInputStream.

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. Installez Python et pip.

    Pour plus d'informations sur l'installation de Python, consultez le site web Python.

    Vous pouvez installer des dépendances à l'aide de pip. Pour plus d'informations sur l'installation de pip, consultez Installation sur le site web de pip.

  2. Exécutez le code Python suivant. Ce code effectue les opérations suivantes :

    • Génère un point chaud potentiel quelque part dans le plan (X, Y).

    • Génère un ensemble de 1 000 points pour chaque point chaud. Parmi ces points, 20 % sont regroupés autour du point chaud. Les autres sont générés de façon aléatoire dans l'ensemble de l'espace.

    • La commande put-record écrit les enregistrements JSON dans le flux.

    Important

    Ne chargez pas ce fichier sur un serveur web, car il contient vos informations d’identification AWS.

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )

Étape suivante

Étape 2 : Création d’une application Kinesis Data Analytics