Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Entwickeln Sie Verbraucher mit gemeinsamem Durchsatz mit dem AWS SDK for Java
Eine der Methoden zur Entwicklung benutzerdefinierter Kinesis Data Streams-Verbraucher, die durchgehend gemeinsam genutzt werden, ist die Verwendung von Amazon Kinesis Data Streams APIs mit dem. AWS SDK for Java In diesem Abschnitt wird die Verwendung der Kinesis Data Streams APIs mit dem AWS SDK for Java beschrieben. Sie können die Kinesis Data Streams APIs mit anderen Programmiersprachen aufrufen. Weitere Informationen zu allen verfügbaren AWS SDKs Produkten finden Sie unter Start Developing with Amazon Web Services
Der Java-Beispielcode in diesem Abschnitt zeigt, wie grundlegende Kinesis Data Streams API Streams-Operationen ausgeführt werden. Er ist logisch nach Operationstypen unterteilt. Diese Beispiele stellen keinen produktionsbereiten Code dar. Sie überprüfen nicht alle möglichen Ausnahmen und es werden nicht alle möglichen Sicherheits- oder Leistungsüberlegungen berücksichtigt.
Themen
Daten aus einem Stream abrufen
Die Kinesis Data Streams APIs umfassen die getRecords
Methoden getShardIterator
und, die Sie aufrufen können, um Datensätze aus einem Datenstream abzurufen. Dies ist das PULL-Modell, bei dem der Code Datensätze direkt aus den Shards des Datenstroms abruft.
Wichtig
Wir empfehlen, dass Sie die von angebotene Unterstützung für den Record Processor verwendenKCL, um Datensätze aus Ihren Datenströmen abzurufen. Dies ist das PUSH-Modell, bei dem Sie den Code implementieren, der die Daten verarbeitet. Der KCL ruft Datensätze aus dem Datenstrom ab und übermittelt sie an Ihren Anwendungscode. Darüber hinaus KCL bietet das Funktionen für Failover, Wiederherstellung und Lastenausgleich. Weitere Informationen finden Sie unter Entwickeln benutzerdefinierter Verbraucher mit gemeinsamem Durchsatz unter Verwendung von KCL.
In einigen Fällen ziehen Sie es jedoch möglicherweise vor, die Kinesis Data Streams APIs zu verwenden. Dies ist beispielsweise der Fall, wenn Sie benutzerdefinierte Tools für das Überwachen und Debuggen Ihrer Datenströme implementieren.
Wichtig
Kinesis Data Streams unterstützt Änderungen des Zeitraums der Datensatzaufbewahrung für einen Datenstrom. Weitere Informationen finden Sie unter Ändern Sie den Aufbewahrungszeitraum für Daten.
Verwenden Sie Shard-Iteratoren
Sie rufen Datensätze aus dem Stream pro Shard ab. Für jeden Shard und jeden Datensatzstapel, den Sie aus dem Shard abrufen, benötigen Sie einen Shard-Iterator. Der Shard-Iterator wird im getRecordsRequest
-Objekt verwendet, um den Shard anzugeben, aus dem die Datensätze abgerufen werden. Der Typ, der dem Shard-Iterator zugeordnet ist, gibt die Stelle im Shard an, von der die Datensätze abgerufen werden sollen (weitere Informationen dazu finden Sie später in diesem Abschnitt). Bevor Sie mit dem Shard-Iterator arbeiten können, müssen Sie den Shard abrufen. Weitere Informationen finden Sie unter Shards auflisten.
Rufen Sie diesen ersten Shard-Iterator mit der getShardIterator
-Methode ab. Rufen Sie Shard-Iteratoren für weitere Datensatzstapel mit der getNextShardIterator
-Methode des getRecordsResult
-Objekts ab, das von der getRecords
-Methode zurückgegeben wird. Ein Shard-Iterator verliert seine Gültigkeit nach 5 Minuten. Wenn Sie einen Shard-Iterator verwenden, solange er gültig ist, erhalten Sie einen neuen. Jeder Shard-Iterator ist 5 Minuten lang gültig, selbst wenn er bereits verwendet wurde.
Instanziieren Sie GetShardIteratorRequest
, um den ersten Shard-Iterator abzurufen. Übergeben Sie ihn an die getShardIterator
-Methode. Geben Sie zum Konfigurieren der Anforderung den Stream und die Shard-ID an. Informationen darüber, wie Sie die Streams in Ihrem AWS Konto abrufen können, finden Sie unter. Auflisten von Streams Informationen zum Abrufen der Shards in einem Stream finden Sie unter Shards auflisten.
String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();
Der Beispiel-Code gibt beim Abrufen des ersten Shard-Iterators TRIM_HORIZON
als Iterator-Typ an. Dieser Iterator-Typ bedeutet, dass Datensätze zurückgegeben werden sollen. Und zwar beginnend mit dem ersten zum Shard hinzugefügten Datensatz und nicht mit dem letzten hinzugefügten Datensatz, auch als Spitze bezeichnet. Folgende Iterator-Typen werden unterstützt:
-
AT_SEQUENCE_NUMBER
-
AFTER_SEQUENCE_NUMBER
-
AT_TIMESTAMP
-
TRIM_HORIZON
-
LATEST
Weitere Informationen finden Sie unter ShardIteratorType.
Bei einigen Iterator-Typen müssen Sie zusätzlich eine Sequenznummer angeben, z. B.:
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
Nachdem Sie einen Datensatz mit getRecords
abgerufen haben, erhalten Sie die Sequenznummer für den Datensatz, indem Sie die getSequenceNumber
-Methode des Datensatzes aufrufen.
record.getSequenceNumber()
Darüber hinaus erhält der Code, durch den Datensätze zum Stream hinzufügt werden, die Sequenznummer für einen hinzugefügten Datensatz, indem getSequenceNumber
auf dem Ergebnis von putRecord
aufgerufen wird.
lastSequenceNumber = putRecordResult.getSequenceNumber();
Sie können mit diesen Sequenznummern eine strenge aufsteigende Anordnung der Datensätze gewährleisten. Weitere Informationen finden Sie im Code-Beispiel unter PutRecordBeispiel.
Benutzen GetRecords
Instanziieren Sie nach Abruf des Shard-Iterators ein GetRecordsRequest
-Objekt. Geben Sie den Iterator für die Anforderung mit der setShardIterator
-Methode an.
Optional können Sie auch die Anzahl der abzurufenden Datensätze mithilfe der setLimit
-Methode angeben. Die Anzahl der Datensätze, die getRecords
zurückgibt, ist stets gleich oder kleiner als dieses Limit. Wenn Sie kein Limit angeben, gibt getRecords
10 MB abgerufener Datensätze zurück. Beim unten stehenden Beispiel-Code wird das Limit auf 25 Datensätze festgelegt.
Wenn keine Datensätze zurückgegeben werden, bedeutet dies, dass derzeit keine Datensätze von diesem Shard für die vom Shard-Iterator angegebene Sequenznummer verfügbar sind. Wenn dies der Fall ist, sollte Ihre Anwendung so lange warten, wie dies für die Datenquellen des Streams angemessen ist. Versuchen Sie dann erneut mit dem Shard-Iterator, der vom vorherigen Aufruf von getRecords
zurückgegeben wurde, Daten aus dem Shard abzurufen.
Übergeben Sie getRecordsRequest
an die getRecords
-Methode und erfassen Sie die zurückgegebenen Werte als getRecordsResult
-Objekt. Rufen Sie die getRecords
-Methode auf dem getRecordsResult
-Objekt auf, um die Datensätze abzurufen.
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();
Zur Vorbereitung auf einen weiteren Aufruf von getRecords
rufen Sie den nächsten Shard-Iterator von getRecordsResult
ab.
shardIterator = getRecordsResult.getNextShardIterator();
Die besten Ergebnisse erzielen Sie, wenn Sie mindestens 1 Sekunde (1.000 Millisekunden) zwischen den Aufrufen von getRecords
warten, um ein Überschreiten des Limits für die Aufrufe von getRecords
zu vermeiden.
try { Thread.sleep(1000); } catch (InterruptedException e) {}
In der Regel sollten Sie getRecords
in einer Schleife aufrufen, auch wenn Sie einen einzelnen Datensatz in einem Testszenario abrufen. Ein einzelner Aufruf von getRecords
gibt möglicherweise eine leere Datensatzliste zurück, auch wenn der Shard mehrere Datensätze mit höheren Sequenznummern enthält. In diesem Fall verweist der zurückgegebene NextShardIterator
zusammen mit der leeren Datensatzliste auf eine höhere Sequenznummer im Shard. Nachfolgende Aufrufe von getRecords
führen dann zu einem erfolgreichen Abruf. Das folgende Beispiel zeigt die Verwendung einer Schleife.
Beispiel: getRecords
Das folgende Codebeispiel spiegelt die getRecords
-Tipps in diesem Abschnitt wieder, einschließlich der Aufrufe in Schleifen.
// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }
Bei der Nutzung der Kinesis Client Library müssen möglicherweise mehrere Aufrufe durchgeführt werden, ehe Daten zurückgegeben werden. Dieses Verhalten ist beabsichtigt und weist nicht auf ein Problem mit den KCL oder Ihren Daten hin.
Passen Sie sich an einen Reshard an
Wenn getRecordsResult.getNextShardIterator
null
zurückgibt, bedeutet dies, dass eine Aufteilung oder Zusammenführung des Shards stattgefunden hat, die diesen Shard betrifft. Dieser Shard befindet sich jetzt in einem CLOSED
-Status und Sie haben alle verfügbaren Datensätze von diesem Shard gelesen.
In diesem Szenario können Sie getRecordsResult.childShards
verwenden, um etwas über die neuen untergeordneten Shards des zu verarbeitenden Shards zu erfahren, die durch die Aufteilung oder Zusammenführung entstanden sind. Weitere Informationen finden Sie unter. ChildShard
Bei einer Teilung ist die parentShardId
der beiden neuen Shards gleich der Shard-ID des zuvor verarbeiteten Shards. Der Wert von adjacentParentShardId
für beide Shards ist null
.
Bei einer Zusammenführung ist bei dem entstandenen einzelnen Datensatz die parentShardId
identisch mit der Shard-ID eines übergeordneten Shards und die adjacentParentShardId
ist gleich der Shard-ID des anderen übergeordneten Shards. Ihre Anwendung hat bereits alle Daten aus einem dieser Shards ausgelesen. Dies ist der Shard, für den getRecordsResult.getNextShardIterator
null
zurückgegeben hat. Wenn die Reihenfolge der Daten für Ihre Anwendung von Bedeutung ist, stellen Sie sicher, dass die Anwendung auch alle Daten des anderen übergeordneten Shards ausliest, ehe neue Daten aus dem durch die Zusammenführung entstandenen untergeordneten Shards ausgelesen werden.
Wenn Sie mehrere Prozessoren zum Abrufen von Daten aus dem Stream verwenden (beispielsweise einen Prozessor pro Shard), und es kommt zu einer Teilung oder Zusammenführung von Shards, sollten Sie die Anzahl der Prozessoren entsprechend anpassen.
Weitere Informationen zum Resharding, einschließlich einer Diskussion über Shard-Status, beispielsweise CLOSED
finden Sie unter Einen Stream erneut teilen.