Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Implementa il consumatore
L'applicazione consumer nel Tutorial: elabora i dati di borsa in tempo reale utilizzando KPL e KCL 1.x elabora in modo continuo il flusso di negoziazioni creato in Implementa il produttore. Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che svolge gran parte del lavoro pesante comune alle app di consumo. Per ulteriori informazioni, consulta Sviluppa KCL consumatori 1.x.
Consulta il codice sorgente e rivedi le informazioni riportate di seguito.
- StockTradesProcessor classe
-
Classe principale del consumer, fornita per te, che esegue queste attività:
-
Legge il nome dell'applicazione, del flusso e della regione, passati come argomenti.
-
Legge le credenziali da
~/.aws/credentials
. -
Crea un'istanza
RecordProcessorFactory
che serve istanze diRecordProcessor
, implementate da un'istanzaStockTradeRecordProcessor
. -
Crea un KCL worker con l'
RecordProcessorFactory
istanza e una configurazione standard che include il nome del flusso, le credenziali e il nome dell'applicazione. -
Il worker crea un nuovo thread per ciascuna partizione (assegnata a questa istanza consumer), che in un ciclo continuo legge i record dal flusso di dati Kinesis. Quindi invoca l'istanza
RecordProcessor
per elaborare ogni batch di record ricevuto.
-
- StockTradeRecordProcessor classe
-
Implementazione dell'istanza
RecordProcessor
, che a sua volta implementa tre metodi richiesti:initialize
,processRecords
eshutdown
.Come suggeriscono i nomi,
initialize
eshutdown
vengono utilizzati dalla Kinesis Client Library per consentire all'elaboratore di record di sapere quando dovrebbe essere pronto a iniziare a ricevere record e quando dovrebbe aspettarsi di non ricevere più record, in modo da poter effettuare qualsiasi attività di configurazione e cessazione specifica per l'applicazione. Il codice relativo viene fornito per te. L'elaborazione principale si verifica nel metodoprocessRecords
, che a sua volta utilizzaprocessRecord
per ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato ulteriormente.Da segnalare è anche l'implementazione dei metodi di supporto per
processRecord
, ovveroreportStats
eresetStats
, che sono vuoti nel codice sorgente originale.Il metodo
processRecords
viene implementato per te ed esegue questa procedura:-
Per ogni record passato, chiama
processRecord
su di esso. -
Se è trascorso almeno 1 minuto dall'ultimo report, chiama
reportStats()
, che consente di stampare le statistiche più recenti, seguito daresetStats()
, che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record. -
Imposta l'orario della creazione di report successiva.
-
Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama
checkpoint()
. -
Imposta l'orario della creazione di checkpoint successiva.
Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sulla creazione di checkpoint, consulta Informazioni aggiuntive sul consumatore.
-
- StockStats classe
-
Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi:
-
addStockTrade(StockTrade)
: inserisce un datoStockTrade
nelle statistiche in esecuzione. -
toString()
: restituisce le statistiche in una stringa formattata.
Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di negoziazioni per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio.
-
Aggiungi codice ai metodi della classe StockTradeRecordProcessor
, come mostrato nella procedura seguente.
Per implementare il consumer
-
Implementare il metodo
processRecord
creando un'istanza di un oggettoStockTrade
delle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema.StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
-
Implementare un metodo
reportStats
semplice. Modificare liberamente il formato di output in base alle proprie preferenze.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
Infine, implementare il metodo
resetStats
, che crea una nuova istanzastockStats
.stockStats = new StockStats();
Per eseguire il consumer
-
Eseguire il producer scritto in Implementa il produttore per inserire record di scambi simulati nel flusso.
-
Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in precedenza (durante la creazione IAM dell'utente) siano salvate nel file
~/.aws/credentials
. -
Eseguire la classe
StockTradesProcessor
con i seguenti argomenti:StockTradesProcessor StockTradeStream us-west-2
Nota: se è stato creato un flusso in una regione diversa da
us-west-2
, è necessario specificare quella regione qui.
Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Informazioni aggiuntive sul consumatore
Se hai familiarità con i vantaggi della Kinesis Client Library discussi in Sviluppa KCL consumatori 1.x e altrove, potresti chiederti perché utilizzarla qui. Sebbene si utilizzi solo un singolo shard stream e una singola istanza consumer per elaborarlo, è comunque più semplice implementare il consumer utilizzando. KCL Confronta la procedura di implementazione di codice nella sezione del producer con quella del consumer e potrai avere un'idea della maggiore semplicità di implementazione di un consumer. Ciò è dovuto in gran parte ai servizi che KCL fornisce.
In questa applicazione, devi concentrarti sull'implementazione di una classe di processore di record in grado di elaborare singoli record. Non devi preoccuparti di come i record vengono recuperati da Kinesis Data Streams; recupera i record e richiama KCL il processore di registrazione ogni volta che sono disponibili nuovi record. Inoltre, non devi preoccuparti nemmeno di quante istanze shard e consumer sono presenti. Se il flusso è stato incrementato, non è necessario riscrivere l'applicazione per gestire più di un'istanza shard o consumer.
Il termine checkpointing significa registrare il punto del flusso fino ai record di dati che sono stati utilizzati ed elaborati finora. Se l'applicazione si blocca, lo stream viene letto da quel punto e non dall'inizio dello stream. Il soggetto della creazione di checkpoint e i relativi modelli di progettazione e best practice non verranno trattati in questo capitolo. Tuttavia, è qualcosa che potresti incontrare negli ambienti di produzione.
Come illustrato in precedenzaImplementa il produttore, le put
operazioni in Kinesis API Data Streams utilizzano come input una chiave di partizione. Il flusso di dati Kinesis utilizza una chiave di partizione come meccanismo per dividere i record tra più partizioni (quando è presente più di una partizione nel flusso). La stessa chiave di partizione esegue l'instradamento sempre allo stesso shard. In questo modo, il consumer che elabora un determinato shard può essere progettato presupponendo che i record con la stessa chiave di partizione vengano inviati solo a quel consumer e che nessun record con la stessa chiave di partizione venga inviato a un altro consumer. Pertanto, il ruolo di lavoro di un consumer può aggregare tutti i record con la stessa chiave di partizione senza trascurare dati necessari.
In questa applicazione, l'elaborazione dei record da parte del consumatore non è intensiva, quindi puoi utilizzare uno shard ed eseguire l'elaborazione nello stesso thread del thread. KCL Tuttavia, prima valuta la possibilità di incrementare il numero di shard. In alcuni casi, potresti voler trasferire l'elaborazione a un altro thread oppure utilizzare un pool di thread se prevedi che il processo di elaborazione dei record sia intensivo. In questo modo, KCL possono recuperare nuovi record più rapidamente mentre gli altri thread possono elaborare i record in parallelo. La progettazione multithread non è banale e deve essere affrontata con tecniche avanzate, quindi aumentare il numero di frammenti è di solito il modo più efficace per scalare.
Passaggi successivi
(Facoltativo) Estendi il numero di utenti