Amazon Kinesis Data Streams 가입 - AWS SDK for Java 2.x

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon Kinesis Data Streams 가입

다음 예제에서는 subscribeToShard 메서드를 사용하여 Amazon Kinesis 데이터 스트림에서 데이터를 검색하고 처리하는 방법을 보여줍니다. Kinesis Data Streams에서는 이제 향상된 팬아웃 기능과 지연 시간이 짧은 HTTP/2 데이터 검색 API를 갖추어 개발자가 동일한 Kinesis 데이터 스트림에서 여러 개의 지연 시간이 짧은 고성능 애플리케이션을 쉽게 실행할 수 있습니다.

설정

먼저 비동기 Kinesis 클라이언트와 SubscribeToShardRequest 객체를 만듭니다. 이러한 객체는 다음 각 예제에서 사용되어 Kinesis 이벤트를 구독합니다.

가져오기

import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

코드

Region region = Region.US_EAST_1; KinesisAsyncClient client = KinesisAsyncClient.builder() .region(region) .build(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId("arn:aws:kinesis:us-east-1:111122223333:stream/StockTradeStream") .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();

빌더 인터페이스 사용

builder 메서드를 사용하여 SubscribeToShardResponseHandler 생성을 간소화할 수 있습니다.

작성기를 사용하여 전체 인터페이스를 구현하는 대신 메서드 호출을 통해 각 수명 주기 콜백을 설정할 수 있습니다.

코드

private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onComplete(() -> System.out.println("All records stream successfully")) // Must supply some type of subscriber .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

게시자의 더욱 많은 제어를 위해 publisherTransformer 메서드를 사용하여 게시자를 사용자 지정할 수 있습니다.

코드

private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> p.filter(e -> e instanceof SubscribeToShardEvent).limit(100)) .subscriber(e -> System.out.println("Received event - " + e)) .build(); return client.subscribeToShard(request, responseHandler); }

GitHub의 전체 예제를 참조하세요.

사용자 지정 응답 핸들러 사용

구독자와 게시자의 완전한 제어를 위해 SubscribeToShardResponseHandler 인터페이스를 구현합니다.

이 예제에서는 onEventStream 메서드를 구현하고, 이는 게시자에 대한 모든 액세스를 허용합니다. 게시자를 구독자가 출력할 이벤트 레코드로 전환하는 방법을 보여줍니다.

코드

private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = new SubscribeToShardResponseHandler() { @Override public void responseReceived(SubscribeToShardResponse response) { System.out.println("Receieved initial response"); } @Override public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) { publisher // Filter to only SubscribeToShardEvents .filter(SubscribeToShardEvent.class) // Flat map into a publisher of just records .flatMapIterable(SubscribeToShardEvent::records) // Limit to 1000 total records .limit(1000) // Batch records into lists of 25 .buffer(25) // Print out each record batch .subscribe(batch -> System.out.println("Record Batch - " + batch)); } @Override public void complete() { System.out.println("All records stream successfully"); } @Override public void exceptionOccurred(Throwable throwable) { System.err.println("Error during stream - " + throwable.getMessage()); } }; return client.subscribeToShard(request, responseHandler); }

GitHub의 전체 예제를 참조하세요.

방문자 인터페이스 사용

Visitor 객체를 사용하여 보고 싶은 특정 이벤트를 구독할 수 있습니다.

코드

private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = SubscribeToShardResponseHandler.Visitor .builder() .onSubscribeToShardEvent(e -> System.out.println("Received subscribe to shard event " + e)) .build(); SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); }

GitHub의 전체 예제를 참조하세요.

사용자 지정 구독자 사용

사용자 지정 구독자를 구현하여 스트림을 구독할 수도 있습니다.

이 코드 조각은 예제 구독자를 보여줍니다.

코드

private static class MySubscriber implements Subscriber<SubscribeToShardEventStream> { private Subscription subscription; private AtomicInteger eventCount = new AtomicInteger(0); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(SubscribeToShardEventStream shardSubscriptionEventStream) { System.out.println("Received event " + shardSubscriptionEventStream); if (eventCount.incrementAndGet() >= 100) { // You can cancel the subscription at any time if you wish to stop receiving events. subscription.cancel(); } subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println("Error occurred while stream - " + throwable.getMessage()); } @Override public void onComplete() { System.out.println("Finished streaming all events"); } }

다음 코드 스니펫과 같이 사용자 지정 구독자를 subscribe 메서드에 전달할 수 있습니다.

코드

private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(MySubscriber::new) .build(); return client.subscribeToShard(request, responseHandler); }

GitHub의 전체 예제를 참조하세요.

Kinesis 데이터 스트림에 데이터 레코드 쓰기

KinesisClient 객체를 이용하면 putRecords 메서드를 사용해 Kinesis 데이터 스트림에 데이터 레코드를 쓸 수 있습니다. 이 메서드를 성공적으로 호출하려면 PutRecordsRequest 객체를 만들어야 합니다. 데이터 스트림 이름을 streamName 메서드에 전달합니다. 또한 다음 코드 예제와 같이 putRecords 메서드를 사용하여 데이터를 전달해야 합니다.

가져오기

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;

다음 Java 코드 예제에서는 StockTrade 객체가 Kinesis 데이터 스트림에 쓸 데이터로 사용됩니다. 이 예제를 실행하기 전에 데이터 스트림이 생성되었는지 확인합니다.

코드

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }

GitHub의 전체 예제를 참조하세요.

타사 라이브러리 사용

사용자 지정 구독자를 구현하는 대신 타사 라이브러리를 사용할 수 있습니다. 다음은 RxJava 구현을 사용하는 방법에 대한 예입니다. 그렇지만 반응형 스트림 인터페이스를 구현하는 모든 라이브러리를 사용할 수 있습니다. 해당 라이브러리에 대한 자세한 내용은 GitHub의 RxJava 위키 페이지를 참조하십시오.

라이브러리를 사용하려면 종속성으로 추가합니다. 이 예에서는 Maven을 사용하는 경우에 사용할 POM 조각을 알려줍니다.

POM 항목

<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency>

가져오기

import java.net.URI; import java.util.concurrent.CompletableFuture; import io.reactivex.Flowable; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.http.Protocol; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StartingPosition; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import software.amazon.awssdk.utils.AttributeMap;

이 예제에서는 onEventStream 수명 주기 메서드에서 RxJava를 사용합니다. 이는 게시자에 대한 모든 액세스를 제공하고, 이를 사용하여 Rx Flowable을 생성할 수 있습니다.

코드

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .onEventStream(p -> Flowable.fromPublisher(p) .ofType(SubscribeToShardEvent.class) .flatMapIterable(SubscribeToShardEvent::records) .limit(1000) .buffer(25) .subscribe(e -> System.out.println("Record batch = " + e))) .build();

또한 다음과 같이 publisherTransformer 게시자를 포함하여 Flowable 메서드를 사용할 수 있습니다. 다음 예제에 표시된 것과 같이 Flowable 게시자를 SdkPublisher로 조정해야 합니다.

코드

SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100))) .build();

GitHub의 전체 예제를 참조하세요.

추가 정보