Ejemplo: transformación DateTime de valores - Guía para desarrolladores de Amazon Kinesis Data Analytics SQL para aplicaciones

Tras considerarlo detenidamente, hemos decidido interrumpir Amazon Kinesis Data Analytics SQL para aplicaciones en dos pasos:

1. A partir del 15 de octubre de 2025, no podrá crear nuevos Kinesis Data Analytics SQL para aplicaciones.

2. Eliminaremos sus aplicaciones a partir del 27 de enero de 2026. No podrá iniciar ni utilizar Amazon Kinesis Data Analytics SQL para aplicaciones. A partir de ese momento, el soporte para Amazon Kinesis Data Analytics dejará SQL de estar disponible. Para obtener más información, consulte Suspensión de Amazon Kinesis Data Analytics SQL for Applications.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplo: transformación DateTime de valores

Amazon Kinesis Data Analytics admite la conversión de columnas en marcas temporales. Por ejemplo, es posible que desee utilizar su propia marca temporal como parte de una cláusula GROUP BY y utilizarla como otra ventana basada en el tiempo, además de la columna ROWTIME. Kinesis Data Analytics proporciona operaciones y funciones de SQL para trabajar con campos de fecha y hora.

  • Operadores de fecha y hora: puede realizar operaciones aritméticas en tipos de datos de intervalos, fechas y tiempos. Para obtener más información, consulte Operadores de fecha, tiempos e intervalos en la Referencia de SQL de Amazon Managed Service para Apache Flink.

     

  • Funciones de SQL: incluyen las siguientes. Para obtener más información, consulte Funciones de hora y tiempo en la Referencia de SQL de Amazon Managed Service para Apache Flink.

    • EXTRACT(): extrae un campo a partir de una expresión de intervalo, fecha, hora o marca temporal.

    • CURRENT_TIME: devuelve la hora a la que se ejecuta la consulta (UTC).

    • CURRENT_DATE: devuelve la fecha en la que se ejecuta la consulta (UTC).

    • CURRENT_TIMESTAMP: devuelve la marca temporal del momento en que se ejecuta la consulta (UTC).

    • LOCALTIME: devuelve la hora actual del momento en el que se ejecuta la consulta según lo establecido en el entorno en el que se está ejecutando Kinesis Data Analytics (UTC).

    • LOCALTIMESTAMP: devuelve la marca temporal actual según lo establecido en el entorno en el que se está ejecutando Kinesis Data Analytics (UTC).

       

  • Extensiones de SQL: incluyen las siguientes. Para obtener más información, consulte Funciones de fecha y hora y Funciones de conversión de fecha y hora en la Referencia de SQL de Amazon Managed Service para Apache Flink.

    • CURRENT_ROW_TIMESTAMP: devuelve una nueva marca temporal para cada fila de la secuencia.

    • TSDIFF: devuelve la diferencia entre dos marcas temporales en milisegundos.

    • CHAR_TO_DATE: convierte una cadena en una fecha.

    • CHAR_TO_TIME: convierte una cadena en una hora.

    • CHAR_TO_TIMESTAMP: convierte una cadena en una marca temporal.

    • DATE_TO_CHAR: convierte una fecha en una cadena.

    • TIME_TO_CHAR: convierte una hora en una cadena.

    • TIMESTAMP_TO_CHAR: convierte una marca temporal en una cadena.

La mayoría de las funciones de SQL anteriores utilizan un formato para convertir las columnas. El formato es flexible. Por ejemplo, puede especificar el formato yyyy-MM-dd hh:mm:ss para convertir una cadena de entrada 2009-09-16 03:15:24 en una marca temporal. Para obtener más información, consulte Char To Timestamp(Sys) en la Referencia de SQL de Amazon Managed Service para Apache Flink.

Ejemplo: Transformación de fechas

En este ejemplo, escribirá los siguientes registros en un flujo de datos de Amazon Kinesis.

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

A continuación, creará una aplicación de análisis de datos de Kinesis Data Analytics en la consola, con el flujo de datos de Kinesis como origen de streaming. El proceso de detección lee los registros de muestra en el origen de streaming e infiere un esquema en la aplicación con dos columnas (EVENT_TIME y TICKER), tal como se muestra a continuación.

Imagen de pantalla de la consola que muestra el esquema en la aplicación con las columnas event time y ticker.​

A continuación, utilizará el código de la aplicación con funciones SQL para convertir el campo de marca temporal EVENT_TIME de diversas maneras. Por último, insertará los datos resultantes en otra secuencia en la aplicación, tal y como se muestra en la siguiente captura de pantalla:

Imagen de pantalla de la consola que muestra los datos resultantes en una secuencia en la aplicación.

Paso 1: crear un flujo de datos de Kinesis

Cree un flujo de datos de Amazon Kinesis y rellénela con registros event time y ticker de la siguiente manera:

  1. Inicie sesión en la consola Kinesis AWS Management Console y ábrala en https://console.aws.amazon.com/kinesis.

  2. Elija Flujos de datos en el panel de navegación.

  3. Elija Crear secuencia de Kinesis y a continuación cree una secuencia con una partición.

  4. Ejecute el siguiente código de Python para rellenar la secuencia con datos de muestra. Este código de ejemplo escribe continuamente un registro con un símbolo de cotización aleatorio y la marca temporal actual en la secuencia.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Paso 2: creación de una aplicación de Amazon Kinesis Data Analytics

Cree una aplicación de la siguiente manera:

  1. Abra la consola de Managed Service para Apache Flink en https://console.aws.amazon.com/kinesisanalytics.

  2. Elija Create application (Crear aplicación), escriba el nombre de la aplicación y elija Create application (Crear aplicación).

  3. En la página de detalles de la aplicación, elija Connect streaming data (Conectar datos de streaming) para conectarse al origen.

  4. En la página Connect to source (Conectarse al origen), haga lo siguiente:

    1. Elija la secuencia que ha creado en la sección anterior.

    2. Elija crear un rol de IAM.

    3. Seleccione Detectar esquema. Espere a que la consola muestre el esquema inferido y los registros de muestra utilizados para inferir en el esquema de la secuencia en la aplicación que ha creado. El esquema inferido cuenta con dos columnas.

    4. Elija Edit Schema (Editar esquema). Cambie el Column type (Tipo de columna) de la columna EVENT_TIME a TIMESTAMP.

    5. Elija Save schema and update stream samples. Después de que la consola guarde el esquema, elija Exit (Salir).

    6. Elija Guardar y continuar.

  5. En la página de detalles de la aplicación, elija Go to SQL editor (Ir al editor de SQL). Para iniciar la aplicación, elija Yes, start application (Sí, iniciar la aplicación) en el cuadro de diálogo que aparece.

  6. En el editor de SQL, escriba el código de la aplicación y verifique los resultados como se indica a continuación:

    1. Copie el siguiente código de la aplicación y péguelo en el editor.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. Elija Save and run SQL. En la pestaña Real-time analytics (Análisis en tiempo real), puede ver todas las secuencias en la aplicación creadas por esta y comprobar los datos.