Entwickeln Sie benutzerdefinierte Verbraucher mit gemeinsamem Durchsatz mithilfe der AWS SDK for Java - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Entwickeln Sie benutzerdefinierte Verbraucher mit gemeinsamem Durchsatz mithilfe der AWS SDK for Java

Eine der Methoden für die Entwicklung benutzerdefinierter Kinesis Data Streams-Verbraucher, die durchgängig gemeinsam genutzt werden, ist die Verwendung von Amazon Kinesis Data Streams. APIs In diesem Abschnitt wird die Verwendung der Kinesis Data Streams APIs mit dem AWS SDK für Java beschrieben. Der Java-Beispielcode in diesem Abschnitt zeigt, wie grundlegende KDS API Operationen ausgeführt werden. Er ist logisch nach Operationstypen unterteilt.

Diese Beispiele stellen keinen produktionsbereiten Code dar. Sie überprüfen nicht alle möglichen Ausnahmen und es werden nicht alle möglichen Sicherheits- oder Leistungsüberlegungen berücksichtigt.

Sie können die Kinesis Data Streams APIs mit anderen Programmiersprachen aufrufen. Weitere Informationen zu allen verfügbaren AWS SDKs Produkten finden Sie unter Start Developing with Amazon Web Services.

Wichtig

Die empfohlene Methode für die Entwicklung benutzerdefinierter Kinesis Data Streams Streams-Benutzer, die durchgehend gemeinsam genutzt werden, ist die Verwendung der Kinesis Client Library ()KCL. KCLhilft Ihnen dabei, Daten aus einem Kinesis-Datenstrom zu nutzen und zu verarbeiten, indem es sich um viele der komplexen Aufgaben kümmert, die mit verteilter Datenverarbeitung verbunden sind. Weitere Informationen finden Sie unter Entwicklung benutzerdefinierter Verbraucher mit gemeinsamem Durchsatz unter Verwendung von KCL.

Daten aus einem Stream abrufen

Die Kinesis Data Streams APIs umfassen die getRecords Methoden getShardIterator und, die Sie aufrufen können, um Datensätze aus einem Datenstream abzurufen. Dies ist das PULL-Modell, bei dem der Code Datensätze direkt aus den Shards des Datenstroms abruft.

Wichtig

Wir empfehlen, dass Sie die von angebotene Unterstützung für den Record Processor verwendenKCL, um Datensätze aus Ihren Datenströmen abzurufen. Dies ist das PUSH-Modell, bei dem Sie den Code implementieren, der die Daten verarbeitet. Der KCL ruft Datensätze aus dem Datenstrom ab und übermittelt sie an Ihren Anwendungscode. Darüber hinaus KCL bietet das Funktionen für Failover, Wiederherstellung und Lastenausgleich. Weitere Informationen finden Sie unter Entwicklung benutzerdefinierter Verbraucher mit gemeinsamem Durchsatz unter Verwendung von KCL.

In einigen Fällen ziehen Sie es jedoch möglicherweise vor, die Kinesis Data Streams APIs zu verwenden. Dies ist beispielsweise der Fall, wenn Sie benutzerdefinierte Tools für das Überwachen und Debuggen Ihrer Datenströme implementieren.

Wichtig

Kinesis Data Streams unterstützt Änderungen des Zeitraums der Datensatzaufbewahrung für einen Datenstrom. Weitere Informationen finden Sie unter Ändern Sie den Aufbewahrungszeitraum für Daten.

Verwenden Sie Shard-Iteratoren

Sie rufen Datensätze aus dem Stream pro Shard ab. Für jeden Shard und jeden Datensatzstapel, den Sie aus dem Shard abrufen, benötigen Sie einen Shard-Iterator. Der Shard-Iterator wird im getRecordsRequest-Objekt verwendet, um den Shard anzugeben, aus dem die Datensätze abgerufen werden. Der Typ, der dem Shard-Iterator zugeordnet ist, gibt die Stelle im Shard an, von der die Datensätze abgerufen werden sollen (weitere Informationen dazu finden Sie später in diesem Abschnitt). Bevor Sie mit dem Shard-Iterator arbeiten können, müssen Sie den Shard abrufen. Weitere Informationen finden Sie unter Shards auflisten.

