Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Développement de producteurs utilisant l'API Amazon Kinesis Data Streams avec AWS SDK for Java
Vous pouvez développer des producteurs à l'aide de l'API Amazon Kinesis Data Streams avec le SDK AWS pour Java. Si vous ne connaissez pas Kinesis Data Streams, commencez par vous familiariser avec les concepts et la terminologie présentés dans Qu'est-ce qu'Amazon Kinesis Data Streams ? et Premiers pas avec Amazon Kinesis Data Streams.
Ces exemples traitent de l'API Kinesis Data Streams et utilisent le SDK AWS pour Java
L'exemple de code Java présenté dans ce chapitre montre comment effectuer les opérations de base d'API Kinesis Data Streams et est divisé logiquement par type d'opération. Ces exemples ne représentent pas du code prêt à la production, car ils ne recherchent pas toutes les exceptions possibles ou ne tiennent pas compte de toutes les considérations possibles en matière de sécurité ou de performances. En outre, vous pouvez appeler l'API Kinesis Data Streams à l'aide d'autres langages de programmation. Pour plus d'informations sur tous les kits de développement logiciel AWS disponibles, consultez la page Commencer à développer avec Amazon Web Services
Chaque tâche a des conditions préalables ; par exemple, vous ne pouvez pas ajouter de données à un flux tant que vous n'avez pas créé de flux, ce qui demande de créer un client. Pour de plus amples informations, veuillez consulter Création et gestion de flux.
Rubriques
Ajout de données à un flux
Une fois qu'un flux est créé, vous pouvez y ajouter des données sous forme d'enregistrements. Un enregistrement est une structure de données qui contient les données à traiter sous la forme d'un blob de données. Une fois que vous avez stocké les données dans l'enregistrement, Kinesis Data Streams n'inspecte pas, n'interprète pas ou ne modifie absolument pas les données. Chaque enregistrement a également un numéro de séquence et une clé de partition qui lui sont associés.
L'API Kinesis Data Streams comporte deux opérations différentes qui ajoutent des données à un flux, PutRecords
et PutRecord
. L'opération PutRecords
envoie plusieurs enregistrements à votre flux par demande HTTP et l'opération PutRecord
envoie des enregistrements à votre flux un à la fois (une demande HTTP distincte est nécessaire pour chaque enregistrement). Vous préférerez sans doute utiliser PutRecords
pour la plupart des applications, car cette opération permet d'atteindre un débit supérieur par application producteur. Pour plus d'informations sur chacune de ces opérations, consultez les sous-sections distinctes ci-dessous.
Rubriques
Étant donné que votre application source ajoute des données au flux à l'aide de l'API Kinesis Data Streams, n'oubliez jamais qu'une ou plusieurs applications consommateur traitent très probablement simultanément des données provenant du flux. Pour plus d'informations sur la façon dont les applications consommateur obtiennent les données à l'aide de l'API Kinesis Data Streams, consultez la page Extraction des données d'un flux.
Ajout de plusieurs enregistrements avec PutRecords
L'opération PutRecords
envoie plusieurs enregistrements à Kinesis Data Streams dans une seule demande. En utilisant PutRecords
, les applications producteur peuvent atteindre un débit supérieur lors de l'envoi de données à leur flux de données Kinesis. Chaque demande PutRecords
peut prendre en charge jusqu'à 500 enregistrements. Chaque enregistrement de la demande peut atteindre 1 Mo, jusqu'à une limite de 5 Mo pour l'ensemble de la demande, y compris les clés de partition. Comme avec la seule opération PutRecord
décrite ci-dessous, PutRecords
utilise des numéros de séquence et des clés de partition. Toutefois, le paramètre PutRecord
SequenceNumberForOrdering
n'est pas inclus dans un appel PutRecords
. L'opération PutRecords
tente de traiter tous les enregistrements dans l'ordre naturel de la demande.
Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appelé client.putRecords
pour ajouter les enregistrements de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande PutRecords
est élevé, plus les numéros de séquence sont longs.
Note
Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.
Une demande PutRecords
peut inclure des enregistrements ayant différentes clés de partition. La portée de la demande est un flux ; chaque demande peut inclure une combinaison de clés de partition et d'enregistrements allant jusqu'aux limites définies pour la demande. Les demandes effectuées avec de nombreuses clés de partition différentes pour des flux comportant de nombreuses partitions différentes sont généralement plus rapides que les demandes comportant un petit nombre de clés de partition pour un petit nombre de partitions. Le nombre de clés de partition doit être beaucoup plus grand que le nombre de partitions pour réduire la latence et optimiser le débit.
Exemple d'opération PutRecords
Le code suivant crée 100 enregistrements de données avec des clés de partition séquentielles et les place dans un flux appelé DataStream
.
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);
La réponse PutRecords
comprend un tableau de réponse Records
. Chaque enregistrement compris dans ce tableau de réponse correspond directement à un enregistrement dans le tableau de demande. Ces entrées sont classées dans l'ordre naturel, soit de haut en bas de la demande et de la réponse. Le tableau de réponse Records
comprend toujours le même nombre d'enregistrements que le tableau de demande.
Gestion des défaillances lors de l'utilisation de PutRecords
Par défaut, la défaillance d'enregistrements individuels dans une demande n'arrête pas le traitement des enregistrements suivants dans une demande PutRecords
. Cela signifie qu'un tableau Records
de réponse comprend à la fois des enregistrements traités avec succès et sans succès. Vous devez détecter les enregistrements traités sans succès et les inclure dans un appel ultérieur.
Les enregistrements qui ont réussi incluent les valeurs SequenceNumber
et ShardID
. Ceux qui ont échoué incluent les valeurs ErrorCode
et ErrorMessage
. Le paramètre ErrorCode
reflète le type d'erreur et peut avoir une des valeurs suivantes : ProvisionedThroughputExceededException
ou InternalFailure
. ErrorMessage
fournit des informations plus détaillées sur l'exception ProvisionedThroughputExceededException
, y compris l'ID de compte, le nom du flux et les ID de partition de l'enregistrement qui a été limité. L'exemple ci-dessous contient trois enregistrements dans une demande PutRecords
. Le second enregistrement échoue et est reflété dans la réponse.
Exemple Syntaxe de la demande PutRecords
{
"Records": [
{
"Data": "XzxkYXRhPl8w",
"PartitionKey": "partitionKey1"
},
{
"Data": "AbceddeRFfg12asd",
"PartitionKey": "partitionKey1"
},
{
"Data": "KFpcd98*7nd1",
"PartitionKey": "partitionKey3"
}
],
"StreamName": "myStream"
}
Exemple Syntaxe de la réponse PutRecords
{
"FailedRecordCount”: 1,
"Records": [
{
"SequenceNumber": "21269319989900637946712965403778482371",
"ShardId": "shardId-000000000001"
},
{
“ErrorCode":”ProvisionedThroughputExceededException”,
“ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"SequenceNumber": "21269319989999637946712965403778482985",
"ShardId": "shardId-000000000002"
}
]
}
Les enregistrements qui ont été traités sans succès peuvent être inclus dans des demandes PutRecords
ultérieures. Tout d'abord, vérifiez le paramètre FailedRecordCount
de putRecordsResult
afin de savoir si la demande comporte des enregistrements d'échecs. Dans ce cas, chaque putRecordsEntry
comportant un ErrorCode
qui n'est pas null
doit être ajouté à une demande ultérieure. Pour un exemple de ce type de gestionnaire, reportez-vous au code suivant.
Exemple Gestionnaire de défaillance PutRecords
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }
Ajout d'un seul enregistrement avec PutRecord
Chaque appel de PutRecord
est exécuté sur un seul enregistrement. Préférez l'opération PutRecords
décrite dans Ajout de plusieurs enregistrements avec PutRecords, sauf si votre application a particulièrement besoin de toujours envoyer des enregistrements uniques par demande ou qu'une autre raison empêche l'utilisation de PutRecords
.
Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appelé client.putRecord
pour ajouter l'enregistrement de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande PutRecord
est élevé, plus les numéros de séquence sont longs.
Lorsque des opérations se succèdent rapidement, il n'est pas sûr que les numéros de séquence renvoyés augmentent, car ces opérations semblent surtout simultanées pour Kinesis Data Streams. Pour garantir une stricte augmentation des numéros de séquence pour la même clé de partition, utilisez le paramètre SequenceNumberForOrdering
, comme il est illustré dans l'exemple de code Exemple d'opération PutRecord.
Que vous utilisiez SequenceNumberForOrdering
ou non, les enregistrements que Kinesis Data Streams reçoit via un appel GetRecords
sont strictement classés par numéro de séquence.
Note
Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.
La clé de partition sert à regrouper les données dans le flux. Un enregistrement de données est attribué à une partition du flux suivant sa clé de partition. Plus précisément, Kinesis Data Streams utilise la clé de partition comme entrée d'une fonction de hachage qui mappe la clé de partition (et les données associées) à une partition spécifique.
Ce mécanisme de hachage a pour effet que tous les enregistrements de données ayant la même clé de partition sont mappés à la même partition du flux. Toutefois, si le nombre de clés de partition dépasse le nombre de partitions, certaines partitions contiennent nécessairement des enregistrements ayant des clés de partition différentes. Du point de vue de la conception, afin de garantir que toutes vos partitions sont bien utilisées, le nombre de partitions (spécifié par la méthode setShardCount
de CreateStreamRequest
) doit être nettement inférieur à celui des partitions uniques, et la quantité de données qui passe dans une clé de partition unique doit être nettement inférieure à la capacité de la partition.
Exemple d'opération PutRecord
Le code suivant crée dix enregistrements de données répartis entre deux clés de partition et les place dans un flux appelé myStreamName
.
for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }
L'exemple de code précédent utilise le paramètre setSequenceNumberForOrdering
pour garantir un ordre croissant dans chaque clé de partition. Pour utiliser ce paramètre efficacement, définissez le SequenceNumberForOrdering
de l'enregistrement en cours (enregistrement n) sur le numéro de séquence de l'enregistrement précédent (enregistrement n-1). Pour obtenir le numéro de séquence d'un enregistrement qui a été ajouté au flux, appelez getSequenceNumber
sur le résultat de putRecord
.
Le paramètre SequenceNumberForOrdering
garantit des numéros de séquence strictement croissants pour la même clé de partition. SequenceNumberForOrdering
ne propose pas l'ordre des enregistrements sur plusieurs clés de partition.
Interaction avec les données à l'aide du registre AWS Glue Schema
Vous pouvez intégrer vos flux de données Kinesis au registre de schémas AWS Glue. Le registre Glue Schema AWS vous permet de découvrir, de contrôler et de faire évoluer de manière centralisée les schémas, tout en garantissant que les données produites sont validées en permanence par un schéma enregistré. Un schéma définit la structure et le format d'un enregistrement de données. Un schéma est une spécification versionnée pour la publication, la consommation ou le stockage des données fiables. Le registre de schémas AWS vous permet d'améliorer la qualité des données de bout en bout et la gouvernance des données au sein de vos applications de streaming. Pour plus d'informations, consultez le registre AWS Glue Schema (français non garanti). L'un des moyens de configurer cette intégration consiste à utiliser les API PutRecords
et PutRecord
Kinesis Data Streams disponibles dans le SDK AWS Java.
Pour obtenir des instructions détaillées sur la configuration de l'intégration de Kinesis Data Streams à Schema Registry à l'aide des API PutRecords et PutRecord Kinesis Data Streams, consultez la section « Interaction avec les données à l'aide des API Kinesis Data Streams » dans Cas d'utilisation : Intégrer Amazon Kinesis Data Streams avec le registre AWS Glue Schema.