Exemple : fenêtre bascule utilisant ROWTIME - Guide du développeur d'Amazon Kinesis Data Analytics SQL pour applications

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics SQL pour les applications en deux étapes :

1. À compter du 15 octobre 2025, vous ne pourrez plus créer de nouveaux Kinesis Data Analytics SQL pour les applications.

2. Nous supprimerons vos candidatures à compter du 27 janvier 2026. Vous ne serez pas en mesure de démarrer ou d'utiliser votre Amazon Kinesis Data Analytics SQL pour les applications. Support ne sera plus disponible pour Amazon Kinesis Data Analytics à partir SQL de cette date. Pour de plus amples informations, veuillez consulter Arrêt d'Amazon Kinesis Data Analytics SQL pour applications.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemple : fenêtre bascule utilisant ROWTIME

Lorsqu'une requête à fenêtres traite chaque fenêtre sans chevauchement, la fenêtre est appelée fenêtre bascule. Pour plus de détails, veuillez consulter Fenêtres bascules (regroupements à l'aide de GROUP BY). Cet exemple Amazon Kinesis Data Analytics utilise la colonne ROWTIME pour créer des fenêtres bascules. La colonne ROWTIME représente le moment où l'enregistrement a été lu par l'application.

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Kinesis.

{"TICKER": "TBV", "PRICE": 33.11} {"TICKER": "INTC", "PRICE": 62.04} {"TICKER": "MSFT", "PRICE": 40.97} {"TICKER": "AMZN", "PRICE": 27.9} ...

Vous créez ensuite une application Kinesis Data Analytics dans l’AWS Management Console, avec le flux de données Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements sur la source de streaming et en déduit un schéma intégré à l'application avec deux colonnes (TICKER et PRICE), comme illustré ci-dessous.

Capture d'écran de la console montrant le schéma intégré à l'application avec les colonnes de prix et de symbole boursier.

Vous utilisez le code de l'application à l'aide du MIN et des fonctions MAX pour créer un regroupement des données avec fenêtres. Vous insérez ensuite les données obtenues dans un autre flux intégré à l'application, comme indiqué dans la capture d'écran suivante :

Capture d'écran de la console montrant les données obtenues dans un flux intégré à l'application.

Dans la procédure suivante, vous créez une application Kinesis Data Analytics qui regroupe les valeurs dans le flux d’entrée dans une fenêtre bascule basée sur ROWTIME.

Étape 1 : Création d’un flux de données Kinesis

Créez un flux de données Amazon Kinesis et remplissez les enregistrements comme suit :

  1. Connectez-vous à la AWS Management Console et ouvrez la console Kinesis à partir de l'adresse https://console.aws.amazon.com/kinesis.

  2. Choisissez Data Streams (Flux de données) dans le volet de navigation.

  3. Choisissez Create Kinesis stream (Créer un flux Kinesis), puis créez un flux avec une seule partition. Pour de plus amples informations, consultez Créer un flux dans le Guide du développeur Amazon Kinesis Data Streams.

  4. Pour écrire des enregistrements sur un flux de données Kinesis dans un environnement de production, nous vous recommandons d'utiliser Kinesis Client Library ou les API de flux de données Kinesis. Pour plus de simplicité, cet exemple utilise le script Python ci-dessous pour générer des enregistrements. Exécutez le code pour remplir les exemples d'enregistrements du symbole boursier. Ce code simple écrit de façon continue un enregistrement du symbole boursier aléatoire dans le flux. Laissez le script s'exécuter pour pouvoir générer le schéma d'application lors d'une étape ultérieure.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Étape 2 : Création d’une application Kinesis Data Analytics

Créez une application Kinesis Data Analytics comme suit :

  1. Ouvrez la console du service géré pour Apache Flink à l’adresse https://console.aws.amazon.com/kinesisanalytics.

  2. Choisissez Créer une application, saisissez un nom d'application, puis sélectionnez Créer une application.

  3. Sur la page de détails de l'application, choisissez Connect streaming data (Connecter des données de diffusion) pour vous connecter à la source.

  4. Sur la page Connect to source (Se connecter à la source), procédez comme suit :

    1. Choisissez le flux que vous avez créé dans la section précédente.

    2. Choisissez Discover schema (Découvrir le schéma). Attendez que la console affiche le schéma déduit et les exemples d'enregistrements qui sont utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit comporte deux colonnes.

    3. Choisissez Save schema and update stream samples (Enregistrer le schéma et mettre à jour les exemples de flux). Une fois que la console a enregistré le schéma, choisissez Exit (Quitter).

    4. Choisissez Save and continue (Enregistrer et continuer).

  5. Sur la page de détails de l'application, choisissez Go to SQL editor (Accéder à l'éditeur SQL). Pour lancer l'application, choisissez Yes, start application (Oui, démarrer l'application) dans la boîte de dialogue qui s'affiche.

  6. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

    1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
    2. Choisissez Save and run SQL (Enregistrer et exécuter SQL).

      Dans l'onglet Real-time analytics (Analyse en temps réel), vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données.