Implementar o produtor - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Implementar o produtor

O aplicativo no Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x usa o cenário real de monitoramento de negociações em bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de apoio.

Consulte o código-fonte e analise as informações a seguir.

Classe StockTrade

Uma negociação de ação individual é representada por uma instância da classe StockTrade. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é previamente implementada.

Registro de fluxo

Um fluxo é uma sequência de registros. Um registro é uma serialização de uma instância StockTrade no formato JSON. Por exemplo:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
Classe StockTradeGenerator

StockTradeGenerator tem um método denominado getRandomTrade(), que retorna uma nova negociação de ações gerada aleatoriamente sempre que ela é invocada. Essa classe é previamente implementada.

Classe StockTradesWriter

O método main do produtor, StockTradesWriter, recupera continuamente uma negociação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:

  1. Lê o nome do fluxo e o nome da região como entrada.

  2. Cria um AmazonKinesisClientBuilder.

  3. Usa o criador do cliente para definir região, credenciais e configuração do cliente.

  4. Cria um cliente AmazonKinesis usando o criador do cliente.

  5. Verifica se o fluxo existe e está ativo (se não, ele será encerrado com um erro).

  6. Em um loop contínuo, chama o método StockTradeGenerator.getRandomTrade() e o método sendStockTrade para enviar a negociação ao fluxo a cada 100 milissegundos.

O método sendStockTrade da classe StockTradesWriter tem o seguinte código:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Consulte o desmembramento do código a seguir:

  • A API de PutRecord espera uma matriz de bytes, e é necessário converter trade para o formato JSON. Essa única linha de código executa a seguinte operação:

    byte[] bytes = trade.toJsonAsBytes();
  • Antes de enviar a negociação, crie uma nova instância de PutRecordRequest (denominada putRecord neste caso):

    PutRecordRequest putRecord = new PutRecordRequest();

    Cada chamada a PutRecord requer o nome do fluxo, uma chave de partição e um blob de dados. O código a seguir preenche esses campos no objeto putRecord usando seus métodos setXxxx():

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado fragmento. Na prática, deve haver centenas ou milhares de chaves de partição por fragmento, de forma que os registros sejam uniformemente disseminados no fluxo. Para obter mais informações sobre como adicionar dados a um fluxo, consulte Adicionar dados a um fluxo.

    Agora putRecord está pronto para enviar para o cliente (operação put):

    kinesisClient.putRecord(putRecord);
  • A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Adicione o bloco try/catch ao redor da operação put:

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Isso ocorre porque uma operação put do Kinesis Data Streams pode falhar devido a um erro de rede ou porque o fluxo de dados atinge o limite de throughput e tem sua utilização controlada. Recomendamos considerar cuidadosamente sua política de tentativa para operações put a fim de evitar perda de dados, usando como uma nova tentativa.

  • O registro de status é útil mas opcional:

    LOG.info("Putting trade: " + trade.toString());

O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams, PutRecord. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros de PutRecords e enviar lotes de registros por vez. Para obter mais informações, consulte Adicionar dados a um fluxo.

Como executar o produtor
  1. Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo ~/.aws/credentials.

  2. Execute a classe StockTradeWriter com os seguintes argumentos:

    StockTradeStream us-west-2

    Se o fluxo foi criado em uma região diferente de us-west-2, é necessário especificar essa região aqui.

A saída deve ser semelhante a:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Seu fluxo de negociações de ações agora está sendo ingerido pelo Kinesis Data Streams.

Próximas etapas

Implementar o consumidor