As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Implementar o produtor
O aplicativo no Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x usa o cenário real de monitoramento de negociações em bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de apoio.
Consulte o código-fonte e analise as informações a seguir.
- Classe StockTrade
-
Uma negociação de ação individual é representada por uma instância da classe
StockTrade
. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é previamente implementada. - Registro de fluxo
-
Um fluxo é uma sequência de registros. Um registro é uma serialização de uma instância
StockTrade
no formato JSON. Por exemplo:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- Classe StockTradeGenerator
-
StockTradeGenerator
tem um método denominadogetRandomTrade()
, que retorna uma nova negociação de ações gerada aleatoriamente sempre que ela é invocada. Essa classe é previamente implementada. - Classe StockTradesWriter
-
O método
main
do produtor,StockTradesWriter
, recupera continuamente uma negociação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:-
Lê o nome do fluxo e o nome da região como entrada.
-
Cria um
AmazonKinesisClientBuilder
. -
Usa o criador do cliente para definir região, credenciais e configuração do cliente.
-
Cria um cliente
AmazonKinesis
usando o criador do cliente. -
Verifica se o fluxo existe e está ativo (se não, ele será encerrado com um erro).
-
Em um loop contínuo, chama o método
StockTradeGenerator.getRandomTrade()
e o métodosendStockTrade
para enviar a negociação ao fluxo a cada 100 milissegundos.
O método
sendStockTrade
da classeStockTradesWriter
tem o seguinte código:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }
Consulte o desmembramento do código a seguir:
-
A API de
PutRecord
espera uma matriz de bytes, e é necessário convertertrade
para o formato JSON. Essa única linha de código executa a seguinte operação:byte[] bytes = trade.toJsonAsBytes();
-
Antes de enviar a negociação, crie uma nova instância de
PutRecordRequest
(denominadaputRecord
neste caso):PutRecordRequest putRecord = new PutRecordRequest();
Cada chamada a
PutRecord
requer o nome do fluxo, uma chave de partição e um blob de dados. O código a seguir preenche esses campos no objetoputRecord
usando seus métodossetXxxx()
:putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));
O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado fragmento. Na prática, deve haver centenas ou milhares de chaves de partição por fragmento, de forma que os registros sejam uniformemente disseminados no fluxo. Para obter mais informações sobre como adicionar dados a um fluxo, consulte Adicionar dados a um fluxo.
Agora
putRecord
está pronto para enviar para o cliente (operaçãoput
):kinesisClient.putRecord(putRecord);
-
A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
Adicione o bloco try/catch ao redor da operação
put
:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }
Isso ocorre porque uma operação
put
do Kinesis Data Streams pode falhar devido a um erro de rede ou porque o fluxo de dados atinge o limite de throughput e tem sua utilização controlada. Recomendamos considerar cuidadosamente sua política de tentativa para operaçõesput
a fim de evitar perda de dados, usando como uma nova tentativa. -
O registro de status é útil mas opcional:
LOG.info("Putting trade: " + trade.toString());
O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams,
PutRecord
. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros dePutRecords
e enviar lotes de registros por vez. Para obter mais informações, consulte Adicionar dados a um fluxo. -
Como executar o produtor
-
Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo
~/.aws/credentials
. -
Execute a classe
StockTradeWriter
com os seguintes argumentos:StockTradeStream us-west-2
Se o fluxo foi criado em uma região diferente de
us-west-2
, é necessário especificar essa região aqui.
A saída deve ser semelhante a:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Seu fluxo de negociações de ações agora está sendo ingerido pelo Kinesis Data Streams.