Exemplo: transformação de vários tipos de dados - Guia do desenvolvedor do Amazon Kinesis Data Analytics SQL para aplicativos

Para novos projetos, recomendamos que você use o novo Managed Service para Apache Flink Studio em vez do Kinesis Data Analytics for Applications. SQL O Managed Service for Apache Flink Studio combina facilidade de uso com recursos analíticos avançados, permitindo que você crie aplicativos sofisticados de processamento de stream em minutos.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplo: transformação de vários tipos de dados

Um requisito comum nos aplicativos ETL (extração, transformação e carregamento) é o processamento de vários tipos de registro em uma origem de streaming. Você pode criar aplicativos Kinesis Data Analytics para processar esses tipos de origens de streaming. O processo é o seguinte:

  1. Primeiro, mapeie a origem de streaming para um fluxo de entrada de aplicativo, semelhante a todos os outros aplicativos Kinesis Data Analytics.

  2. Em seguida, no código do seu aplicativo, grave instruções SQL para recuperar linhas de tipos específicos do fluxo de entrada no aplicativo. Em seguida, insira-as em fluxos separados no aplicativo. (Você pode criar fluxos adicionais no aplicativo no código do seu aplicativo.)

Neste exercício, há uma origem de streaming que recebe registros de dois tipos (Order e Trade). Esses são pedidos de ações e suas negociações correspondentes. Para cada pedido, pode haver zero ou mais negociações. Veja a seguir registros de exemplo de cada tipo:

Order record

{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}

Trade record

{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}

Quando você cria um aplicativo usando o AWS Management Console, o console exibe o seguinte esquema inferido para o fluxo de entrada no aplicativo criado. Por padrão, o console nomeia esse fluxo de aplicativo como SOURCE_SQL_STREAM_001.

Captura de tela do console mostrando o exemplo de um fluxo no aplicativo formatado.

Quando você salvar a configuração, o Amazon Kinesis Data Analytics lerá continuamente dados na origem de streaming e inserirá linhas no stream no aplicativo. Agora você pode fazer a análise de dados no stream no aplicativo.

Neste exemplo, primeiro você cria dois fluxos de aplicativo adicionais no código do aplicativo, que são Order_Stream e Trade_Stream. Depois, filtra as linhas do fluxo SOURCE_SQL_STREAM_001 com base no tipo de registro e as insere nos fluxos recém-criados usando bombas. Para obter informações sobre esse padrão de codificação, consulte Código do aplicativo.

  1. Filtragem de linhas de pedidos e negociações em fluxos de aplicativo separados:

    1. Filtre os registros de pedidos no SOURCE_SQL_STREAM_001 e salve os pedidos no Order_Stream.

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
    2. Filtre os registros de negociações no SOURCE_SQL_STREAM_001 e salve os pedidos no Trade_Stream.

      --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
  2. Agora você pode executar análises adicionais nesses fluxos. Neste exemplo, conte o número de negociações pelo índice em uma janela em cascata de um minuto e salve os resultados em outro fluxo, DESTINATION_SQL_STREAM.

    --do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);

    Você verá o resultado a seguir:

    Captura de tela do console mostrando os resultados na guia de resultados do SQL.
Próxima etapa

Etapa 1: Preparar os dados