

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Développez des producteurs à l'aide de l'API Amazon Kinesis Data Streams avec le AWS SDK pour Java
<a name="developing-producers-with-sdk"></a>

Vous pouvez développer des producteurs à l'aide de l'API Amazon Kinesis Data Streams associée AWS au SDK pour Java. Si vous ne connaissez pas Kinesis Data Streams, commencez par vous familiariser avec les concepts et la terminologie présentés dans [Qu'est-ce qu'Amazon Kinesis Data Streams ?](introduction.md) et [Utilisez le AWS CLI pour effectuer des opérations Amazon Kinesis Data Streams](getting-started.md).

Ces exemples traitent de l'[API Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) et utilisent le [SDK AWS pour Java](https://aws.amazon.com/sdk-for-java/) afin d'ajouter (mettre) des données dans un flux. Cependant, pour la plupart des cas d'utilisation, vous préférerez sans doute la bibliothèque Kinesis Data Streams. Pour de plus amples informations, veuillez consulter [Développez des producteurs à l'aide de la bibliothèque Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md).

L'exemple de code Java présenté dans ce chapitre montre comment effectuer les opérations de base d'API Kinesis Data Streams et est divisé logiquement par type d'opération. Ces exemples ne représentent pas du code prêt à la production, car ils ne recherchent pas toutes les exceptions possibles ou ne tiennent pas compte de toutes les considérations possibles en matière de sécurité ou de performances. En outre, vous pouvez appeler [l'API Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) à l'aide d'autres langages de programmation. Pour plus d'informations sur toutes les options disponibles AWS SDKs, consultez [Commencer à développer avec Amazon Web Services](https://aws.amazon.com/developers/getting-started/).

Chaque tâche a des conditions préalables ; par exemple, vous ne pouvez pas ajouter de données à un flux tant que vous n'avez pas créé de flux, ce qui demande de créer un client. Pour de plus amples informations, veuillez consulter [Création et gestion de flux de données Kinesis](working-with-streams.md).

