Développez des consommateurs à débit partagé grâce au AWS SDK for Java - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développez des consommateurs à débit partagé grâce au AWS SDK for Java

L'une des méthodes pour développer des clients Kinesis Data Streams personnalisés avec partage complet consiste à utiliser Amazon Kinesis Data APIs Streams avec le. AWS SDK for Java Cette section décrit l'utilisation des Kinesis Data APIs Streams avec AWS SDK for Java le. Vous pouvez appeler les Kinesis Data APIs Streams à l'aide d'autres langages de programmation. Pour plus d'informations sur toutes les options disponibles AWS SDKs, consultez Commencer à développer avec Amazon Web Services.

L'exemple de code Java présenté dans cette section montre comment effectuer des opérations Kinesis Data API Streams de base. Il est divisé logiquement par type d'opération. Ces exemples ne représentent pas du code destiné à la production. Ils ne recherchent pas toutes les exceptions possibles et ils ne tiennent pas compte de toutes les considérations possibles en matière de sécurité ou de performances.

Obtenir des données à partir d'un flux

Les Kinesis Data APIs Streams incluent getShardIterator les méthodes getRecords et que vous pouvez invoquer pour récupérer des enregistrements d'un flux de données. Il s'agit du modèle d'extraction où votre code extrait les enregistrements de données directement depuis les partitions du flux de données.

Important

Nous vous recommandons d'utiliser le support du processeur d'enregistrements fourni par KCL pour récupérer les enregistrements de vos flux de données. Il s'agit du modèle push, où vous implémentez le code qui traite les données. Il KCL récupère les enregistrements de données du flux de données et les transmet au code de votre application. En outre, il KCL fournit des fonctionnalités de basculement, de restauration et d'équilibrage de charge. Pour plus d'informations, consultez la section Développement de consommateurs personnalisés à l'aide KCL d'un débit partagé.

Toutefois, dans certains cas, vous préférerez peut-être utiliser les Kinesis Data APIs Streams. Par exemple, pour implémenter les outils personnalisés pour surveiller ou déboguer vos flux de données.

Important

Kinesis Data Streams prend en charge les modifications de la période de conservation des enregistrements de données de votre flux de données. Pour de plus amples informations, veuillez consulter Modifier la période de conservation des données.

Utiliser des itérateurs de partitions

Vous extrayez des enregistrements du flux à partir du flux pour chaque partition. Pour chaque partition et chaque lot d'enregistrements que vous extrayez de cette partition, vous devez obtenir un itérateur de partition. L'itérateur de partition est utilisé dans l'objet getRecordsRequest pour spécifier la partition à partir de laquelle les enregistrements sont extraits. Le type associé à l'itérateur de partition détermine le point de la partition à partir duquel les enregistrements doivent être extraits (voir plus loin dans cette section pour plus de détails). Avant de pouvoir utiliser l'itérateur de partition, vous devez récupérer la partition. Pour de plus amples informations, veuillez consulter Répertorier les fragments.

Obtenez l'itérateur de partition initial à l'aide de la méthode getShardIterator. Obtenez des itérateurs de partition pour les lots supplémentaires d'enregistrements à l'aide de la méthode getNextShardIterator de l'objet getRecordsResult renvoyé par la méthode getRecords. Un itérateur de partition est valide pendant 5 minutes. Si vous utilisez un itérateur de partition pendant qu'il est valide, vous en obtenez un nouveau. Chaque itérateur de partition reste valide 5 minutes, même après avoir été utilisé.

Pour obtenir l'itérateur de partition initial, instanciez GetShardIteratorRequest et passez-le à la méthode getShardIterator. Pour configurer la demande, spécifiez le flux et l'ID de partition. Pour plus d'informations sur la façon d'obtenir les streams de votre AWS compte, consultezAfficher la liste des flux. Pour plus d'informations sur la façon d'obtenir les partitions d'un flux, consultez Répertorier les fragments.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

Cet exemple de code spécifie TRIM_HORIZON comme type d'itérateur lors de l'obtention de l'itérateur de partition initial. Ce type d'itérateur signifie que les enregistrements doivent être renvoyés en commençant par le premier enregistrement ajouté à la partition, et pas en commençant par l'enregistrement ajouté le plus récemment, également appelé l'extrémité. Les types d'itérateur possibles sont les suivants :

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

Pour plus d'informations, consultez ShardIteratorType.

Certains types d'itérateur nécessitent que vous spécifiiez un numéro de séquence en plus du type ; par exemple :

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

Après avoir obtenu un enregistrement à l'aide de getRecords, vous pouvez obtenir le numéro de séquence de l'enregistrement en appelant la méthode getSequenceNumber de l'enregistrement.

record.getSequenceNumber()

