Entwickeln Sie Verbraucher mit KCL in Java - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Entwickeln Sie Verbraucher mit KCL in Java

Voraussetzungen

Bevor Sie mit der Verwendung von KCL 3.x beginnen, stellen Sie sicher, dass Sie über Folgendes verfügen:

  • Java Development Kit (JDK) 8 oder höher

  • AWS SDK for Java 2.x

  • Maven oder Gradle für das Abhängigkeitsmanagement

KCLsammelt CPU Nutzungsmetriken wie die CPU Auslastung des Rechenhosts, auf dem die Mitarbeiter arbeiten, um die Auslastung auszugleichen und eine gleichmäßige Ressourcenauslastung für alle Mitarbeiter zu erreichen. Um die Erfassung von CPU Nutzungskennzahlen von Mitarbeitern KCL zu ermöglichen, müssen Sie die folgenden Voraussetzungen erfüllen:

Amazon Elastic Compute Cloud(AmazonEC2)

  • Ihr Betriebssystem muss Linux OS sein.

  • Sie müssen es IMDSv2in Ihrer EC2 Instanz aktivieren.

Amazon Elastic Container Service (AmazonECS) bei Amazon EC2

Amazon ECS auf AWS Fargate

  • Sie müssen den Fargate-Task-Metadaten-Endpunkt Version 4 aktivieren. Wenn Sie die Fargate-Plattformversion 1.4.0 oder höher verwenden, ist dies standardmäßig aktiviert.

  • Fargate-Plattformversion 1.4.0 oder höher.

Amazon Elastic Kubernetes Service (AmazonEKS) auf Amazon EC2

  • Ihr Betriebssystem muss Linux OS sein.

Amazon EKS auf AWS Fargate

  • Fargate-Plattform 1.3.0 oder höher.

Wichtig

Wenn es KCL nicht möglich ist, CPU Nutzungskennzahlen von Mitarbeitern zu sammeln, KCL wird auf den Durchsatz pro Mitarbeiter zurückgegriffen, um Leasingverträge zuzuweisen und die Auslastung auf die Mitarbeiter in der Flotte zu verteilen. Weitere Informationen finden Sie unter Wie werden den Mitarbeitern Leasingverträge KCL zugewiesen und die Auslastung verteilt.

Installieren und fügen Sie Abhängigkeiten hinzu

Wenn Sie Maven verwenden, fügen Sie Ihrer pom.xml Datei die folgende Abhängigkeit hinzu. Stellen Sie sicher, dass Sie 3.x.x durch die neueste Version ersetzt haben. KCL

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Wenn Sie Gradle verwenden, fügen Sie Ihrer Datei Folgendes hinzu. build.gradle Stellen Sie sicher, dass Sie 3.x.x durch die neueste Version ersetzt haben. KCL

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

Sie können im Maven Central Repository nach der KCL neuesten Version von suchen.

Implementieren Sie den Verbraucher

Eine KCL Verbraucheranwendung besteht aus den folgenden Schlüsselkomponenten:

RecordProcessor

RecordProcessor ist die Kernkomponente, in der sich Ihre Geschäftslogik für die Verarbeitung von Kinesis-Datenstream-Datensätzen befindet. Es definiert, wie Ihre Anwendung die Daten verarbeitet, die sie vom Kinesis-Stream empfängt.

Wichtigste Aufgaben:

  • Initialisieren Sie die Verarbeitung für einen Shard

  • Batches von Datensätzen aus dem Kinesis-Stream verarbeiten

  • Die Verarbeitung für einen Shard herunterfahren (z. B. wenn der Shard geteilt oder zusammengeführt wird oder der Lease an einen anderen Host übergeben wird)

  • Verwalte Checkpoints, um den Fortschritt zu verfolgen

Im Folgenden wird ein Implementierungsbeispiel gezeigt:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

Im Folgenden finden Sie eine detaillierte Erläuterung der einzelnen in diesem Beispiel verwendeten Methoden:

initialisieren () InitializationInput initializationInput

  • Zweck: Richten Sie alle erforderlichen Ressourcen oder den Status für die Verarbeitung von Datensätzen ein.

  • Wann es aufgerufen wird: Einmal, wenn diesem Datensatzprozessor ein KCL Shard zugewiesen wird.

  • Die wichtigsten Punkte:

    • initializationInput.shardId(): Die ID des Shards, den dieser Prozessor verarbeiten wird.

    • initializationInput.extendedSequenceNumber(): Die Sequenznummer, von der aus die Verarbeitung gestartet werden soll.

processRecords(ProcessRecordsInput processRecordsInput)

  • Zweck: Verarbeitet die eingehenden Datensätze und überprüft optional den Fortschritt.

  • Wann es aufgerufen wird: Wiederholt, solange der Datensatzprozessor den Leasingvertrag für den Shard hält.

  • Die wichtigsten Punkte:

    • processRecordsInput.records(): Liste der zu verarbeitenden Datensätze.

    • processRecordsInput.checkpointer(): Wird verwendet, um den Fortschritt zu überprüfen.

    • Stellen Sie sicher, dass Sie bei der Verarbeitung alle Ausnahmen berücksichtigt haben, um Fehler KCL zu vermeiden.

    • Diese Methode sollte idempotent sein, da derselbe Datensatz in einigen Szenarien mehr als einmal verarbeitet werden kann, z. B. Daten, die vor einem unerwarteten Absturz oder Neustart des Workers nicht überprüft wurden.

    • Leeren Sie vor dem Checkpoint stets alle gepufferten Daten, um die Datenkonsistenz sicherzustellen.

