Kinesis Data Streams API를 사용하여 향상된 팬아웃 소비자 개발 - Amazon Kinesis Data Streams

Kinesis Data Streams API를 사용하여 향상된 팬아웃 소비자 개발

향상된 팬아웃은 소비자가 샤드당 1초에 최대 2MB의 전용 처리량으로 데이터 스트림으로부터 레코드를 수신할 수 있도록 하는 Amazon Kinesis Data Streams 기능입니다. 향상된 팬아웃을 사용하는 소비자는 스트림으로부터 데이터를 수신하는 다른 소비자와 경쟁할 필요가 없습니다. 자세한 내용은 전용 처리량으로 사용자 지정 소비자 개발(향상된 팬아웃) 단원을 참조하십시오.

API 작업을 사용하여 Kinesis Data Streams에서 향상된 팬아웃을 사용하는 소비자를 만들 수 있습니다.

Kinesis Data Streams API를 사용하여 향상된 팬아웃을 사용하는 소비자 등록
  1. RegisterStreamConsumer를 직접적으로 호출하여 애플리케이션을 향상된 팬아웃을 사용하는 소비자로 등록합니다. Kinesis Data Streams는 소비자를 위한 Amazon 리소스 이름(ARN)을 생성하고 이를 응답으로 반환합니다.

  2. 특정 샤드를 듣기 시작하려면 SubscribeToShard 호출에 소비자 ARN을 전달합니다. 그런 다음 Kinesis Data Streams는 HTTP/2 연결을 통해 SubscribeToShardEvent 유형의 이벤트 형식으로 해당 샤드의 레코드를 사용자에게 푸시하기 시작합니다. 이 연결은 최대 5분 동안 활성화됩니다. SubscribeToShard 호출에 의해 반환된 future가 정상적으로 또는 예외적으로 완료된 후에도 샤드에서 레코드를 계속 수신하려면 SubscribeToShard를 다시 직접적으로 호출합니다.

    참고

    또한 SubscribeToShard API는 현재 샤드의 끝에 도달하면 현재 샤드의 하위 샤드 목록을 반환합니다.

  3. 향상된 팬아웃을 사용하는 소비자의 등록을 해제하려면 DeregisterStreamConsumer를 호출합니다.

다음 코드는 소비자를 샤드에 등록하고, 등록을 정기적으로 갱신하고, 이벤트를 처리하는 방법을 보여 주는 예제입니다.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

event.ContinuationSequenceNumbernull을 반환하는 경우 이는 이 샤드와 관련된 샤드 분할 또는 병합이 발생했음을 나타냅니다. 이 샤드는 현재 CLOSED 상태이며 이 샤드에서 사용 가능한 모든 데이터 레코드를 읽었습니다. 이 시나리오에서는 위의 예에 따라 event.childShards를 사용하여 분할 또는 병합으로 생성된 처리 중인 샤드의 새 하위 샤드에 대해 알아볼 수 있습니다. 자세한 내용은 ChildShard를 참조하세요.