기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
를 사용하여 향상된 팬아웃 소비자 개발 AWS SDK for Java
향상된 팬아웃은 소비자가 샤드당 1초에 최대 2MB의 전용 처리량으로 데이터 스트림으로부터 레코드를 수신할 수 있도록 하는 Amazon Kinesis Data Streams 기능입니다. 향상된 팬아웃을 사용하는 소비자는 스트림으로부터 데이터를 수신하는 다른 소비자와 경쟁할 필요가 없습니다. 자세한 내용은 전용 처리량으로 향상된 팬아웃 소비자 개발 단원을 참조하십시오.
API 작업을 사용하여 Kinesis Data Streams에서 향상된 팬아웃을 사용하는 소비자를 구축할 수 있습니다.
Kinesis Data Streams를 사용하여 향상된 팬아웃으로 소비자를 등록하려면 API
-
를 호출RegisterStreamConsumer하여 향상된 팬아웃을 사용하는 소비자로 애플리케이션을 등록합니다. Kinesis Data Streams는 소비자의 Amazon 리소스 이름(ARN)을 생성하고 응답으로 반환합니다.
-
특정 샤드를 들으려면 소비자를 ARN에 호출합니다SubscribeToShard. 그런 다음 Kinesis Data Streams는 HTTP/2 연결을 SubscribeToShardEvent 통해 유형의 이벤트 형태로 해당 샤드의 레코드를 사용자에게 푸시하기 시작합니다. 이 연결은 최대 5분 동안 활성화됩니다. 가 정상적으로 또는 예외적으로 SubscribeToShard 완료되도록 호출에서 반환
future
된 후 샤드에서 레코드를 계속 수신하려면 SubscribeToShard 다시 호출합니다.참고
SubscribeToShard
API 는 또한 현재 샤드의 끝에 도달했을 때 현재 샤드의 하위 샤드 목록을 반환합니다. -
향상된 팬아웃을 사용하는 소비자의 등록을 취소하려면 를 호출합니다DeregisterStreamConsumer.
다음 코드는 소비자를 샤드에 등록하고, 등록을 정기적으로 갱신하고, 이벤트를 처리하는 방법을 보여 주는 예제입니다.
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }
event.ContinuationSequenceNumber
가 null
을 반환하는 경우 이는 이 샤드와 관련된 샤드 분할 또는 병합이 발생했음을 나타냅니다. 이 샤드는 현재 CLOSED
상태이며 이 샤드에서 사용 가능한 모든 데이터 레코드를 읽었습니다. 이 시나리오에서는 위의 예에 따라 event.childShards
를 사용하여 분할 또는 병합으로 생성된 처리 중인 샤드의 새 하위 샤드에 대해 알아볼 수 있습니다. 자세한 내용은 ChildShard를 참조하세요.