Exemplo: janela em cascata usando ROWTIME - Guia do Desenvolvedor de Amazon Kinesis Data Analytics para aplicativos SQL

Após uma análise cuidadosa, decidimos descontinuar as aplicações do Amazon Kinesis Data Analytics para SQL em duas etapas:

1. A partir de 15 de outubro de 2025, você não poderá mais criar aplicações do Kinesis Data Analytics para SQL.

2. Excluiremos as aplicações a partir de 27 de janeiro de 2026. Você não poderá mais iniciar nem operar as aplicações do Amazon Kinesis Data Analytics para SQL. A partir dessa data, não haverá mais suporte ao Amazon Kinesis Data Analytics para SQL. Para ter mais informações, consulte Descontinuação de aplicações do Amazon Kinesis Data Analytics para SQL.

Exemplo: janela em cascata usando ROWTIME

Quando uma consulta em janela processa cada janela de forma não sobreposta, é chamada de janela em cascata. Para obter detalhes, consulte Janelas em cascata (Agregações usando GROUP BY). Este exemplo do Amazon Kinesis Data Analytics usa a coluna ROWTIME para criar janelas em cascata. A coluna ROWTIME representa o momento em que o registro foi lido pelo aplicativo.

Neste exemplo, você grava os registros a seguir em um streaming de dados do Kinesis.

{"TICKER": "TBV", "PRICE": 33.11} {"TICKER": "INTC", "PRICE": 62.04} {"TICKER": "MSFT", "PRICE": 40.97} {"TICKER": "AMZN", "PRICE": 27.9} ...

Em seguida, crie um aplicativo do Kinesis Data Analytics no AWS Management Console, com o streaming de dados do Kinesis como a origem de streaming. O processo de descoberta lê os registros de exemplo na origem de streaming e infere um esquema no aplicativo com duas colunas (TICKER e PRICE) conforme mostrado a seguir.

Captura de tela do console mostrando o esquema no aplicativo com as colunas de preço e de marcador.

Você usa o código do aplicativo com as funções MIN e MAX para criar uma agregação em janela dos dados. Em seguida, insira os dados resultantes em outro stream no aplicativo, conforme mostrado na captura de tela a seguir:

Captura de tela do console mostrando os dados resultantes em um stream no aplicativo.

No procedimento a seguir, você cria um aplicativo do Kinesis Data Analytics que agrega valores no stream de entrada em uma janela em cascata com base em ROWTIME.

Etapa 1: Criar um fluxo de dados Kinesis

Crie um fluxo de dados do Amazon Kinesis e preencha registros da seguinte forma:

  1. Faça login no AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Selecione Data Streams (Fluxos de dados) no painel de navegação.

  3. Selecione Create Kinesis stream (Criar stream do Kinesis) e crie um stream com um estilhaço. Para obter mais informações, consulte Criar um fluxo no Guia do desenvolvedor do Amazon Kinesis Data Streams.

  4. Para gravar registros em um streaming de dados do Kinesis em um ambiente de produção, recomendamos usar o Kinesis Client Library ou a API do Kinesis Data Streams. Para simplificar, este exemplo usa o script Python a seguir para gerar registros. Execute o código para preencher os registros de marcador de exemplo. Esse código simples grava continuamente um registro de marcador aleatório no stream. Mantenha o script em execução para que você possa gerar o esquema do aplicativo em uma etapa posterior.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Etapa 2: Criar o aplicativo Kinesis Data Analytics

Crie um aplicativo Kinesis Data Analytics, da seguinte maneira:

  1. Abra o Managed Service for Apache Flink console em https://console.aws.amazon.com/kinesisanalytics.

  2. Escolha Create application (Criar aplicativo), insira um nome para o aplicativo e selecione Create application (Criar aplicativo).

  3. Na página de detalhes do aplicativo, escolha Connect streaming data (Conectar dados de streaming) para se conectar com a fonte.

  4. Na página Connect to source (Conectar com a fonte), faça o seguinte:

    1. Escolha o stream criado na seção anterior.

    2. Selecione Discover schema (Descobrir esquema). Aguarde o console mostrar o esquema inferido e os registros de exemplos usados para inferir o esquema do stream do aplicativo criado. O esquema inferido tem duas colunas.

    3. Escolha Save schema and update stream samples. Depois que o console salvar o esquema, escolha Exit (Sair).

    4. Escolha Save and continue.

  5. Na página de detalhes de aplicativo, escolha Go to SQL editor (Ir para o editor de SQL). Para iniciar o aplicativo, escolha Yes, start application (Sim, iniciar o aplicativo) na caixa de diálogo exibida.

  6. No editor SQL, escreva o código do aplicativo e verifique os resultados da seguinte forma:

    1. Copie o código de aplicativo a seguir e cole-o no editor.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
    2. Escolha Save and run SQL.

      Na guia Real-time analytics (Análise em tempo real), você pode ver todos os fluxos de aplicativo criados pelo aplicativo e verificar os dados.