Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Desarrolle un mayor número de consumidores con más seguidores con el AWS SDK for Java
La distribución ramificada mejorada es una característica de Amazon Kinesis Data Streams que permite a los consumidores recibir registros de un flujo de datos con un rendimiento dedicado de hasta 2 MB de datos por segundo por partición. Un consumidor que utiliza la distribución ramificada mejorada no tiene que competir con otros consumidores que reciben datos de la secuencia. Para obtener más información, consulte Desarrolle consumidores con una distribución mejorada con un rendimiento dedicado.
Puede utilizar API las operaciones para crear un consumidor que utilice la distribución mejorada en Kinesis Data Streams.
Para registrar a un consumidor con una distribución mejorada mediante Kinesis Data Streams API
-
Llame RegisterStreamConsumerpara registrar su solicitud como consumidor que utiliza un sistema de distribución mejorado. Kinesis Data Streams genera un nombre de recurso de Amazon ARN () para el consumidor y lo devuelve en la respuesta.
-
Para empezar a escuchar un fragmento específico, envía una llamada al consumidor ARN a. SubscribeToShard A continuación, Kinesis Data Streams comienza a enviarle los registros desde ese fragmento, en forma de eventos de SubscribeToShardEventtipo a través de HTTP una conexión /2. La conexión permanece abierta durante un máximo de 5 minutos. SubscribeToShardVuelva a llamar si quiere seguir recibiendo los registros del fragmento una vez
future
que la llamada devuelva y se SubscribeToShardcomplete de forma normal o excepcional.nota
SubscribeToShard
APItambién devuelve la lista de los fragmentos secundarios del fragmento actual cuando se alcanza el final del fragmento actual. -
Para anular el registro de un consumidor que utiliza la función de distribución mejorada, llama al teléfono. DeregisterStreamConsumer
El código siguiente es un ejemplo de cómo suscribir el consumidor a un fragmento, renovar la suscripción de forma periódica y controlar los eventos.
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }
Si event.ContinuationSequenceNumber
devuelve null
, indica que se ha producido una división o combinación de una partición que ha implicado esta partición. Esta partición se encuentra ahora en un estado CLOSED
, y se han leído todos los registros de datos disponibles de esta partición. En este escenario, según el ejemplo anterior, puede utilizar event.childShards
para obtener información sobre las nuevas particiones secundarias de la partición que se procesa y que se crearon mediante la división o la combinación. Para obtener más información, consulte. ChildShard