Sviluppa consumatori più entusiasti con Kinesis Data Streams API - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Sviluppa consumatori più entusiasti con Kinesis Data Streams API

Il fan-out avanzato è una funzionalità del flusso di dati Amazon Kinesis che consente ai consumer di ricevere dati da un flusso di dati con velocità di trasmissione effettiva dedicata fino a 2 MiB di dati al secondo per partizione. Un'applicazione consumer che utilizza il fan-out avanzato non è in competizione con altre applicazioni che ricevono dati dal flusso. Per ulteriori informazioni, consulta Sviluppa consumatori personalizzati con throughput dedicato (fan-out migliorato).

Puoi utilizzare API le operazioni per creare un consumatore che utilizzi il fan-out avanzato in Kinesis Data Streams.

Per registrare un consumatore con un fan-out avanzato utilizzando Kinesis Data Streams API
  1. Chiama RegisterStreamConsumerper registrare la tua applicazione come consumatore che utilizza un fan-out avanzato. Kinesis Data Streams genera un Amazon Resource ARN Name () per il consumatore e lo restituisce nella risposta.

  2. Per iniziare ad ascoltare uno shard specifico, invia al consumatore una chiamata ARN a. SubscribeToShard Kinesis Data Streams inizia quindi a inviare all'utente i record da quello shard, sotto forma di eventi SubscribeToShardEventdi tipo su una connessione /2. HTTP La connessione rimane aperta per un massimo di 5 minuti. Chiama SubscribeToShardnuovamente se desideri continuare a ricevere i record dallo shard dopo future che la chiamata restituisce il termine normale o eccezionale. SubscribeToShard

    Nota

    SubscribeToShardAPIrestituisce anche l'elenco dei frammenti secondari dello shard corrente quando viene raggiunta la fine dello shard corrente.

  3. Per annullare la registrazione di un consumatore che utilizza il fan-out avanzato, chiama. DeregisterStreamConsumer

Il seguente codice è un esempio di come è possibile sottoscrivere l'applicazione consumer a uno shard, rinnovare la sottoscrizione periodicamente e gestire gli eventi.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Se event.ContinuationSequenceNumber restituisce null, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato CLOSED e hai letto tutti i record di dati disponibili da questa partizione. In questo scenario, è possibile utilizzare event.childShards per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, vedere. ChildShard