leaseLost(LeaseLostInput leaseLostInput)

  • Zweck: Bereinigen Sie alle Ressourcen, die für die Verarbeitung dieses Shards spezifisch sind.

  • Wann es aufgerufen wird: Wenn ein anderer Scheduler den Lease für diesen Shard übernimmt.

  • Die wichtigsten Punkte:

    • Checkpointing ist bei dieser Methode nicht erlaubt.

shardEnded(ShardEndedInput shardEndedInput)

  • Zweck: Beenden Sie die Verarbeitung für diesen Shard und diesen Checkpoint.

  • Wann es aufgerufen wird: Wenn der Shard geteilt oder zusammengeführt wird, was bedeutet, dass alle Daten für diesen Shard verarbeitet wurden.

  • Die wichtigsten Punkte:

    • shardEndedInput.checkpointer(): Wird verwendet, um das letzte Checkpointing durchzuführen.

    • Checkpointing ist bei dieser Methode zwingend erforderlich, um die Verarbeitung abzuschließen.

    • Wenn die Daten und der Checkpoint hier nicht geleert werden, kann dies zu Datenverlust oder doppelter Verarbeitung führen, wenn der Shard erneut geöffnet wird.

shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)

  • Zweck: Beim Herunterfahren werden die Ressourcen überprüft und bereinigt. KCL

  • Wann es aufgerufen wird: Wann KCL wird das System heruntergefahren (z. B. wenn die Anwendung beendet wird).

  • Die wichtigsten Punkte:

    • shutdownRequestedInput.checkpointer(): Wird verwendet, um Checkpoints vor dem Herunterfahren durchzuführen.

    • Stellen Sie sicher, dass Sie Checkpointing in der Methode implementiert haben, damit der Fortschritt gespeichert wird, bevor die Anwendung beendet wird.

    • Wenn die Daten und der Checkpoint hier nicht geleert werden, kann dies zu Datenverlust oder zur erneuten Verarbeitung von Datensätzen führen, wenn die Anwendung neu gestartet wird.

Wichtig

KCL3.x sorgt dafür, dass weniger Daten erneut verarbeitet werden, wenn der Mietvertrag von einem Mitarbeiter an einen anderen übergeben wird, indem ein Checkpoint ausgeführt wird, bevor der vorherige Mitarbeiter geschlossen wird. Wenn Sie die Checkpoint-Logik nicht in der shutdownRequested() Methode implementieren, werden Sie diesen Vorteil nicht sehen. Stellen Sie sicher, dass Sie in der Methode eine Checkpoint-Logik implementiert haben. shutdownRequested()

RecordProcessorFactory

RecordProcessorFactory ist verantwortlich für die Erstellung neuer RecordProcessor Instanzen. KCLverwendet diese Factory, um RecordProcessor für jeden Shard, den die Anwendung verarbeiten muss, eine neue zu erstellen.

Wichtigste Aufgaben:

  • Neue RecordProcessor Instanzen bei Bedarf erstellen

  • Stellen Sie sicher, dass jede ordnungsgemäß initialisiert RecordProcessor ist

Im Folgenden finden Sie ein Implementierungsbeispiel:

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

In diesem Beispiel erstellt die Factory bei SampleRecordProcessor jedem Aufruf von shardRecordProcessor () ein neues. Sie können dies um jede erforderliche Initialisierungslogik erweitern.

Scheduler

Der Scheduler ist eine Komponente auf hoher Ebene, die alle Aktivitäten der KCL Anwendung koordiniert. Es ist für die gesamte Orchestrierung der Datenverarbeitung verantwortlich.

Wichtigste Aufgaben:

  • Managen Sie den Lebenszyklus von RecordProcessors

  • Erledigen Sie das Leasingmanagement für Shards

  • Koordinieren Sie die Checkpoints

  • Verteilen Sie die Shard-Verarbeitungslast auf mehrere Worker Ihrer Anwendung

  • Bewältigen Sie Signale für ein ordnungsgemäßes Herunterfahren und Beenden von Anwendungen

Der Scheduler wird normalerweise in der Hauptanwendung erstellt und gestartet. Das Implementierungsbeispiel von Scheduler finden Sie im folgenden Abschnitt, Main Consumer Application.

Wichtigste Verbraucheranwendung

Die Hauptanwendung für Verbraucher verbindet alle Komponenten miteinander. Sie ist dafür verantwortlich, den KCL Consumer einzurichten, die erforderlichen Clients zu erstellen, den Scheduler zu konfigurieren und den Lebenszyklus der Anwendung zu verwalten.

Wichtigste Aufgaben:

  • AWS Service-Clients einrichten (Kinesis, DynamoDB,) CloudWatch

  • Konfigurieren Sie die Anwendung KCL

  • Erstellen und starten Sie den Scheduler

  • Behandelt das Herunterfahren der Anwendung

Im Folgenden finden Sie ein Implementierungsbeispiel:

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

KCLerstellt standardmäßig einen Enhanced Fan-out (EFO) -Verbraucher mit dediziertem Durchsatz. Weitere Informationen zu Enhanced Fan-Out finden Sie unter. Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz Wenn Sie weniger als 2 Verbraucher haben oder keine Verzögerungen bei der Leseverteilung unter 200 ms benötigen, müssen Sie im Scheduler-Objekt die folgende Konfiguration festlegen, um Verbraucher mit gemeinsamem Durchsatz zu verwenden:

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

Der folgende Code ist ein Beispiel für die Erstellung eines Scheduler-Objekts, das Verbraucher mit gemeinsamem Durchsatz verwendet:

Importe:

import software.amazon.kinesis.retrieval.polling.PollingConfig;

Kode:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/