Desenvolva consumidores expandidos aprimorados com o AWS SDK for Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolva consumidores expandidos aprimorados com o AWS SDK for Java

A distribuição avançada é um recurso do Amazon Kinesis Data Streams que permite que os consumidores recebam registros de um fluxo de dados com throughput dedicada de até 2 MB de dados por segundo por fragmento. Um consumidor que usa distribuição avançada não precisa lidar com outros consumidores que estejam recebendo dados do fluxo. Para obter mais informações, consulte Desenvolva consumidores de distribuição aprimorados com taxa de transferência dedicada.

Você pode usar API as operações para criar um consumidor que usa o fan-out aprimorado no Kinesis Data Streams.

Para registrar um consumidor com fan-out aprimorado usando o Kinesis Data Streams API
  1. Ligue RegisterStreamConsumerpara registrar seu aplicativo como um consumidor que usa fan-out aprimorado. O Kinesis Data Streams gera um Amazon Resource ARN Name () para o consumidor e o retorna na resposta.

  2. Para começar a ouvir um fragmento específico, encaminhe ao consumidor ARN uma ligação para SubscribeToShard. Em seguida, o Kinesis Data Streams começa a enviar os registros desse fragmento para você, na forma de eventos SubscribeToShardEventdo tipo em uma conexão /2. HTTP A conexão permanece aberta por até 5 minutos. Ligue SubscribeToShardnovamente se quiser continuar recebendo registros do fragmento após o future retorno da chamada para ser SubscribeToShardconcluído normalmente ou excepcionalmente.

    nota

    SubscribeToShardAPItambém retorna a lista dos fragmentos secundários do fragmento atual quando o final do fragmento atual é atingido.

  3. Para cancelar o registro de um consumidor que está usando o fan-out aprimorado, ligue. DeregisterStreamConsumer

O de código a seguir é um exemplo de como é possível inscrever o consumidor em um fragmento, renovar a assinatura periodicamente e manipular os eventos.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

event.ContinuationSequenceNumber retorna null para indicar que o fragmento passou por uma divisão ou uma mesclagem. O fragmento agora está em estado de CLOSED, todos os registros de dados disponíveis nele foram lidos. Nesse cenário, como mostrado no exemplo acima, é possível usar event.childShards para conhecer os fragmentos filho que foram criados pela divisão ou mesclagem do fragmento sendo processado. Para obter mais informações, consulte ChildShard.