

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Entwickeln Sie Verbraucher mit dem AWS SDK für Java
<a name="develop-consumers-sdk"></a>

 Mithilfe der Amazon Kinesis Data Streams APIs können Sie benutzerdefinierte Verbraucher entwickeln. In diesem Abschnitt wird die Verwendung der Kinesis Data Streams APIs mit dem AWS SDK für Java beschrieben.

**Wichtig**  
Die empfohlene Methode zur Entwicklung benutzerdefinierter Verbraucher von Kinesis Data Streams mit durchgängiger gemeinsamer Nutzung ist die Verwendung der Kinesis Client Library (KCL). KCL unterstützt Sie bei der Nutzung und Verarbeitung von Daten aus einem Kinesis-Datenstrom, indem sie sich um viele der komplexen Aufgaben kümmert, die mit verteilter Datenverarbeitung verbunden sind. Weitere Informationen finden Sie unter [Entwickeln Sie Verbraucher mit KCL in Java](develop-kcl-consumers-java.md).

**Topics**
+ [Entwickeln Sie Verbraucher mit gemeinsamem Durchsatz mit dem AWS SDK für Java](developing-consumers-with-sdk.md)
+ [Entwickeln Sie mehr Fan-Out-Nutzer mit dem AWS SDK für Java](building-enhanced-consumers-api.md)
+ [Interagieren Sie mit Daten mithilfe der Schema Registry AWS Glue](building-enhanced-consumers-glue-schema-registry.md)

# Entwickeln Sie Verbraucher mit gemeinsamem Durchsatz mit dem AWS SDK für Java
<a name="developing-consumers-with-sdk"></a>

