Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Implementieren Sie den Verbraucher
Die Verbraucheranwendung in diesem Tutorial verarbeitet die Wertpapiertransaktionen im Daten-Stream kontinuierlich. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung basiert auf der Kinesis Client Library (KCL), die einen Großteil der Arbeit erledigt, die für Verbraucher-Apps üblich ist. Weitere Informationen finden Sie unter KCLInformationen zu 1.x und 2.x.
Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.
- StockTradesProcessor Klasse
-
Die für Sie bereitgestellte Hauptklasse des Verbrauchers, die die folgenden Aufgaben erfüllt:
-
Liest die als Argumente übergebenen Anwendungs-, Datenstrom- und Regionsnamen.
-
Erzeugt eine
KinesisAsyncClient
Instanz mit dem Namen der Region. -
Erstellt eine
StockTradeRecordProcessorFactory
-Instance für die Instances vonShardRecordProcessor
, implementiert von einerStockTradeRecordProcessor
-Instance. -
Erzeugt eine
ConfigsBuilder
Instanz mit derStockTradeRecordProcessorFactory
InstanzKinesisAsyncClient
StreamName
ApplicationName
,, und. Dies ist für das Erstellen aller Konfigurationen mit Standardwerten nützlich. -
Erstellt mit der Instanz einen KCL Scheduler (in KCL Version 1.x wurde er früher als KCL Worker bezeichnet).
ConfigsBuilder
-
Der Scheduler erstellt für jeden Shard (der dieser Verbraucher-Instance zugeordnet ist) einen neuen Thread, der in einer Schleife die Datensätze aus dem Daten-Stream liest. Anschließend wird die
StockTradeRecordProcessor
-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten.
-
- StockTradeRecordProcessor Klasse
-
Implementierung der
StockTradeRecordProcessor
-Instance, die wiederum fünf erforderliche Methoden implementiert:initialize
,processRecords
,leaseLost
,shardEnded
undshutdownRequested
.Die
shutdownRequested
Methodeninitialize
und werden von der verwendet, KCL um dem Aufzeichnungsprozessor mitzuteilen, wann er bereit sein sollte, mit dem Empfang von Datensätzen zu beginnen bzw. wann er damit rechnen sollte, keine Aufzeichnungen mehr zu empfangen, sodass er alle anwendungsspezifischen Einrichtungs- und Beendungsaufgaben ausführen kann.leaseLost
undshardEnded
werden verwendet, um eine beliebige Logik dafür zu implementieren, was zu tun ist, wenn ein Lease verloren geht oder eine Verarbeitung das Ende eines Shards erreicht hat. In diesem Beispiel protokollieren wir einfach Meldungen dieser Ereignisse.Der Code für diese Methoden wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der
processRecords
Methode, die wiederumprocessRecord
für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leerer Skeleton-Code bereitgestellt und im nächsten Schritt (der weitere Informationen enthält) implementiert.Beachten Sie außerdem die Implementierung der Hilfsmethoden für
processRecord
:reportStats
undresetStats
, die im ursprünglichen Quellcode leer sind.Die
processRecords
-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:-
Für jeden übergebenen Datensatz wird
processRecord
aufgerufen. -
Ruft
reportStats()
zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dannresetStats()
, um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält. -
Legt den Zeitpunkt für die nächste Berichterstellung fest.
-
Ruft
checkpoint()
auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist. -
Legt den Zeitpunkt für das nächste Checkpointing fest.
Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter Verwendung der Kinesis Client Library.
-
- StockStats Klasse
-
Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:
-
addStockTrade(StockTrade)
: fügt die angegebeneStockTrade
in die ausgeführten Statistiken ein. -
toString()
: gibt die Statistiken als formatierte Zeichenfolge zurück.
Diese Klasse verfolgt die beliebtesten Aktien, indem sie fortlaufend die Gesamtzahl der Trades für jede Aktie und deren maximale Anzahl zählt. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.
-
Fügen Sie Code zu den Methoden der StockTradeRecordProcessor
-Klasse hinzu, wie in den folgenden Schritten gezeigt.
So implementieren Sie den Konsumenten
-
Implementieren Sie die
processRecord
-Methode, indem Sie ein richtig bemessenesStockTrade
-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
-
Implementieren Sie eine
reportStats
Methode. Ändern Sie das Ausgabeformat nach Ihren Wünschen.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
Implementieren Sie die Methode
resetStats
, die eine neuestockStats
-Instance erstellt.stockStats = new StockStats();
-
Implementieren Sie die folgenden Methoden, die für die
ShardRecordProcessor
Schnittstelle erforderlich sind:@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
So führen Sie den Konsumenten aus
-
Führen Sie den unter Implementieren Sie den Hersteller erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.
-
Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime key pair, das Sie zuvor (beim Erstellen des IAM Benutzers) abgerufen haben, in der Datei gespeichert sind
~/.aws/credentials
. -
Führen Sie die
StockTradesProcessor
-Klasse mit den folgenden Argumenten aus:StockTradesProcessor StockTradeStream us-west-2
Beachten Sie, dass Sie, wenn Sie Ihren Stream in einer anderen Region als
us-west-2
erstellt haben, stattdiesen diese Region hier angeben müssen.
Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Nächste Schritte
(Optional) Erweitern Sie den Consumer-Bereich