Étape 1 : Créer les flux d'entrée et de sortie - Guide du développeur d'Amazon Kinesis Data Analytics SQL pour applications

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics SQL pour les applications en deux étapes :

1. À compter du 15 octobre 2025, vous ne pourrez plus créer de nouveaux Kinesis Data Analytics SQL pour les applications.

2. Nous supprimerons vos candidatures à compter du 27 janvier 2026. Vous ne serez pas en mesure de démarrer ou d'utiliser votre Amazon Kinesis Data Analytics SQL pour les applications. Support ne sera plus disponible pour Amazon Kinesis Data Analytics à partir SQL de cette date. Pour de plus amples informations, veuillez consulter Arrêt d'Amazon Kinesis Data Analytics SQL pour applications.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Étape 1 : Créer les flux d'entrée et de sortie

Avant de créer une application Amazon Kinesis Data Analytics pour l’exemple pour les points chauds, vous créez deux flux de données Kinesis. Configurez l’un des flux en tant que source de streaming pour votre application et l’autre flux en tant que destination où Kinesis Data Analytics conserve la sortie de votre application.

Étape 1.1 : Création des flux de données Kinesis

Dans cette section, vous créez deux flux de données Kinesis : ExampleInputStream et ExampleOutputStream.

Créez ces flux de données à l'aide de la console ou de l'AWS CLI.

  • Pour créer les flux de données à l'aide de la console :

    1. Connectez-vous à la AWS Management Console et ouvrez la console Kinesis à partir de l'adresse https://console.aws.amazon.com/kinesis.

    2. Choisissez Data Streams (Flux de données) dans le volet de navigation.

    3. Choisissez Create Kinesis stream (Créer un flux Kinesis), puis créez un flux avec une partition nommée ExampleInputStream.

    4. Répétez l'étape précédente, en créant un flux avec une seule partition nommée ExampleOutputStream.

  • Pour créer des flux de données à l'aide de l'AWS CLI :

    • Créez des flux (ExampleInputStream et ExampleOutputStream) à l’aide de la commande AWS CLI create-stream Kinesis suivante. Pour créer le deuxième flux que l'application utilisera pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par ExampleOutputStream.

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée

Dans cette étape, vous exécutez du code Python pour générer en continu des exemples d'enregistrements et les écrire dans le flux ExampleInputStream.

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. Installez Python et pip.

    Pour plus d'informations sur l'installation de Python, consultez le site web Python.

    Vous pouvez installer des dépendances à l'aide de pip. Pour plus d'informations sur l'installation de pip, consultez Installation sur le site web de pip.

  2. Exécutez le code Python suivant. Ce code effectue les opérations suivantes :

    • Génère un point chaud potentiel quelque part dans le plan (X, Y).

    • Génère un ensemble de 1 000 points pour chaque point chaud. Parmi ces points, 20 % sont regroupés autour du point chaud. Les autres sont générés de façon aléatoire dans l'ensemble de l'espace.

    • La commande put-record écrit les enregistrements JSON dans le flux.

    Important

    Ne chargez pas ce fichier sur un serveur web, car il contient vos informations d’identification AWS.

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )

Étape suivante

Étape 2 : Création d’une application Kinesis Data Analytics