As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Implementar o produtor
Este tutorial usa o cenário do mundo real de monitoramento de transações da bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de suporte.
Consulte o código-fonte
- Classe StockTrade
-
Uma transação de ações individual é representada por uma instância da classe StockTrade. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é previamente implementada.
- Registro de fluxo
-
Um fluxo é uma sequência de registros. Um registro é uma serialização de uma instância
StockTrade
no formato JSON. Por exemplo:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- Classe StockTradeGenerator
-
StockTradeGenerator tem um método chamado
getRandomTrade()
, que retorna uma nova transação de ações gerada aleatoriamente sempre que ela é invocada. Essa classe é previamente implementada. - Classe StockTradesWriter
-
O método
main
do produtor, StockTradesWriter, recupera continuamente uma negociação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:-
Lê o nome do fluxo de dados e o nome da região como entrada.
-
Usa o
KinesisAsyncClientBuilder
para definir região, credenciais e configuração do cliente. -
Verifica se o fluxo existe e está ativo (se não, ele será encerrado com um erro).
-
Em um loop contínuo, chama o método
StockTradeGenerator.getRandomTrade()
e o métodosendStockTrade
para enviar a negociação ao fluxo a cada 100 milissegundos.
O método
sendStockTrade
da classeStockTradesWriter
tem o seguinte código:private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }
Consulte o desmembramento do código a seguir:
-
A API
PutRecord
espera uma matriz de bytes, e é necessário converter a transação para o formato JSON. Essa única linha de código executa a seguinte operação:byte[] bytes = trade.toJsonAsBytes();
-
Antes de enviar a transação, crie uma nova instância de
PutRecordRequest
(chamada solicitação neste caso). Cadarequest
exige o nome do fluxo, uma chave de partição e um blob de dados.PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();
O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado fragmento. Na prática, deve haver centenas ou milhares de chaves de partição por fragmento, de forma que os registros sejam uniformemente disseminados no fluxo. Para obter mais informações sobre como adicionar dados a um fluxo, consulte Gravar dados no Amazon Kinesis Data Streams.
Agora,
request
está pronto para enviar para o cliente (operação put):kinesisClient.putRecord(request).get();
-
A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
Adicione o bloco try/catch ao redor da operação
put
:try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }
Isso ocorre porque uma operação put do Kinesis Data Streams pode falhar devido a erro de rede ou porque o fluxo de dados pode atingir o limite de throughput e ficar limitado. É recomendado considerar cuidadosamente sua política de tentativa para operações
put
a fim de evitar perda de dados, por exemplo, usando como uma nova tentativa. -
O registro de status é útil mas opcional:
LOG.info("Putting trade: " + trade.toString());
O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams,
PutRecord
. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros dePutRecords
e enviar lotes de registros por vez. Para obter mais informações, consulte Gravar dados no Amazon Kinesis Data Streams. -
Como executar o produtor
-
Verifique se a chave de acesso e o par de chaves secretas recuperados em Criar um usuário e uma política do IAM estão salvos no arquivo
~/.aws/credentials
. -
Execute a classe
StockTradeWriter
com os seguintes argumentos:StockTradeStream us-west-2
Se o fluxo foi criado em uma região diferente de
us-west-2
, será necessário especificar esta região.
A saída deve ser semelhante a:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Suas negociações de ações agora estão sendo ingeridas pelo Kinesis Data Streams.