En outre, le code qui ajoute les enregistrements au flux de données peut obtenir le numéro de séquence d'un enregistrement ajouté en appelant getSequenceNumber sur le résultat de putRecord.

lastSequenceNumber = putRecordResult.getSequenceNumber();

Vous pouvez utiliser des numéros de séquence pour garantir un ordre croissant strict des enregistrements. Pour plus d'informations, consultez l'exemple de code dans PutRecordexemple.

Utiliser GetRecords

Après avoir obtenu l'itérateur de partition, instanciez un objet GetRecordsRequest. Spécifiez l'itérateur der la demande à l'aide de la méthode setShardIterator.

Le cas échéant, vous pouvez également définir le nombre d'enregistrements à extraire à l'aide de la méthode setLimit. Le nombre d'enregistrements renvoyés par getRecords est toujours égal ou inférieur à cette limite. Si vous ne spécifiez pas cette limite, getRecords renvoie 10 Mo d'enregistrements extraits. L'exemple de code ci-dessous définit cette limite à 25 enregistrements.

Si aucun enregistrements n'est renvoyé, cela signifie qu'aucun enregistrement de données n'est actuellement disponible à partir de cette partition au numéro de séquence référencé par l'itérateur de partition. Dans cette situation, votre application doit attendre pendant le laps de temps approprié pour les sources de données du flux. Essayez ensuite d'extraire les données de la partition à l'aide de l'itérateur de partition renvoyé par l'appel précédent de getRecords.

Passez l'objet getRecordsRequest à la méthode getRecords et capturez la valeur renvoyée en tant qu'objet getRecordsResult. Pour extraire les enregistrements de données, appelez la méthode getRecords sur l'objet getRecordsResult.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

Pour vous préparer à un autre appel de getRecords, obtenez l'itérateur de partition suivant de getRecordsResult.

shardIterator = getRecordsResult.getNextShardIterator();

Pour obtenir les meilleurs résultats, attendez pendant au moins 1 seconde (1 000 millisecondes) entre les appels de getRecords, afin d'éviter de dépasser la limite de la fréquence getRecords.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

En général, vous devez appeler getRecords en boucle, même si vous êtes en train d'extraire un seul enregistrement dans un scénario de test. Un seul appel de getRecords peut renvoyer une liste d'enregistrements vide, même si la partition contient plusieurs enregistrements à des numéros de séquence ultérieurs. Dans ce cas, l'objet NextShardIterator renvoyé avec la liste d'enregistrements vide fait référence à un numéro de séquence ultérieur de la partition, et les appels successifs de getRecords finissent par renvoyer les enregistrements. L'exemple suivant illustre l'utilisation d'une boucle.

Exemple : getRecords

L'exemple de code suivant reflète les conseils pour getRecords de cette section, notamment la réalisation d'appels en boucle.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Si vous utilisez la bibliothèque client Kinesis (KCL), elle peut effectuer plusieurs appels avant de renvoyer des données. Ce comportement est intentionnel et n'indique aucun problème lié aux données KCL ou à vos données.

S'adapter à une refonte

Si getRecordsResult.getNextShardIterator renvoie null, cela indique qu'une division ou une fusion de partition a eu lieu impliquant cette partition. Cette partition est désormais dans un état CLOSED, et vous avez lu tous les enregistrements de données disponibles à partir de cette partition.

Dans ce scénario, vous pouvez utiliser getRecordsResult.childShards pour en savoir plus sur les nouvelles partitions secondaires de la partition en cours de traitement qui ont été créées par le fractionnement ou la fusion. Pour plus d'informations, consultez ChildShard.

Dans le cas d'un fractionnement, les deux nouvelles partitions ont parentShardId égal à l'ID de la partition que vous avez traitée précédemment. La valeur de adjacentParentShardId pour ces deux partitions est null.

Dans le cas d'une fusion, la nouvelle partition créée par la fusion a parentShardId égal à l'ID de partition de l'une des partitions parent et adjacentParentShardId égal à l'ID de partition de l'autre partition parent. Votre application a déjà lu toutes les données à partir de l'une de ces partitions. Il s'agit de la partition pour laquelle getRecordsResult.getNextShardIterator a renvoyé null. Si l'ordre des données est important pour votre application, assurez-vous qu'elle lit aussi toutes les données de l'autre partition parente avant de lire les nouvelles données de la partition enfant créée par la fusion.

Si vous utilisez plusieurs processeurs pour extraire des données du flux (disons un processeur par partition) et qu'un fractionnement ou une fusion a lieu, augmentez ou réduisez le nombre de processeurs pour prendre en compte le nouveau nombre de partitions.

Pour plus d'informations sur le repartitionnement, plus une présentation des états des partitions, par exemple CLOSED, consultez Revisionner un stream.