

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Entwickeln Sie Produzenten mithilfe der Amazon Kinesis Data Streams Streams-API mit dem AWS SDK für Java
<a name="developing-producers-with-sdk"></a>

Sie können Producer mithilfe der Amazon Kinesis Data Streams Streams-API mit dem AWS SDK for Java entwickeln. Wenn Sie Kinesis Data Streams zum ersten Mal verwenden, sollten Sie sich zunächst mit den Konzepten und der Terminologie in [Was ist Amazon Kinesis Data Streams?](introduction.md) und [Verwenden Sie die AWS CLI , um Amazon Kinesis Data Streams Streams-Operationen durchzuführen](getting-started.md) vertraut machen.

In den Beispielen wird die [API für Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) behandelt und das [AWS -SDK for Java](https://aws.amazon.com/sdk-for-java/) eingesetzt, um Daten zu einem Stream hinzuzufügen. Für die meisten Anwendungsfälle sollten Sie jedoch die KPL-Bibliothek von Kinesis Data Streams verwenden. Weitere Informationen finden Sie unter [Entwickeln Sie Produzenten mithilfe der Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md).

Anhand des Java-Beispiel-Codes in diesem Abschnitt, der nach Operationstyp logisch unterteilt ist, wird gezeigt, wie Sie grundlegende API-Vorgänge mit Kinesis Data Streams durchführen. Diese Beispiele stellen keinen produktionsbereiten Code dar, d. h. es werden nicht alle möglichen Ausnahmen geprüft und es werden nicht alle möglichen Sicherheits- oder Leistungsüberlegungen berücksichtigt. Sie können zudem die [API für Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) mit anderen Programmiersprachen aufrufen. Weitere Informationen zu allen verfügbaren AWS SDKs Produkten finden Sie unter [Start Developing with Amazon Web Services](https://aws.amazon.com/developers/getting-started/).

Für jede Aufgabe gibt es Voraussetzungen. So können Sie beispielsweise erst dann Daten zu einem Stream hinzufügen, wenn Sie einen erstellt haben. Deshalb müssen Sie einen Client anlegen. Weitere Informationen finden Sie unter [Kinesis-Datenstreams erstellen und verwalten](working-with-streams.md).

**Topics**
+ [Daten zu einem Stream hinzufügen](#kinesis-using-sdk-java-add-data-to-stream)
+ [Interagieren Sie mit Daten mithilfe der AWS Glue Schema Registry](kinesis-integration-glue-schema-registry.md)

## Daten zu einem Stream hinzufügen
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

Sobald ein Stream eingerichtet wurde, können Sie Daten in Form von Datensätzen hinzufügen. Ein Datensatz ist eine Datenstruktur, die die zu verarbeitenden Daten in Form eines Daten-Blobs enthält. Nach dem Speichern der Daten im Datensatz prüft, interpretiert und ändert Kinesis Data Streams die Daten nicht. Jeder Datensatz verfügt zudem über eine zugeordnete Sequenznummer und einen Partitionsschlüssel.

Die API für Kinesis Data Streams unterstützt das Hinzufügen von Daten zu einem Stream mittels zweier Operationen: [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) und [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html). Die Operation `PutRecords` sendet per HTTP-Anforderung mehrere Datensätze an Ihren Stream und die Operation `PutRecord` sendet immer nur einen Datensatz, dabei ist für jeden Datensatz eine separate HTTP-Anforderung erforderlich. Für die meisten Anwendungen sollten Sie `PutRecords` bevorzugen, da diese Operation pro Datenproduzent einen höheren Durchsatz erzielt. Weitere Informationen zu diesen Operationen finden Sie unten in separaten Unterabschnitten.

**Topics**
+ [Fügen Sie mehrere Datensätze hinzu mit PutRecords](#kinesis-using-sdk-java-putrecords)
+ [Fügen Sie einen einzelnen Datensatz hinzu mit PutRecord](#kinesis-using-sdk-java-putrecord)

Beachten Sie Folgendes: Wenn Ihre Quellanwendung Daten mit der API für Kinesis Data Streams zum Stream hinzufügt, gibt es vermutlich eine oder mehrere Konsumentenanwendungen, die gleichzeitig Daten aus dem Stream verarbeiten. Weitere Informationen dazu, wie Konsumenten Daten über die API für Kinesis Data Streams abrufen, erhalten Sie unter [Daten aus einem Stream abrufen](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data).

**Wichtig**  
[Ändern Sie den Aufbewahrungszeitraum für Daten](kinesis-extended-retention.md)

### Fügen Sie mehrere Datensätze hinzu mit PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

Die [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)-Operation sendet in einer einzelnen Anforderung mehrere Datensätze an Kinesis Data Streams. Mit `PutRecords` erzielen Produzenten einen höheren Durchsatz, wenn sie Daten an ihren Kinesis-Datenstrom senden. Jede `PutRecords`-Anfrage kann bis zu 500 Datensätze unterstützen. Die maximale Größe jedes Datensatzes in der Anforderung beträgt 1 MB bis zu einem Limit von 5 MB für die gesamte Anforderung einschließlich Partitionsschlüsseln. Wie bei der unten beschriebenen einzelnen `PutRecord`-Operation nutzt `PutRecords` Sequenznummern und Partitionsschlüssel. Der `PutRecord`-Parameter `SequenceNumberForOrdering` ist jedoch nicht in einem `PutRecords`-Aufruf enthalten. Die `PutRecords`-Operation versucht, alle Datensätze in der natürlichen Reihenfolge der Anforderung zu verarbeiten. 

Jeder Datensatz hat eine eindeutige Sequenznummer. Die Sequenznummer wird von Kinesis Data Streams zugewiesen, nachdem Sie `client.putRecords` aufgerufen haben, um die Datensätze zum Stream hinzuzufügen. Sequenznummern für denselben Partitionsschlüssel steigen in der Regel im Laufe der Zeit; je länger die Zeitdauer zwischen `PutRecords`-Anforderungen ist, desto größer wird die Sequenznummer.

**Anmerkung**  
Sequenznummern können nicht als Index für Datensätze innerhalb desselben Streams verwendet werden. Nutzen Sie für die logische Unterteilung von Datensätzen Partitionsschlüssel oder erstellen Sie für jeden Datensatz einen separaten Stream.

Eine `PutRecords`-Anforderung kann Datensätze mit unterschiedlichen Partitionsschlüsseln enthalten. Der Anforderung gilt für einen Stream. Jede Anforderung kann eine beliebige Kombination aus Partitionsschlüsseln und Datenschlüsseln enthalten, sofern die Grenzwerte der Anforderung nicht überschritten werden. Anforderungen mit vielen verschiedenen Partitionsschlüsseln an Streams mit vielen verschiedenen Shards sind im Allgemeinen schneller als Anforderungen mit wenigen Partitionsschlüsseln an Streams mit nur wenigen Shards. Die Anzahl der Partitionsschlüssel sollte viel größer sein als die Anzahl der Shards, um Latenzzeiten zu verkürzen und einen maximalen Durchsatz zu erzielen.

#### PutRecords Beispiel
<a name="kinesis-using-sdk-java-putrecords-example"></a>

Der folgende Code erstellt 100 Datensätze mit sequenziellen Partitionsschlüsseln und übergibt diese an einen Stream namens `DataStream`. 

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

Die `PutRecords`-Antwort umfasst ein Array von `Records`-Antworten. Jeder Eintrag im Antwort-Array korreliert direkt mit einem Eintrag im Anforderungs-Array und zwar in natürlicher Reihenfolge von oben nach unten wie in der Anforderung und der Antwort. Das `Records`-Antwort-Array enthält stets die gleiche Anzahl an Datensätzen wie das Anforderungs-Array.

#### Behandeln Sie Fehler bei der Verwendung PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

Standardmäßig führt ein Fehler bei einzelnen Datensätzen innerhalb einer Anforderung nicht zur Beendigung der Verarbeitung nachfolgender Datensätze in einer `PutRecords`-Anforderung. Das bedeutet, dass ein `Records`-Antwort-Array sowohl erfolgreich verarbeitete als auch nicht erfolgreich verarbeitete Datensätze enthält. Sie müssen nicht erfolgreich verarbeitete Datensätze erkennen und in einen nachfolgenden Aufruf aufnehmen. 

Erfolgreich verarbeitete Datensätze enthalten `SequenceNumber`- und `ShardID`-Werte, nicht erfolgreiche verarbeitete Datensätze enthalten `ErrorCode`-und `ErrorMessage`-Werte. Der `ErrorCode`-Parameter gibt den Fehlertyp an. Folgende Werte sind möglich: `ProvisionedThroughputExceededException` oder `InternalFailure`. `ErrorMessage` enthält weitere Informationen zur `ProvisionedThroughputExceededException`-Ausnahme, einschließlich Konto-ID, Stream-Name und Shard-ID des gedrosselten Datensatzes. Im folgenden Beispiel umfasst eine `PutRecords`-Anforderung drei Datensätze. Der zweite Datensatz schlägt fehl und wird in der Antwort angegeben. 

**Example PutRecords Syntax anfordern**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords Syntax der Antwort**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

Datensätze, die nicht erfolgreich verarbeitet wurden, können in nachfolgende `PutRecords`-Anforderungen aufgenommen werden. Überprüfen Sie zuerst den Parameter `FailedRecordCount` im `putRecordsResult`, um zu bestätigen, ob Datensätze in der Anforderung fehlgeschlagen sind. Wenn dies der Fall ist, darf jeder `putRecordsEntry`, dessen `ErrorCode` nicht `null` ist, nicht in der nächsten Anforderung hinzugefügt werden. Ein Beispiel für diese Art von Handler finden Sie in folgendem Code:

**Example PutRecords Fehlerhandler**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### Fügen Sie einen einzelnen Datensatz hinzu mit PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

Jeder Aufruf von [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) wird auf einem einzelnen Datensatz ausgeführt. Bevorzugen Sie die `PutRecords`-Operation, die in [Fügen Sie mehrere Datensätze hinzu mit PutRecords](#kinesis-using-sdk-java-putrecords) beschrieben ist, es sei denn, Ihre Anwendung muss stets einzelne Datensätze per Anforderung senden oder es liegen andere Gründe vor, die gegen den Einsatz von `PutRecords` sprechen.

Jeder Datensatz hat eine eindeutige Sequenznummer. Die Sequenznummer wird von Kinesis Data Streams zugewiesen, nachdem Sie `client.putRecord` aufgerufen haben, um den Datensatz zum Stream hinzuzufügen. Sequenznummern für denselben Partitionsschlüssel steigen in der Regel im Laufe der Zeit; je länger die Zeitdauer zwischen `PutRecord`-Anforderungen ist, desto größer wird die Sequenznummer.

 Wenn nacheinander viele PUT-Operationen durchgeführt werden, steigen die zurückgegebenen Sequenznummern nicht zwangsläufig an, da Kinesis Data Streams diese als gleichzeitig interpretiert. Um steigende Sequenznummern für denselben Partitionsschlüssel sicherzustellen, sollten Sie den `SequenceNumberForOrdering`-Parameter so verwenden, wie es im [PutRecord Beispiel](#kinesis-using-sdk-java-putrecord-example)-Codebeispiel gezeigt wird. 

 Unabhängig davon, ob Sie `SequenceNumberForOrdering` verwenden, werden Datensätze, die Kinesis Data Streams über einen `GetRecords`-Aufruf empfängt, streng nach Sequenznummern sortiert. 

**Anmerkung**  
Sequenznummern können nicht als Index für Datensätze innerhalb desselben Streams verwendet werden. Nutzen Sie für die logische Unterteilung von Datensätzen Partitionsschlüssel oder erstellen Sie für jeden Datensatz einen separaten Stream.

Ein Partitionsschlüssel wird verwendet, um Daten in einem Stream zu gruppieren. Ein Datensatz wird einem Shard innerhalb des Streams basierend auf dessen Partitionsschlüssel zugewiesen. Insbesondere Kinesis Data Streams verwendet den Partitionsschlüssel als Eingabe für eine Hash-Funktion, die den Partitionsschlüssel (und die zugehörigen Daten) einem bestimmten Shard zuordnet.

 Aufgrund dieses Hashing-Mechanismus werden alle Datensätze mit demselben Partitionsschlüssel zum selben Shard im Stream zugeordnet. Wenn jedoch die Anzahl der Partitionsschlüssel die Anzahl der Shards übersteigt, enthalten einige Shards zwangsläufig Datensätze mit unterschiedlichen Partitionsschlüsseln. Vom Design-Standpunkt aus sollte die Anzahl der Shards (die über die `setShardCount`-Methode von `CreateStreamRequest` angegeben wird) wesentlich geringer sein als die Anzahl der eindeutigen Partitionsschlüssel und das Datenvolumen für einen einzelnen Partitionsschlüssel sollte wesentlich geringer sein, als die Shard-Kapazität, um sicherzustellen, dass alle Shards gut ausgelastet sind. 

#### PutRecord Beispiel
<a name="kinesis-using-sdk-java-putrecord-example"></a>

Der folgende Code erstellt 10 Datensätze, die auf zwei Partitionsschlüssel verteilt werden, und fügt diese einem Stream namens `myStreamName` hinzu.

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

Beim zuvor beschriebenen Codebeispiel wird `setSequenceNumberForOrdering` eingesetzt, um eine aufsteigende Reihenfolge innerhalb der einzelnen Partitionsschlüssel zu gewährleisten. Für eine effektive Verwendung des Parameters setzen Sie die `SequenceNumberForOrdering` des aktuellen Datensatzes (Datensatz *n*) auf die Sequenznummer des vorhergehenden Datensatzes (Datensatz *n-1*). Rufen Sie `getSequenceNumber` auf dem Ergebnis von `putRecord` auf, um die Sequenznummer des Datensatzes zu erhalten, der zum Stream hinzugefügt wurde.

Der `SequenceNumberForOrdering`-Parameter stellt strikt aufsteigende Sequenznummern für denselben Partitionsschlüssel sicher. `SequenceNumberForOrdering` stellt keine Datensatzsortierung bei mehreren Partitionsschlüsseln bereit. 

# Interagieren Sie mit Daten mithilfe der AWS Glue Schema Registry
<a name="kinesis-integration-glue-schema-registry"></a>

Sie können Ihre Kinesis-Datenströme in die AWS Glue Schema Registry integrieren. Mit der AWS Glue Schema Registry können Sie Schemas zentral erkennen, steuern und weiterentwickeln und gleichzeitig sicherstellen, dass die erstellten Daten kontinuierlich anhand eines registrierten Schemas validiert werden. Ein Schema definiert die Struktur und das Format eines Datensatzes. Ein Schema ist eine versionierte Spezifikation für zuverlässige Datenveröffentlichung, -nutzung oder -speicherung. Mit der AWS Glue Schema Registry können Sie die end-to-end Datenqualität und die Datenverwaltung in Ihren Streaming-Anwendungen verbessern. Weitere Informationen finden Sie unter [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Eine Möglichkeit, diese Integration einzurichten, sind die `PutRecords` im AWS Java-SDK APIs verfügbaren `PutRecord` Kinesis Data Streams. 

Detaillierte Anweisungen zur Einrichtung der Integration von Kinesis Data Streams mit der Schemaregistry mithilfe von PutRecords und PutRecord Kinesis Data Streams APIs finden Sie im Abschnitt „Interaktion mit Daten mithilfe der Kinesis Data Streams APIs“ unter [Anwendungsfall: Integration von Amazon Kinesis Data Streams mit der AWS](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Glue Schema Registry.