**Topics**
+ [Ajouter des données à un flux](#kinesis-using-sdk-java-add-data-to-stream)
+ [Interagissez avec les données à l'aide du registre des AWS Glue schémas](kinesis-integration-glue-schema-registry.md)

## Ajouter des données à un flux
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

Une fois qu'un flux est créé, vous pouvez y ajouter des données sous forme d'enregistrements. Un enregistrement est une structure de données qui contient les données à traiter sous la forme d'un blob de données. Une fois que vous avez stocké les données dans l'enregistrement, Kinesis Data Streams n'inspecte pas, n'interprète pas ou ne modifie absolument pas les données. Chaque enregistrement a également un numéro de séquence et une clé de partition qui lui sont associés.

L'API Kinesis Data Streams comporte deux opérations différentes qui ajoutent des données à un flux, [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) et [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html). L'opération `PutRecords` envoie plusieurs enregistrements à votre flux par demande HTTP et l'opération `PutRecord` envoie des enregistrements à votre flux un à la fois (une demande HTTP distincte est nécessaire pour chaque enregistrement). Vous préférerez sans doute utiliser `PutRecords` pour la plupart des applications, car cette opération permet d'atteindre un débit supérieur par application producteur. Pour plus d'informations sur chacune de ces opérations, consultez les sous-sections distinctes ci-dessous.

**Topics**
+ [Ajoutez plusieurs enregistrements avec PutRecords](#kinesis-using-sdk-java-putrecords)
+ [Ajoutez un seul enregistrement avec PutRecord](#kinesis-using-sdk-java-putrecord)

Étant donné que votre application source ajoute des données au flux à l'aide de l'API Kinesis Data Streams, n'oubliez jamais qu'une ou plusieurs applications consommateur traitent très probablement simultanément des données provenant du flux. Pour plus d'informations sur la façon dont les applications consommateur obtiennent les données à l'aide de l'API Kinesis Data Streams, consultez la page [Obtenir des données à partir d'un flux](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data).

**Important**  
[Modifier la période de conservation des données](kinesis-extended-retention.md)

### Ajoutez plusieurs enregistrements avec PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

L'opération [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) envoie plusieurs enregistrements à Kinesis Data Streams dans une seule demande. En utilisant `PutRecords`, les applications producteur peuvent atteindre un débit supérieur lors de l'envoi de données à leur flux de données Kinesis. Chaque demande `PutRecords` peut prendre en charge jusqu'à 500 enregistrements. Chaque enregistrement de la demande peut atteindre 1 Mo, jusqu'à une limite de 5 Mo pour l'ensemble de la demande, y compris les clés de partition. Comme avec la seule opération `PutRecord` décrite ci-dessous, `PutRecords` utilise des numéros de séquence et des clés de partition. Toutefois, le paramètre `PutRecord` `SequenceNumberForOrdering` n'est pas inclus dans un appel `PutRecords`. L'opération `PutRecords` tente de traiter tous les enregistrements dans l'ordre naturel de la demande. 

Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appelé `client.putRecords` pour ajouter les enregistrements de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande `PutRecords` est élevé, plus les numéros de séquence sont longs.

**Note**  
Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.

Une demande `PutRecords` peut inclure des enregistrements ayant différentes clés de partition. La portée de la demande est un flux ; chaque demande peut inclure une combinaison de clés de partition et d'enregistrements allant jusqu'aux limites définies pour la demande. Les demandes effectuées avec de nombreuses clés de partition différentes pour des flux comportant de nombreuses partitions différentes sont généralement plus rapides que les demandes comportant un petit nombre de clés de partition pour un petit nombre de partitions. Le nombre de clés de partition doit être beaucoup plus grand que le nombre de partitions pour réduire la latence et optimiser le débit.

#### PutRecords exemple
<a name="kinesis-using-sdk-java-putrecords-example"></a>

Le code suivant crée 100 enregistrements de données avec des clés de partition séquentielles et les place dans un flux appelé `DataStream`. 

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

La réponse `PutRecords` comprend un tableau de réponse `Records`. Chaque enregistrement compris dans ce tableau de réponse correspond directement à un enregistrement dans le tableau de demande. Ces entrées sont classées dans l'ordre naturel, soit de haut en bas de la demande et de la réponse. Le tableau de réponse `Records` comprend toujours le même nombre d'enregistrements que le tableau de demande.

#### Gérez les défaillances lors de l'utilisation PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

Par défaut, la défaillance d'enregistrements individuels dans une demande n'arrête pas le traitement des enregistrements suivants dans une demande `PutRecords`. Cela signifie qu'un tableau `Records` de réponse comprend à la fois des enregistrements traités avec succès et sans succès. Vous devez détecter les enregistrements traités sans succès et les inclure dans un appel ultérieur. 

Les enregistrements qui ont réussi incluent les valeurs `SequenceNumber` et `ShardID`. Ceux qui ont échoué incluent les valeurs `ErrorCode` et `ErrorMessage`. Le paramètre `ErrorCode` reflète le type d'erreur et peut avoir une des valeurs suivantes : `ProvisionedThroughputExceededException` ou `InternalFailure`. `ErrorMessage` fournit des informations plus détaillées sur l'exception `ProvisionedThroughputExceededException`, y compris l'ID de compte, le nom du flux et les ID de partition de l'enregistrement qui a été limité. L'exemple ci-dessous contient trois enregistrements dans une demande `PutRecords`. Le second enregistrement échoue et est reflété dans la réponse. 

**Example PutRecords Syntaxe de la demande**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords Syntaxe de réponse**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

Les enregistrements qui ont été traités sans succès peuvent être inclus dans des demandes `PutRecords` ultérieures. Tout d'abord, vérifiez le paramètre `FailedRecordCount` de `putRecordsResult` afin de savoir si la demande comporte des enregistrements d'échecs. Dans ce cas, chaque `putRecordsEntry` comportant un `ErrorCode` qui n'est pas `null` doit être ajouté à une demande ultérieure. Pour un exemple de ce type de gestionnaire, reportez-vous au code suivant.

**Example PutRecords gestionnaire de défaillances**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### Ajoutez un seul enregistrement avec PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

Chaque appel de [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) est exécuté sur un seul enregistrement. Préférez l'opération `PutRecords` décrite dans [Ajoutez plusieurs enregistrements avec PutRecords](#kinesis-using-sdk-java-putrecords), sauf si votre application a particulièrement besoin de toujours envoyer des enregistrements uniques par demande ou qu'une autre raison empêche l'utilisation de `PutRecords`.

Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appelé `client.putRecord` pour ajouter l'enregistrement de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande `PutRecord` est élevé, plus les numéros de séquence sont longs.

 Lorsque des opérations se succèdent rapidement, il n'est pas sûr que les numéros de séquence renvoyés augmentent, car ces opérations semblent surtout simultanées pour Kinesis Data Streams. Pour garantir une stricte augmentation des numéros de séquence pour la même clé de partition, utilisez le paramètre `SequenceNumberForOrdering`, comme il est illustré dans l'exemple de code [PutRecord exemple](#kinesis-using-sdk-java-putrecord-example). 

 Que vous utilisiez `SequenceNumberForOrdering` ou non, les enregistrements que Kinesis Data Streams reçoit via un appel `GetRecords` sont strictement classés par numéro de séquence. 

**Note**  
Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.

La clé de partition sert à regrouper les données dans le flux. Un enregistrement de données est attribué à une partition du flux suivant sa clé de partition. Plus précisément, Kinesis Data Streams utilise la clé de partition comme entrée d'une fonction de hachage qui mappe la clé de partition (et les données associées) à une partition spécifique.

 Ce mécanisme de hachage a pour effet que tous les enregistrements de données ayant la même clé de partition sont mappés à la même partition du flux. Toutefois, si le nombre de clés de partition dépasse le nombre de partitions, certaines partitions contiennent nécessairement des enregistrements ayant des clés de partition différentes. Du point de vue de la conception, afin de garantir que toutes vos partitions sont bien utilisées, le nombre de partitions (spécifié par la méthode `setShardCount` de `CreateStreamRequest`) doit être nettement inférieur à celui des partitions uniques, et la quantité de données qui passe dans une clé de partition unique doit être nettement inférieure à la capacité de la partition. 

#### PutRecord exemple
<a name="kinesis-using-sdk-java-putrecord-example"></a>

Le code suivant crée dix enregistrements de données répartis entre deux clés de partition et les place dans un flux appelé `myStreamName`.

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

L'exemple de code précédent utilise le paramètre `setSequenceNumberForOrdering` pour garantir un ordre croissant dans chaque clé de partition. Pour utiliser ce paramètre efficacement, définissez le `SequenceNumberForOrdering` de l'enregistrement en cours (enregistrement *n*) sur le numéro de séquence de l'enregistrement précédent (enregistrement *n-1*). Pour obtenir le numéro de séquence d'un enregistrement qui a été ajouté au flux, appelez `getSequenceNumber` sur le résultat de `putRecord`.

Le paramètre `SequenceNumberForOrdering` garantit des numéros de séquence strictement croissants pour la même clé de partition. `SequenceNumberForOrdering` ne propose pas l'ordre des enregistrements sur plusieurs clés de partition. 

# Interagissez avec les données à l'aide du registre des AWS Glue schémas
<a name="kinesis-integration-glue-schema-registry"></a>

Vous pouvez intégrer vos flux de données Kinesis au registre des AWS Glue schémas. Le registre des AWS Glue schémas vous permet de découvrir, de contrôler et de faire évoluer les schémas de manière centralisée, tout en garantissant que les données produites sont validées en permanence par un schéma enregistré. Un schéma définit la structure et le format d'un enregistrement de données. Un schéma est une spécification versionnée pour la publication, la consommation ou le stockage des données fiables. Le registre des AWS Glue schémas vous permet d'améliorer la qualité end-to-end des données et la gouvernance des données au sein de vos applications de streaming. Pour plus d'informations, consultez le registre [AWS Glue Schema](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) (français non garanti). L'un des moyens de configurer cette intégration consiste à utiliser le `PutRecords` kit `PutRecord` Kinesis Data APIs Streams disponible dans AWS le SDK Java. 

Pour obtenir des instructions détaillées sur la façon de configurer l'intégration de Kinesis Data Streams au registre des schémas à l'aide PutRecords de Kinesis Data Streams PutRecord et de Kinesis Data APIs Streams, consultez la [section « Interaction avec les données à l'aide des Kinesis APIs Data Streams » dans Use Case : Integrating Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) with the Glue Schema Registry. AWS 