Rufen Sie diesen ersten Shard-Iterator mit der getShardIterator-Methode ab. Rufen Sie Shard-Iteratoren für weitere Datensatzstapel mit der getNextShardIterator -Methode des getRecordsResult-Objekts ab, das von der getRecords-Methode zurückgegeben wird. Ein Shard-Iterator verliert seine Gültigkeit nach 5 Minuten. Wenn Sie einen Shard-Iterator verwenden, solange er gültig ist, erhalten Sie einen neuen. Jeder Shard-Iterator ist 5 Minuten lang gültig, selbst wenn er bereits verwendet wurde.

Instanziieren Sie GetShardIteratorRequest, um den ersten Shard-Iterator abzurufen. Übergeben Sie ihn an die getShardIterator-Methode. Geben Sie zum Konfigurieren der Anforderung den Stream und die Shard-ID an. Informationen darüber, wie Sie die Streams in Ihrem AWS Konto abrufen können, finden Sie unter. Auflisten von Streams Informationen zum Abrufen der Shards in einem Stream finden Sie unter Shards auflisten.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

Der Beispiel-Code gibt beim Abrufen des ersten Shard-Iterators TRIM_HORIZON als Iterator-Typ an. Dieser Iterator-Typ bedeutet, dass Datensätze zurückgegeben werden sollen. Und zwar beginnend mit dem ersten zum Shard hinzugefügten Datensatz und nicht mit dem letzten hinzugefügten Datensatz, auch als Spitze bezeichnet. Folgende Iterator-Typen werden unterstützt:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

Weitere Informationen finden Sie unter ShardIteratorType.

Bei einigen Iterator-Typen müssen Sie zusätzlich eine Sequenznummer angeben, z. B.:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

Nachdem Sie einen Datensatz mit getRecords abgerufen haben, erhalten Sie die Sequenznummer für den Datensatz, indem Sie die getSequenceNumber-Methode des Datensatzes aufrufen.

record.getSequenceNumber()

Darüber hinaus erhält der Code, durch den Datensätze zum Stream hinzufügt werden, die Sequenznummer für einen hinzugefügten Datensatz, indem getSequenceNumber auf dem Ergebnis von putRecord aufgerufen wird.

lastSequenceNumber = putRecordResult.getSequenceNumber();

Sie können mit diesen Sequenznummern eine strenge aufsteigende Anordnung der Datensätze gewährleisten. Weitere Informationen finden Sie im Code-Beispiel unter PutRecordBeispiel.

Benutzen GetRecords

Instanziieren Sie nach Abruf des Shard-Iterators ein GetRecordsRequest-Objekt. Geben Sie den Iterator für die Anforderung mit der setShardIterator-Methode an.

Optional können Sie auch die Anzahl der abzurufenden Datensätze mithilfe der setLimit-Methode angeben. Die Anzahl der Datensätze, die getRecords zurückgibt, ist stets gleich oder kleiner als dieses Limit. Wenn Sie kein Limit angeben, gibt getRecords 10 MB abgerufener Datensätze zurück. Beim unten stehenden Beispiel-Code wird das Limit auf 25 Datensätze festgelegt.

Wenn keine Datensätze zurückgegeben werden, bedeutet dies, dass derzeit keine Datensätze von diesem Shard für die vom Shard-Iterator angegebene Sequenznummer verfügbar sind. Wenn dies der Fall ist, sollte Ihre Anwendung so lange warten, wie dies für die Datenquellen des Streams angemessen ist. Versuchen Sie dann erneut mit dem Shard-Iterator, der vom vorherigen Aufruf von getRecords zurückgegeben wurde, Daten aus dem Shard abzurufen.

