Exemplo: agregação dos resultados parciais de uma consulta - Guia do desenvolvedor do Amazon Kinesis Data Analytics SQL para aplicativos

Para novos projetos, recomendamos que você use o novo Managed Service para Apache Flink Studio em vez do Kinesis Data Analytics for Applications. SQL O Managed Service for Apache Flink Studio combina facilidade de uso com recursos analíticos avançados, permitindo que você crie aplicativos sofisticados de processamento de stream em minutos.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplo: agregação dos resultados parciais de uma consulta

Se um streaming de dados do Amazon Kinesis contiver registros que tenham um horário do evento que não corresponda exatamente ao horário da ingestão, uma seleção de resultados em uma janela em cascata contém os registros que chegaram, mas não necessariamente ocorreram, dentro da janela. Nesse caso, a janela em cascata conterá somente um conjunto parcial dos resultados que você deseja. Há várias abordagens que você pode usar para corrigir o problema:

  • Use somente uma janela em cascata e agregue resultados parciais no pós-processamento por meio de um banco de dados ou data warehouse usando upserts. Essa abordagem é eficiente no processamento de um aplicativo. Ela manipula os dados atrasados indefinidamente para operadores agregados (sum, min, max, e assim por diante). A desvantagem dessa abordagem é que você deve desenvolver e manter lógica adicional do aplicativo na camada de banco de dados.

  • Use uma janela deslizante e em cascata, que produz resultados parciais logo, mas também continua a produzir resultados completos ao longo do período da janela deslizante. Essa abordagem trata os dados tardios com uma substituição em vez de um upsert para que nenhuma lógica adicional do aplicativo precise ser adicionada à camada de banco de dados. A desvantagem dessa abordagem é que ela usa mais unidades de processamento do Kinesis (KPUs) e ainda produz dois resultados, o que pode não funcionar para alguns casos de uso.

Para obter mais informações sobre janelas deslizantes e em cascata, consulte Consultas em janelas.

No procedimento a seguir, a agregação de janela em cascata produz dois resultados parciais (enviados ao stream no aplicativo CALC_COUNT_SQL_STREAM) que devem ser combinados para produzir um resultado final. O aplicativo, então, produz uma segunda agregação (enviada ao stream no aplicativo DESTINATION_SQL_STREAM) que combina os dois resultados parciais.

Para criar um aplicativo que agrega resultados parciais usando um horário do evento
  1. Faça login no AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Selecione Data Analytics (Análise de dados) no painel de navegação. Crie um aplicativo do Kinesis Data Analytics conforme descrito no tutorial Conceitos básicos do Amazon Kinesis Data Analytics para aplicativos SQL.

  3. No editor SQL, substitua o código de aplicativo pelo seguinte:

    CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);

    A instrução SELECT no código de aplicativo filtra as linhas no SOURCE_SQL_STREAM_001 por alterações de preços de ações maiores de 1% e insere essas linhas em outro stream no aplicativo CHANGE_STREAM usando uma bomba.

  4. Escolha Save and run SQL.

A primeira bomba produz um stream para CALC_COUNT_SQL_STREAM que é semelhante ao seguinte (observe que o conjunto de resultados está incompleto):

Captura de tela do console mostrando resultados parciais.

A segunda bomba produz um stream para DESTINATION_SQL_STREAM que contém o conjunto de resultados completo:

Captura de tela do console mostrando resultados completos.