기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
구독 Amazon Kinesis Data Streams
다음 예제에서는 subscribeToShard
메서드를 사용하여 Amazon Kinesis Data Streams에서 데이터를 검색하고 처리하는 방법을 보여줍니다. Kinesis Data Streams 이제 는 향상된 팬아웃 기능과 지연 시간이 짧은 HTTP/2 데이터 검색 을 사용하여 개발자가 동일한 Kinesis 데이터 스트림에서 지연 시간이 짧은 고성능 애플리케이션을 여러 개 더 쉽게 실행할 수 API있도록 합니다.
설정
먼저 비동기 Kinesis 클라이언트와 SubscribeToShardRequest
가져오기
import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
코드
Region region = Region.US_EAST_1; KinesisAsyncClient client = KinesisAsyncClient.builder() .region(region) .build(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId("arn:aws:kinesis:us-east-1:111122223333:stream/StockTradeStream") .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
빌더 인터페이스 사용
builder
메서드를 사용하여 의 생성을 단순화할 수 있습니다SubscribeToShardResponseHandler
작성기를 사용하여 전체 인터페이스를 구현하는 대신 메서드 호출을 통해 각 수명 주기 콜백을 설정할 수 있습니다.
코드
private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onComplete(() -> System.out.println("All records stream successfully")) // Must supply some type of subscriber .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }
게시자의 더욱 많은 제어를 위해 publisherTransformer
메서드를 사용하여 게시자를 사용자 지정할 수 있습니다.
코드
private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> p.filter(e -> e instanceof SubscribeToShardEvent).limit(100)) .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }
의 전체 예제
사용자 지정 응답 핸들러 사용
구독자와 게시자의 완전한 제어를 위해 SubscribeToShardResponseHandler
인터페이스를 구현합니다.
이 예제에서는 onEventStream
메서드를 구현하고, 이는 게시자에 대한 모든 액세스를 허용합니다. 게시자를 구독자가 출력할 이벤트 레코드로 전환하는 방법을 보여줍니다.
코드
private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = new SubscribeToShardResponseHandler() { @Override public void responseReceived(SubscribeToShardResponse response) { System.out.println("Receieved initial response"); } @Override public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { publisher // Filter to only SubscribeToShardEvents .filter(SubscribeToShardEvent.class) // Flat map into a publisher of just records .flatMapIterable(SubscribeToShardEvent::records) // Limit to 1000 total records .limit(1000) // Batch records into lists of 25 .buffer(25) // Print out each record batch .subscribe(batch -> System.out.println("Record Batch - " + batch)); } @Override public void complete() { System.out.println("All records stream successfully"); } @Override public void exceptionOccurred(Throwable throwable) { System.err.println("Error during stream - " + throwable.getMessage()); } }; return client.subscribeToShard(request, responseHandler); }
의 전체 예제
방문자 인터페이스 사용
Visitor
코드
private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = SubscribeToShardResponseHandler.Visitor .builder() .onSubscribeToShardEvent(e -> System.out.println("Received subscribe to shard event " + e)) .build(); SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); }
의 전체 예제
사용자 지정 구독자 사용
사용자 지정 구독자를 구현하여 스트림을 구독할 수도 있습니다.
이 코드 조각은 예제 구독자를 보여줍니다.
코드
private static class MySubscriber implements Subscriber<SubscribeToShardEventStream> { private Subscription subscription; private AtomicInteger eventCount = new AtomicInteger(0); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(SubscribeToShardEventStream shardSubscriptionEventStream) { System.out.println("Received event " + shardSubscriptionEventStream); if (eventCount.incrementAndGet() >= 100) { // You can cancel the subscription at any time if you wish to stop receiving events. subscription.cancel(); } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println("Error occurred while stream - " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("Finished streaming all events"); } }
다음 코드 스니펫과 같이 사용자 지정 구독자를 subscribe
메서드에 전달할 수 있습니다.
코드
private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(MySubscriber::new) .build(); return client.subscribeToShard(request, responseHandler); }
의 전체 예제
데이터 스트림에 Kinesis 데이터 레코드 쓰기
KinesisClientputRecords
메서드를 사용하여 데이터 스트림에 Kinesis 데이터 레코드를 쓸 수 있습니다. 이 메서드를 성공적으로 호출하려면 PutRecordsRequeststreamName
메서드에 전달합니다. 또한 다음 코드 예제와 같이 putRecords
메서드를 사용하여 데이터를 전달해야 합니다.
가져오기
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
다음 Java 코드 예제에서는 StockTrade 객체가 데이터 스트림에 쓰는 Kinesis 데이터로 사용된다는 점에 유의하세요. 이 예제를 실행하기 전에 데이터 스트림이 생성되었는지 확인합니다.
코드
import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }
의 전체 예제
타사 라이브러리 사용
사용자 지정 구독자를 구현하는 대신 타사 라이브러리를 사용할 수 있습니다. 이 예제에서는 RxJava 구현을 사용하는 방법을 보여주지만 Reactive Streams 인터페이스를 구현하는 모든 라이브러리를 사용할 수 있습니다. 해당 라이브러리에 대한 자세한 내용은 RxJava Github의 위키 페이지를
라이브러리를 사용하려면 종속성으로 추가합니다. Maven을 사용하는 경우 예제는 사용할 POM 조각을 보여줍니다.
POM 항목
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.21</version> </dependency>
가져오기
import java.net.URI; import java.util.concurrent.CompletableFuture; import io.reactivex.Flowable; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StartingPosition; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.utils.AttributeMap;
이 예제는 onEventStream
수명 주기 메서 RxJava 드에서 를 사용합니다. 이는 게시자에 대한 모든 액세스를 제공하고, 이를 사용하여 Rx Flowable을 생성할 수 있습니다.
코드
SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onEventStream(p -> Flowable.fromPublisher(p) .ofType(SubscribeToShardEvent.class) .flatMapIterable(SubscribeToShardEvent::records) .limit(1000) .buffer(25) .subscribe(e -> System.out.println("Record batch = " + e))) .build();
또한 다음과 같이 publisherTransformer
게시자를 포함하여 Flowable
메서드를 사용할 수 있습니다. 다음 예제와 SdkPublisher같이 Flowable
게시자를 에 맞게 조정해야 합니다.
코드
SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100))) .build();
의 전체 예제
추가 정보
-
SubscribeToShardEvent 참조의 Amazon Kinesis API
-
SubscribeToShard 참조의 Amazon Kinesis API