Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Implementa il produttore
L'applicazione nel Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x utilizza come scenario quello del monitoraggio del mercato azionario reale. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.
Consulta il codice sorgente e rivedi le informazioni riportate di seguito.
- StockTrade classe
-
Una singola negoziazione è rappresentata da un'istanza della classe
StockTrade
. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te. - Record di flusso
-
Un flusso è una sequenza di record. Un record è una serializzazione di un'
StockTrade
istanza in JSON formato. Per esempio:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- StockTradeGenerator classe
-
StockTradeGenerator
include un metodo denominatogetRandomTrade()
che restituisce una negoziazione generata casualmente ogni volta che viene invocata. Questa classe è implementata per te. - StockTradesWriter classe
-
Il metodo
main
del producer,StockTradesWriter
recupera continuamente uno scambio casuale e lo invia al flusso di dati Kinesis eseguendo queste operazioni:-
Legge il nome del flusso e il nome della regione come input.
-
Crea un
AmazonKinesisClientBuilder
. -
Utilizza il generatore client per impostare la regione, le credenziali e la configurazione del client.
-
Crea un client
AmazonKinesis
utilizzando il generatore di client. -
Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore).
-
In un ciclo continuo, chiama il metodo
StockTradeGenerator.getRandomTrade()
e quindi il metodosendStockTrade
per inviare lo scambio al flusso ogni 100 millisecondi.
Il metodo
sendStockTrade
della classeStockTradesWriter
include il codice seguente:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }
Fai riferimento alla seguente suddivisione del codice:
-
PutRecord
APISi aspetta un array di byte e devitrade
convertirlo JSON in formato. Questa singola riga di codice esegue tale operazione:byte[] bytes = trade.toJsonAsBytes();
-
Prima di poter inviare lo scambio, devi creare una nuova istanza
PutRecordRequest
(denominataputRecord
in questo caso):PutRecordRequest putRecord = new PutRecordRequest();
Ogni chiamata
PutRecord
richiede il nome del flusso, la chiave di partizione e il blob di dati. Il codice seguente compila questi campi nell'oggettoputRecord
utilizzando i relativi metodisetXxxx()
:putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));
Questo esempio utilizza un ticket per titoli come chiave di partizione, che mappa il record a un determinato shard. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta Aggiungi dati a uno stream.
Ora
putRecord
è pronto per l'invio al client (operazioneput
):kinesisClient.putRecord(putRecord);
-
La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
Aggiungi il blocco try/catch per l'operazione
put
:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }
Questo è perché un'operazione
put
del flusso di dati Kinesis potrebbe non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Si consiglia di valutare attentamente la politica di riprova perput
le operazioni volte a evitare la perdita di dati, ad esempio utilizzando un nuovo tentativo. -
La registrazione dello stato è utile, ma opzionale:
LOG.info("Putting trade: " + trade.toString());
Il produttore mostrato qui utilizza la funzionalità di registrazione singola di Kinesis API Data Streams,.
PutRecord
In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli diPutRecords
e inviare batch di record ogni volta. Per ulteriori informazioni, consulta Aggiungi dati a uno stream. -
Per eseguire il producer
-
Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in precedenza (durante la creazione IAM dell'utente) siano salvate nel file
~/.aws/credentials
. -
Eseguire la classe
StockTradeWriter
con i seguenti argomenti:StockTradeStream us-west-2
Se è stato creato un flusso in una regione diversa da
us-west-2
, è necessario specificare quella regione qui.
Verrà visualizzato un output simile al seguente:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Il flusso di negoziazioni viene ora importato dal flusso di dati Kinesis.