Exemplo: Transformação de valores DateTime - Guia do desenvolvedor do Amazon Kinesis Data Analytics SQL para aplicativos

Após uma análise cuidadosa, decidimos descontinuar o Amazon Kinesis Data Analytics SQL para aplicativos em duas etapas:

1. A partir de 15 de outubro de 2025, você não poderá criar um novo Kinesis Data Analytics SQL para aplicativos.

2. Excluiremos seus aplicativos a partir de 27 de janeiro de 2026. Você não poderá iniciar ou operar seu Amazon Kinesis Data Analytics SQL para aplicativos. O suporte não estará mais disponível para o Amazon Kinesis Data Analytics SQL a partir desse momento. Para obter mais informações, consulte Descontinuação do Amazon Kinesis Data Analytics SQL para aplicativos.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplo: Transformação de valores DateTime

O Amazon Kinesis Data Analytics suporta a conversão de colunas em time stamps. Por exemplo, use seu próprio time stamp como parte de uma cláusula GROUP BY como outra janela baseado em tempo, além da coluna ROWTIME. O Kinesis Data Analytics fornece operações e funções SQL para trabalhar com campos de data e hora.

  • Operadores de data e hora: você pode executar operações aritméticas em datas, horas e tipos de dados de intervalo. Para obter mais informações, consulte Operadores de data, time stamp e intervalo em Amazon Managed Service for Apache Flink SQL Reference.

     

  • Funções SQL: incluem o seguinte. Para obter mais informações, consulte Funções de data e hora em Amazon Managed Service for Apache Flink SQL Reference.

    • EXTRACT() - Extrai um campo a partir de uma data, hora, time stamp ou expressão de intervalo.

    • CURRENT_TIME – Retorna a hora em que a consulta é executada (UTC).

    • CURRENT_DATE – Retorna a data em que a consulta é executada (UTC).

    • CURRENT_TIMESTAMP – Retorna o time stamp em que a consulta é executada (UTC).

    • LOCALTIME – Retorna a hora atual em que a consulta é executada, conforme definido pelo ambiente no qual o Kinesis Data Analytics está sendo executado (UTC).

    • LOCALTIMESTAMP - Retorna o time stamp atual, conforme definido pelo ambiente no qual o Kinesis Data Analytics está sendo executado (UTC).

       

  • Extensões SQL: incluem o seguinte. Para obter mais informações, consulte Funções de data e hora e Funções de conversão de data e hora em Amazon Managed Service for Apache Flink SQL Reference.

    • CURRENT_ROW_TIMESTAMP - Retorna um novo time stamp para cada linha no fluxo.

    • TSDIFF - Retorna a diferença de dois time stamps em milissegundos.

    • CHAR_TO_DATE - Converte uma string em data.

    • CHAR_TO_TIME – Converte uma string em hora.

    • CHAR_TO_TIMESTAMP – Converte uma string em time stamp.

    • DATE_TO_CHAR – Converte uma data em string.

    • TIME_TO_CHAR – Converte uma hora em string.

    • TIMESTAMP_TO_CHAR – Converte um time stamp em uma string.

A maioria das funções SQL anteriores usam um formato para converter as colunas. O formato é flexível. Por exemplo, você pode especificar o formato yyyy-MM-dd hh:mm:ss para converter a string de entrada 2009-09-16 03:15:24 em time stamp. Para obter mais informações, consulte Char To Timestamp (Sys) em Amazon Managed Service for Apache Flink SQL Reference.

Exemplo: transformação de datas

Neste exemplo, você grava os registros a seguir em um fluxo de dados do Amazon Kinesis.

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

Em seguida, você criará um aplicativo Kinesis Data Analytics no console com o stream Kinesis como origem de streaming. O processo de descoberta lê registros de exemplo na origem de streaming e infere um esquema de aplicativo com duas colunas (EVENT_TIME e TICKER), como mostrado.

Captura de tela do console mostrando o esquema no aplicativo com o tempo do evento e as colunas do marcador.

Em seguida, use o código do aplicativo com funções SQL para converter o campo de timestamp EVENT_TIME de várias maneiras. Insira dos dados resultantes em outro fluxo de aplicativo, como mostramos na captura de tela a seguir:

Captura de tela do console mostrando os dados resultantes em um stream no aplicativo.

Etapa 1: Criar um fluxo de dados Kinesis

Crie um Amazon Kinesis Data Streams e preencha com hora do evento e os registros do marcador, da seguinte maneira:

  1. Faça login AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Selecione Data Streams (Fluxos de dados) no painel de navegação.

  3. Escolha Create Kinesis stream (Criar fluxo do Kinesis) e crie um fluxo com um estilhaço.

  4. Execute o seguinte código Python para preencher o fluxo com dados de amostra. Esse código simples grava continuamente um registro com um símbolo de marcador aleatório e o timestamp atual no fluxo.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Etapa 2: Criar o aplicativo Amazon Kinesis Data Analytics

Crie um aplicativo da seguinte maneira:

  1. Abra o Managed Service for Apache Flink console em https://console.aws.amazon.com/kinesisanalytics.

  2. Escolha Create application (Criar aplicativo), digite um nome para o aplicativo e selecione Create application (Criar aplicativo).

  3. Na página de detalhes do aplicativo, escolha Connect streaming data (Conectar dados de streaming) para se conectar com a fonte.

  4. Na página Connect to source (Conectar com a fonte), faça o seguinte:

    1. Escolha o stream criado na seção anterior.

    2. Escolha para criar uma função do IAM.

    3. Escolha Discover schema (Descobrir esquema). Aguarde o console mostrar o esquema inferido e os registros de exemplo usados para inferir o esquema do fluxo do aplicativo criado. O esquema inferido tem duas colunas.

    4. Escolha Edit Schema (Editar esquema). Mude Column type (Tipo de coluna) da coluna EVENT_TIME para TIMESTAMP.

    5. Escolha Save schema and update stream samples. Depois que o console salvar o esquema, escolha Exit (Sair).

    6. Escolha Save and continue.

  5. Na página de detalhes de aplicativo, escolha Go to SQL editor (Ir para o editor de SQL). Para iniciar o aplicativo, escolha Yes, start application (Sim, iniciar o aplicativo) na caixa de diálogo exibida.

  6. No editor SQL, escreva o código do aplicativo e verifique os resultados da seguinte forma:

    1. Copie o código de aplicativo a seguir e cole-o no editor.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. Escolha Save and run SQL. Na guia Real-time analytics (Análise em tempo real), você pode ver todos os fluxos de aplicativo criados pelo aplicativo e verificar os dados.