Übergeben Sie getRecordsRequest an die getRecords-Methode und erfassen Sie die zurückgegebenen Werte als getRecordsResult-Objekt. Rufen Sie die getRecords-Methode auf dem getRecordsResult-Objekt auf, um die Datensätze abzurufen.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

Zur Vorbereitung auf einen weiteren Aufruf von getRecords rufen Sie den nächsten Shard-Iterator von getRecordsResult ab.

shardIterator = getRecordsResult.getNextShardIterator();

Die besten Ergebnisse erzielen Sie, wenn Sie mindestens 1 Sekunde (1.000 Millisekunden) zwischen den Aufrufen von getRecords warten, um ein Überschreiten des Limits für die Aufrufe von getRecords zu vermeiden.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

In der Regel sollten Sie getRecords in einer Schleife aufrufen, auch wenn Sie einen einzelnen Datensatz in einem Testszenario abrufen. Ein einzelner Aufruf von getRecords gibt möglicherweise eine leere Datensatzliste zurück, auch wenn der Shard mehrere Datensätze mit höheren Sequenznummern enthält. In diesem Fall verweist der zurückgegebene NextShardIterator zusammen mit der leeren Datensatzliste auf eine höhere Sequenznummer im Shard. Nachfolgende Aufrufe von getRecords führen dann zu einem erfolgreichen Abruf. Das folgende Beispiel zeigt die Verwendung einer Schleife.

Beispiel: getRecords

Das folgende Codebeispiel spiegelt die getRecords-Tipps in diesem Abschnitt wieder, einschließlich der Aufrufe in Schleifen.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Bei der Nutzung der Kinesis Client Library müssen möglicherweise mehrere Aufrufe durchgeführt werden, ehe Daten zurückgegeben werden. Dieses Verhalten ist beabsichtigt und weist nicht auf ein Problem mit den KCL oder Ihren Daten hin.

Passen Sie sich an einen Reshard an

Wenn getRecordsResult.getNextShardIterator null zurückgibt, bedeutet dies, dass eine Aufteilung oder Zusammenführung des Shards stattgefunden hat, die diesen Shard betrifft. Dieser Shard befindet sich jetzt in einem CLOSED-Status und Sie haben alle verfügbaren Datensätze von diesem Shard gelesen.

In diesem Szenario können Sie getRecordsResult.childShards verwenden, um etwas über die neuen untergeordneten Shards des zu verarbeitenden Shards zu erfahren, die durch die Aufteilung oder Zusammenführung entstanden sind. Weitere Informationen finden Sie unter. ChildShard

Bei einer Teilung ist die parentShardId der beiden neuen Shards gleich der Shard-ID des zuvor verarbeiteten Shards. Der Wert von adjacentParentShardId für beide Shards ist null.

Bei einer Zusammenführung ist bei dem entstandenen einzelnen Datensatz die parentShardId identisch mit der Shard-ID eines übergeordneten Shards und die adjacentParentShardId ist gleich der Shard-ID des anderen übergeordneten Shards. Ihre Anwendung hat bereits alle Daten aus einem dieser Shards ausgelesen. Dies ist der Shard, für den getRecordsResult.getNextShardIterator null zurückgegeben hat. Wenn die Reihenfolge der Daten für Ihre Anwendung von Bedeutung ist, stellen Sie sicher, dass die Anwendung auch alle Daten des anderen übergeordneten Shards ausliest, ehe neue Daten aus dem durch die Zusammenführung entstandenen untergeordneten Shards ausgelesen werden.

Wenn Sie mehrere Prozessoren zum Abrufen von Daten aus dem Stream verwenden (beispielsweise einen Prozessor pro Shard), und es kommt zu einer Teilung oder Zusammenführung von Shards, sollten Sie die Anzahl der Prozessoren entsprechend anpassen.

Weitere Informationen zum Resharding, einschließlich einer Diskussion über Shard-Status, beispielsweise CLOSED finden Sie unter Einen Stream erneut teilen.