Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Sviluppa una maggiore fidelizzazione dei consumatori con il AWS SDK for Java
Il fan-out avanzato è una funzionalità del flusso di dati Amazon Kinesis che consente ai consumer di ricevere dati da un flusso di dati con velocità di trasmissione effettiva dedicata fino a 2 MiB di dati al secondo per partizione. Un'applicazione consumer che utilizza il fan-out avanzato non è in competizione con altre applicazioni che ricevono dati dal flusso. Per ulteriori informazioni, consulta Sviluppa consumatori con fan-out migliorati con un throughput dedicato.
Puoi utilizzare API le operazioni per creare un consumatore che utilizzi il fan-out avanzato in Kinesis Data Streams.
Per registrare un consumatore con un fan-out avanzato utilizzando Kinesis Data Streams API
-
Chiama RegisterStreamConsumerper registrare la tua applicazione come consumatore che utilizza un fan-out avanzato. Kinesis Data Streams genera un Amazon Resource ARN Name () per il consumatore e lo restituisce nella risposta.
-
Per iniziare ad ascoltare uno shard specifico, invia al consumatore una chiamata ARN a. SubscribeToShard Kinesis Data Streams inizia quindi a inviare all'utente i record da quello shard, sotto forma di eventi SubscribeToShardEventdi tipo su una connessione /2. HTTP La connessione rimane aperta per un massimo di 5 minuti. Chiama SubscribeToShardnuovamente se desideri continuare a ricevere i record dallo shard dopo
future
che la chiamata restituisce il termine normale o eccezionale. SubscribeToShardNota
SubscribeToShard
APIrestituisce anche l'elenco dei frammenti secondari dello shard corrente quando viene raggiunta la fine dello shard corrente. -
Per annullare la registrazione di un consumatore che utilizza il fan-out avanzato, chiama. DeregisterStreamConsumer
Il seguente codice è un esempio di come è possibile sottoscrivere l'applicazione consumer a uno shard, rinnovare la sottoscrizione periodicamente e gestire gli eventi.
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }
Se event.ContinuationSequenceNumber
restituisce null
, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato CLOSED
e hai letto tutti i record di dati disponibili da questa partizione. In questo scenario, è possibile utilizzare event.childShards
per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, consulta. ChildShard