Eine der Methoden zur Entwicklung benutzerdefinierter Kinesis Data Streams-Verbraucher, die durchgehend gemeinsam genutzt werden, ist die Verwendung von Amazon Kinesis Data Streams APIs mit dem. AWS SDK für Java In diesem Abschnitt wird die Verwendung der Kinesis Data Streams APIs mit dem AWS SDK für Java beschrieben. Sie können die Kinesis Data Streams APIs mit anderen Programmiersprachen aufrufen. Weitere Informationen zu allen verfügbaren AWS SDKs Produkten finden Sie unter [Start Developing with Amazon Web Services](https://aws.amazon.com/developers/getting-started/). 

Der Java-Beispielcode in diesem Abschnitt zeigt, wie grundlegende Kinesis Data Streams Streams-API-Operationen ausgeführt werden. Er ist logisch nach Operationstypen unterteilt. Diese Beispiele stellen keinen produktionsbereiten Code dar. Sie überprüfen nicht alle möglichen Ausnahmen und es werden nicht alle möglichen Sicherheits- oder Leistungsüberlegungen berücksichtigt. 

**Topics**
+ [Daten aus einem Stream abrufen](#kinesis-using-sdk-java-get-data)
+ [Verwenden Sie Shard-Iteratoren](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [Benutze GetRecords](#kinesis-using-sdk-java-get-data-getrecords)
+ [Passt euch an einen Reshard an](#kinesis-using-sdk-java-get-data-reshard)

## Daten aus einem Stream abrufen
<a name="kinesis-using-sdk-java-get-data"></a>

Die Kinesis Data Streams APIs umfassen die `getRecords` Methoden `getShardIterator` und, die Sie aufrufen können, um Datensätze aus einem Datenstream abzurufen. Dies ist das PULL-Modell, bei dem der Code Datensätze direkt aus den Shards des Datenstroms abruft.

**Wichtig**  
Wir empfehlen Ihnen, die von KCL bereitgestellte Unterstützung für Datensatzprozessoren zu verwenden, um Datensätze aus Ihren Datenströmen abzurufen. Dies ist das PUSH-Modell, bei dem Sie den Code implementieren, der die Daten verarbeitet. Die KCL ruft Datensätze aus dem Datenstrom ab und übermittelt diese an Ihren Anwendungscode. Darüber hinaus bietet die KCL Funktionen für einen Failover, eine Wiederherstellung und eine Lastverteilung. Weitere Informationen finden Sie unter [Entwickeln benutzerdefinierter Verbraucher mit gemeinsamem Durchsatz mithilfe von KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).

In einigen Fällen ziehen Sie es jedoch möglicherweise vor, die Kinesis Data Streams APIs zu verwenden. Dies ist beispielsweise der Fall, wenn Sie benutzerdefinierte Tools für das Überwachen und Debuggen Ihrer Datenströme implementieren.

**Wichtig**  
Kinesis Data Streams unterstützt Änderungen des Zeitraums der Datensatzaufbewahrung für einen Datenstrom. Weitere Informationen finden Sie unter [Ändern Sie den Aufbewahrungszeitraum für Daten](kinesis-extended-retention.md).

## Verwenden Sie Shard-Iteratoren
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

Sie rufen Datensätze aus dem Stream pro Shard ab. Für jeden Shard und jeden Datensatzstapel, den Sie aus dem Shard abrufen, benötigen Sie einen *Shard-Iterator*. Der Shard-Iterator wird im `getRecordsRequest`-Objekt verwendet, um den Shard anzugeben, aus dem die Datensätze abgerufen werden. Der Typ, der dem Shard-Iterator zugeordnet ist, gibt die Stelle im Shard an, von der die Datensätze abgerufen werden sollen (weitere Informationen dazu finden Sie später in diesem Abschnitt). Bevor Sie mit dem Shard-Iterator arbeiten können, müssen Sie den Shard abrufen. Weitere Informationen finden Sie unter [Shards auflisten](kinesis-using-sdk-java-list-shards.md).

Rufen Sie diesen ersten Shard-Iterator mit der `getShardIterator`-Methode ab. Rufen Sie Shard-Iteratoren für weitere Datensatzstapel mit der `getNextShardIterator` -Methode des `getRecordsResult`-Objekts ab, das von der `getRecords`-Methode zurückgegeben wird. Ein Shard-Iterator verliert seine Gültigkeit nach 5 Minuten. Wenn Sie einen Shard-Iterator verwenden, solange er gültig ist, erhalten Sie einen neuen. Jeder Shard-Iterator ist 5 Minuten lang gültig, selbst wenn er bereits verwendet wurde.

Instanziieren Sie `GetShardIteratorRequest`, um den ersten Shard-Iterator abzurufen. Übergeben Sie ihn an die `getShardIterator`-Methode. Geben Sie zum Konfigurieren der Anforderung den Stream und die Shard-ID an. Informationen darüber, wie Sie die Streams in Ihrem AWS Konto abrufen können, finden Sie unter. [Auflisten von Streams](kinesis-using-sdk-java-list-streams.md) Informationen zum Abrufen der Shards in einem Stream finden Sie unter [Shards auflisten](kinesis-using-sdk-java-list-shards.md).

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

Der Beispiel-Code gibt beim Abrufen des ersten Shard-Iterators `TRIM_HORIZON` als Iterator-Typ an. Dieser Iterator-Typ bedeutet, dass Datensätze zurückgegeben werden sollen. Und zwar beginnend mit dem ersten zum Shard hinzugefügten Datensatz und nicht mit dem letzten hinzugefügten Datensatz, auch als *Spitze* bezeichnet. Folgende Iterator-Typen werden unterstützt:
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

Weitere Informationen finden Sie unter [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).

Bei einigen Iterator-Typen müssen Sie zusätzlich eine Sequenznummer angeben, z. B.:

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

Nachdem Sie einen Datensatz mit `getRecords` abgerufen haben, erhalten Sie die Sequenznummer für den Datensatz, indem Sie die `getSequenceNumber`-Methode des Datensatzes aufrufen. 

```
record.getSequenceNumber()
```

Darüber hinaus erhält der Code, durch den Datensätze zum Stream hinzufügt werden, die Sequenznummer für einen hinzugefügten Datensatz, indem `getSequenceNumber` auf dem Ergebnis von `putRecord` aufgerufen wird. 

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

Sie können mit diesen Sequenznummern eine strenge aufsteigende Anordnung der Datensätze gewährleisten. Weitere Informationen finden Sie im Code-Beispiel unter [PutRecord Beispiel](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example).

## Benutze GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

Instanziieren Sie nach Abruf des Shard-Iterators ein `GetRecordsRequest`-Objekt. Geben Sie den Iterator für die Anforderung mit der `setShardIterator`-Methode an. 

Optional können Sie auch die Anzahl der abzurufenden Datensätze mithilfe der `setLimit`-Methode angeben. Die Anzahl der Datensätze, die `getRecords` zurückgibt, ist stets gleich oder kleiner als dieses Limit. Wenn Sie kein Limit angeben, gibt `getRecords` 10 MB abgerufener Datensätze zurück. Beim unten stehenden Beispiel-Code wird das Limit auf 25 Datensätze festgelegt.

Wenn keine Datensätze zurückgegeben werden, bedeutet dies, dass derzeit keine Datensätze von diesem Shard für die vom Shard-Iterator angegebene Sequenznummer verfügbar sind. Wenn dies der Fall ist, sollte Ihre Anwendung so lange warten, wie dies für die Datenquellen des Streams angemessen ist. Versuchen Sie dann erneut mit dem Shard-Iterator, der vom vorherigen Aufruf von `getRecords` zurückgegeben wurde, Daten aus dem Shard abzurufen. 

Übergeben Sie `getRecordsRequest` an die `getRecords`-Methode und erfassen Sie die zurückgegebenen Werte als `getRecordsResult`-Objekt. Rufen Sie die `getRecords`-Methode auf dem `getRecordsResult`-Objekt auf, um die Datensätze abzurufen. 

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

Zur Vorbereitung auf einen weiteren Aufruf von `getRecords` rufen Sie den nächsten Shard-Iterator von `getRecordsResult` ab. 

```
shardIterator = getRecordsResult.getNextShardIterator();
```

Die besten Ergebnisse erzielen Sie, wenn Sie mindestens 1 Sekunde (1.000 Millisekunden) zwischen den Aufrufen von `getRecords` warten, um ein Überschreiten des Limits für die Aufrufe von `getRecords` zu vermeiden. 

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

In der Regel sollten Sie `getRecords` in einer Schleife aufrufen, auch wenn Sie einen einzelnen Datensatz in einem Testszenario abrufen. Ein einzelner Aufruf von `getRecords` gibt möglicherweise eine leere Datensatzliste zurück, auch wenn der Shard mehrere Datensätze mit höheren Sequenznummern enthält. In diesem Fall verweist der zurückgegebene `NextShardIterator` zusammen mit der leeren Datensatzliste auf eine höhere Sequenznummer im Shard. Nachfolgende Aufrufe von `getRecords` führen dann zu einem erfolgreichen Abruf. Das folgende Beispiel zeigt die Verwendung einer Schleife.

**Beispiel: getRecords**  
Das folgende Codebeispiel spiegelt die `getRecords`-Tipps in diesem Abschnitt wieder, einschließlich der Aufrufe in Schleifen.

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

Bei der Nutzung der Kinesis Client Library müssen möglicherweise mehrere Aufrufe durchgeführt werden, ehe Daten zurückgegeben werden. Dieses Verhalten ist Design-bedingt und kein Fehler der KCL oder Ihrer Daten.

## Passt euch an einen Reshard an
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 Wenn `getRecordsResult.getNextShardIterator` `null` zurückgibt, bedeutet dies, dass eine Aufteilung oder Zusammenführung des Shards stattgefunden hat, die diesen Shard betrifft. Dieser Shard befindet sich jetzt in einem `CLOSED`-Status und Sie haben alle verfügbaren Datensätze von diesem Shard gelesen. 

 In diesem Szenario können Sie `getRecordsResult.childShards` verwenden, um etwas über die neuen untergeordneten Shards des zu verarbeitenden Shards zu erfahren, die durch die Aufteilung oder Zusammenführung entstanden sind. Weitere Informationen finden Sie unter [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

 Bei einer Teilung ist die `parentShardId` der beiden neuen Shards gleich der Shard-ID des zuvor verarbeiteten Shards. Der Wert von `adjacentParentShardId` für beide Shards ist `null`. 

 Bei einer Zusammenführung ist bei dem entstandenen einzelnen Datensatz die `parentShardId` identisch mit der Shard-ID eines übergeordneten Shards und die `adjacentParentShardId` ist gleich der Shard-ID des anderen übergeordneten Shards. Ihre Anwendung hat bereits alle Daten aus einem dieser Shards ausgelesen. Dies ist der Shard, für den `getRecordsResult.getNextShardIterator` `null` zurückgegeben hat. Wenn die Reihenfolge der Daten für Ihre Anwendung von Bedeutung ist, stellen Sie sicher, dass die Anwendung auch alle Daten des anderen übergeordneten Shards ausliest, ehe neue Daten aus dem durch die Zusammenführung entstandenen untergeordneten Shards ausgelesen werden. 

 Wenn Sie mehrere Prozessoren zum Abrufen von Daten aus dem Stream verwenden (beispielsweise einen Prozessor pro Shard), und es kommt zu einer Teilung oder Zusammenführung von Shards, sollten Sie die Anzahl der Prozessoren entsprechend anpassen. 

 Weitere Informationen zum Resharding, einschließlich einer Diskussion über Shard-Status, beispielsweise `CLOSED` finden Sie unter [Einen Stream erneut teilen](kinesis-using-sdk-java-resharding.md). 

# Entwickeln Sie mehr Fan-Out-Nutzer mit dem AWS SDK für Java
<a name="building-enhanced-consumers-api"></a>

*Erweitertes Rundsenden* ist ein Feature in Amazon Kinesis Data Streams, die es Verbrauchern ermöglicht, Datensätze aus einem Datenstrom mit einem dedizierten Durchsatz von bis zu 2 MB Daten pro Sekunde pro Shard zu empfangen. Ein Verbraucher, der ein erweitertes Rundsenden verwendet, muss nicht mit anderen Verbrauchern konkurrieren, die Daten aus dem Stream empfangen. Weitere Informationen finden Sie unter [Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz](enhanced-consumers.md).

Sie können API-Operationen zum Erstellen eines Verbrauchers in Kinesis Data Streams verwenden der erweitertes Rundsenden verwendet.

**Einen Verbraucher mit erweitertem Rundsenden unter Verwendung der API für Kinesis Data Streams registrieren**

1. Rufen Sie an [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html), um Ihre Anwendung als Endverbraucher zu registrieren, der den erweiterten Fan-Out verwendet. Kinesis Data Streams generiert einen Amazon-Ressourcennamen (ARN) für den Verbraucher und gibt ihn in der Antwort zurück.

1. Um mit dem Abhören eines bestimmten Shards zu beginnen, geben Sie den Kunden-ARN in einem Anruf an [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)weiter. Kinesis Data Streams beginnt dann, die Datensätze von diesem Shard in Form von Ereignissen des Typs [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)über eine HTTP/2-Verbindung an Sie weiterzuleiten. Die Verbindung bleibt für bis zu 5 Minuten offen. Rufen Sie [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)erneut an, wenn Sie weiterhin Datensätze von dem Shard empfangen möchten`future`, nachdem der vom Anruf zurückgegebene Shard normal oder ausnahmsweise [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)abgeschlossen wurde.
**Anmerkung**  
Die `SubscribeToShard`-API gibt auch die Liste der untergeordneten Shards des aktuellen Shards zurück, wenn das Ende des aktuellen Shards erreicht ist. 

1. Rufen Sie an, um einen Verbraucher abzumelden, der den erweiterten Fan-Out verwendet. [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

Der folgende Code ist ein Beispiel dafür, wie Sie für Ihren Verbraucher ein Abonnement für einen Shard einrichten, das Abonnement regelmäßig erneuern und die Ereignisse verarbeiten können.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 Wenn `event.ContinuationSequenceNumber` `null` zurückgibt, bedeutet dies, dass eine Aufteilung oder Zusammenführung des Shards stattgefunden hat, die diesen Shard betrifft. Dieser Shard befindet sich jetzt in einem `CLOSED`-Status und Sie haben alle verfügbaren Datensätze von diesem Shard gelesen. In diesem Szenario können Sie, wie im obigen Beispiel, `event.childShards` verwenden, um etwas über die neuen untergeordneten Shards des zu verarbeitenden Shards zu erfahren, die durch die Aufteilung oder Zusammenführung entstanden sind. Weitere Informationen finden Sie unter [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

# Interagieren Sie mit Daten mithilfe der Schema Registry AWS Glue
<a name="building-enhanced-consumers-glue-schema-registry"></a>

Sie können Ihre Kinesis-Datenströme in die AWS Glue Schema Registry integrieren. Mit der AWS Glue Schema Registry können Sie Schemas zentral erkennen, steuern und weiterentwickeln und gleichzeitig sicherstellen, dass die erstellten Daten kontinuierlich anhand eines registrierten Schemas validiert werden. Ein Schema definiert die Struktur und das Format eines Datensatzes. Ein Schema ist eine versionierte Spezifikation für zuverlässige Datenveröffentlichung, -nutzung oder -speicherung. Mit der AWS Glue Schema Registry können Sie die end-to-end Datenqualität und Datenverwaltung in Ihren Streaming-Anwendungen verbessern. Weitere Informationen finden Sie unter [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Eine Möglichkeit, diese Integration einzurichten, ist die `GetRecords` Kinesis Data Streams Streams-API, die im AWS Java SDK verfügbar ist. 

Detaillierte Anweisungen zur Einrichtung der Integration von Kinesis Data Streams mit Schema Registry mithilfe der `GetRecords` Kinesis Data Streams APIs finden Sie im Abschnitt „Interaktion mit Daten mithilfe der Kinesis Data Streams APIs“ unter [Anwendungsfall: Integration von Amazon Kinesis Data Streams mit der AWS](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Glue Schema Registry.