Implementieren Sie den Verbraucher - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Implementieren Sie den Verbraucher

Die Verbraucheranwendung in diesem Tutorial verarbeitet die Wertpapiertransaktionen im Daten-Stream kontinuierlich. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung basiert auf der Kinesis Client Library (KCL), die einen Großteil der Arbeit erledigt, die für Verbraucher-Apps üblich ist. Weitere Informationen finden Sie unter KCLInformationen zu 1.x und 2.x.

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

StockTradesProcessor Klasse

Die für Sie bereitgestellte Hauptklasse des Verbrauchers, die die folgenden Aufgaben erfüllt:

  • Liest die als Argumente übergebenen Anwendungs-, Datenstrom- und Regionsnamen.

  • Erzeugt eine KinesisAsyncClient Instanz mit dem Namen der Region.

  • Erstellt eine StockTradeRecordProcessorFactory-Instance für die Instances von ShardRecordProcessor, implementiert von einer StockTradeRecordProcessor-Instance.

  • Erzeugt eine ConfigsBuilder Instanz mit der StockTradeRecordProcessorFactory Instanz KinesisAsyncClient StreamNameApplicationName,, und. Dies ist für das Erstellen aller Konfigurationen mit Standardwerten nützlich.

  • Erstellt mit der Instanz einen KCL Scheduler (in KCL Version 1.x wurde er früher als KCL Worker bezeichnet). ConfigsBuilder

  • Der Scheduler erstellt für jeden Shard (der dieser Verbraucher-Instance zugeordnet ist) einen neuen Thread, der in einer Schleife die Datensätze aus dem Daten-Stream liest. Anschließend wird die StockTradeRecordProcessor-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten.

StockTradeRecordProcessor Klasse

Implementierung der StockTradeRecordProcessor-Instance, die wiederum fünf erforderliche Methoden implementiert: initialize, processRecords, leaseLost, shardEnded und shutdownRequested.

Die shutdownRequested Methoden initialize und werden von der verwendet, KCL um dem Aufzeichnungsprozessor mitzuteilen, wann er bereit sein sollte, mit dem Empfang von Datensätzen zu beginnen bzw. wann er damit rechnen sollte, keine Aufzeichnungen mehr zu empfangen, sodass er alle anwendungsspezifischen Einrichtungs- und Beendungsaufgaben ausführen kann. leaseLostund shardEnded werden verwendet, um eine beliebige Logik dafür zu implementieren, was zu tun ist, wenn ein Lease verloren geht oder eine Verarbeitung das Ende eines Shards erreicht hat. In diesem Beispiel protokollieren wir einfach Meldungen dieser Ereignisse.

Der Code für diese Methoden wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der processRecords Methode, die wiederum processRecord für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leerer Skeleton-Code bereitgestellt und im nächsten Schritt (der weitere Informationen enthält) implementiert.

Beachten Sie außerdem die Implementierung der Hilfsmethoden für processRecord: reportStats und resetStats, die im ursprünglichen Quellcode leer sind.

Die processRecords-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:

  • Für jeden übergebenen Datensatz wird processRecord aufgerufen.

  • Ruft reportStats() zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dann resetStats(), um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält.

  • Legt den Zeitpunkt für die nächste Berichterstellung fest.

  • Ruft checkpoint() auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist.

  • Legt den Zeitpunkt für das nächste Checkpointing fest.

Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter Verwendung der Kinesis Client Library.

StockStats Klasse

Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:

  • addStockTrade(StockTrade): fügt die angegebene StockTrade in die ausgeführten Statistiken ein.

  • toString(): gibt die Statistiken als formatierte Zeichenfolge zurück.

Diese Klasse verfolgt die beliebtesten Aktien, indem sie fortlaufend die Gesamtzahl der Trades für jede Aktie und deren maximale Anzahl zählt. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.

Fügen Sie Code zu den Methoden der StockTradeRecordProcessor-Klasse hinzu, wie in den folgenden Schritten gezeigt.

So implementieren Sie den Konsumenten
  1. Implementieren Sie die processRecord-Methode, indem Sie ein richtig bemessenes StockTrade-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. Implementieren Sie eine reportStats Methode. Ändern Sie das Ausgabeformat nach Ihren Wünschen.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Implementieren Sie die Methode resetStats, die eine neue stockStats-Instance erstellt.

    stockStats = new StockStats();
  4. Implementieren Sie die folgenden Methoden, die für die ShardRecordProcessor Schnittstelle erforderlich sind:

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
So führen Sie den Konsumenten aus
  1. Führen Sie den unter Implementieren Sie den Hersteller erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.

  2. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime key pair, das Sie zuvor (beim Erstellen des IAM Benutzers) abgerufen haben, in der Datei gespeichert sind~/.aws/credentials.

  3. Führen Sie die StockTradesProcessor-Klasse mit den folgenden Argumenten aus:

    StockTradesProcessor StockTradeStream us-west-2

    Beachten Sie, dass Sie, wenn Sie Ihren Stream in einer anderen Region als us-west-2 erstellt haben, stattdiesen diese Region hier angeben müssen.

Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Nächste Schritte

(Optional) Erweitern Sie den Consumer-Bereich