Développement d'applications consommateur avec diffusion améliorée à l'aide de l'API Kinesis Data Streams - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développement d'applications consommateur avec diffusion améliorée à l'aide de l'API Kinesis Data Streams

La diffusion améliorée est une fonction Amazon Kinesis Data Streams permettant aux applications consommateur de recevoir des enregistrements provenant d'un flux de données avec un débit dédié de jusqu'à 2 Mo de données par seconde par partition. Une application consommateur utilisant la diffusion améliorée n'a pas besoin de se heurter à d'autres applications consommateur qui reçoivent des données à partir du flux. Pour de plus amples informations, veuillez consulter Développement d'applications consommateur personnalisées avec un débit dédié (diffusion améliorée).

Vous pouvez utiliser des opérations d'API pour créer une application consommateur qui utilise la diffusion améliorée dans Kinesis Data Streams.

Inscrire une application consommateur avec la diffusion améliorée à l'aide de l'API Kinesis Data Streams
  1. Appelez RegisterStreamConsumer pour enregistrer votre application en tant qu'application consommateur utilisant la diffusion améliorée. Kinesis Data Streams génère un Amazon Resource Name (ARN) pour l'application consommateur et le renvoie dans la réponse.

  2. Pour commencer à écouter une partition spécifique, transmettez l'ARN de l'application consommateur dans un appel à SubscribeToShard. Kinesis Data Streams commence ensuite à pousser les enregistrements provenant de la partition vers vous, sous la forme d'événements de type SubscribeToShardEvent sur une connexion HTTP/2. La connexion demeure ouvert pour une durée maximum de 5 minutes. Appelez SubscribeToShard à nouveau si vous souhaitez continuer à recevoir les enregistrements de cette partition après le renvoi de future par l'appel à SubscribeToShard normalement ou exceptionnellement.

    Note

    L'API SubscribeToShard renvoie également la liste des fragments enfants de la partition actuelle lorsque la fin de la partition actuelle est atteinte.

  3. Pour annuler l'inscription d'une application consommateur qui utilise la diffusion améliorée, appelez l'élément DeregisterStreamConsumer.

Le code suivant est un exemple de la façon dont vous pouvez abonner votre application consommateur à une partition, renouveler l'abonnement périodiquement et gérer les événements.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Si event.ContinuationSequenceNumber renvoie null, cela indique qu'une division ou une fusion de partition a eu lieu impliquant cette partition. Cette partition est maintenant dans un état CLOSED, et vous avez lu tous les enregistrements de données disponibles à partir de cette partition. Dans ce scénario, comme indiqué ci-dessus, vous pouvez utiliser event.childShards pour en savoir plus sur les nouvelles partitions secondaires de la partition en cours de traitement qui ont été créées par la scission ou la fusion. Pour plus d'informations, consultez ChildShard (français non garanti).