

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Kinesis 작업
<a name="examples-kinesis"></a>

이 단원에서는 AWS SDK for Java 2.x를 사용한 [Amazon Kinesis](https://docs.aws.amazon.com/kinesis/) 프로그래밍의 예제를 제공합니다.

Kinesis에 대한 자세한 내용은 [Amazon Kinesis 개발자 안내서](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)를 참조하세요.

다음 예제에는 각 기술을 보여주는 데 필요한 코드만 포함되어 있습니다. [전체 예제 코드는 GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2)에 있습니다. 이 위치에서 단일 소스 파일을 다운로드하거나 리포지토리를 로컬로 복사하여 모든 예제를 빌드하고 실행할 수 있습니다.

**Topics**
+ [Amazon Kinesis Data Streams 가입](examples-kinesis-stream.md)

# Amazon Kinesis Data Streams 가입
<a name="examples-kinesis-stream"></a>

다음 예제에서는 `subscribeToShard` 메서드를 사용하여 Amazon Kinesis 데이터 스트림에서 데이터를 검색하고 처리하는 방법을 보여줍니다. Kinesis Data Streams에서는 이제 향상된 팬아웃 기능과 지연 시간이 짧은 HTTP/2 데이터 검색 API를 갖추어 개발자가 동일한 Kinesis 데이터 스트림에서 여러 개의 지연 시간이 짧은 고성능 애플리케이션을 쉽게 실행할 수 있습니다.

## 설정
<a name="set-up"></a>

먼저 비동기 Kinesis 클라이언트와 [SubscribeToShardRequest](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/SubscribeToShardRequest.html) 객체를 만듭니다. 이러한 객체는 다음 각 예제에서 사용되어 Kinesis 이벤트를 구독합니다.

 **가져옵니다**.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
```

 ** 코드** 

```
        Region region = Region.US_EAST_1;
        KinesisAsyncClient client = KinesisAsyncClient.builder()
        .region(region)
        .build();

        SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                .consumerARN(CONSUMER_ARN)
                .shardId("arn:aws:kinesis:us-east-1:111122223333:stream/StockTradeStream")
                .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
```

## 빌더 인터페이스 사용
<a name="use-the-builder-interface"></a>

`builder` 메서드를 사용하여 [SubscribeToShardResponseHandler](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/SubscribeToShardResponseHandler.html) 생성을 간소화할 수 있습니다.

작성기를 사용하여 전체 인터페이스를 구현하는 대신 메서드 호출을 통해 각 수명 주기 콜백을 설정할 수 있습니다.

 ** 코드** 

```
    private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .onComplete(() -> System.out.println("All records stream successfully"))
                // Must supply some type of subscriber
                .subscriber(e -> System.out.println("Received event - " + e))
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

게시자의 더욱 많은 제어를 위해 `publisherTransformer` 메서드를 사용하여 게시자를 사용자 지정할 수 있습니다.

 ** 코드** 

```
    private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .publisherTransformer(p -> p.filter(e -> e instanceof SubscribeToShardEvent).limit(100))
                .subscriber(e -> System.out.println("Received event - " + e))
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

GitHub의 [전체 예제](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java)를 참조하세요.

## 사용자 지정 응답 핸들러 사용
<a name="use-a-custom-response-handler"></a>

구독자와 게시자의 완전한 제어를 위해 `SubscribeToShardResponseHandler` 인터페이스를 구현합니다.

이 예제에서는 `onEventStream` 메서드를 구현하고, 이는 게시자에 대한 모든 액세스를 허용합니다. 게시자를 구독자가 출력할 이벤트 레코드로 전환하는 방법을 보여줍니다.

 ** 코드** 

```
    private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = new SubscribeToShardResponseHandler() {

            @Override
            public void responseReceived(SubscribeToShardResponse response) {
                System.out.println("Receieved initial response");
            }

            @Override
            public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
                publisher
                        // Filter to only SubscribeToShardEvents
                        .filter(SubscribeToShardEvent.class)
                        // Flat map into a publisher of just records
                        .flatMapIterable(SubscribeToShardEvent::records)
                        // Limit to 1000 total records
                        .limit(1000)
                        // Batch records into lists of 25
                        .buffer(25)
                        // Print out each record batch
                        .subscribe(batch -> System.out.println("Record Batch - " + batch));
            }

            @Override
            public void complete() {
                System.out.println("All records stream successfully");
            }

            @Override
            public void exceptionOccurred(Throwable throwable) {
                System.err.println("Error during stream - " + throwable.getMessage());
            }
        };
        return client.subscribeToShard(request, responseHandler);
    }
```

GitHub의 [전체 예제](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java)를 참조하세요.

## 방문자 인터페이스 사용
<a name="use-the-visitor-interface"></a>

[Visitor](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/SubscribeToShardResponseHandler.Visitor.html) 객체를 사용하여 보고 싶은 특정 이벤트를 구독할 수 있습니다.

 ** 코드** 

```
    private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler.Visitor visitor = SubscribeToShardResponseHandler.Visitor
                .builder()
                .onSubscribeToShardEvent(e -> System.out.println("Received subscribe to shard event " + e))
                .build();
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .subscriber(visitor)
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

GitHub의 [전체 예제](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java)를 참조하세요.

## 사용자 지정 구독자 사용
<a name="use-a-custom-subscriber"></a>

사용자 지정 구독자를 구현하여 스트림을 구독할 수도 있습니다.

이 코드 조각은 예제 구독자를 보여줍니다.

 ** 코드** 

```
    private static class MySubscriber implements Subscriber<SubscribeToShardEventStream> {

        private Subscription subscription;
        private AtomicInteger eventCount = new AtomicInteger(0);

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(1);
        }

        @Override
        public void onNext(SubscribeToShardEventStream shardSubscriptionEventStream) {
            System.out.println("Received event " + shardSubscriptionEventStream);
            if (eventCount.incrementAndGet() >= 100) {
                // You can cancel the subscription at any time if you wish to stop receiving events.
                subscription.cancel();
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Error occurred while stream - " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Finished streaming all events");
        }
    }
```

다음 코드 스니펫과 같이 사용자 지정 구독자를 `subscribe` 메서드에 전달할 수 있습니다.

 ** 코드** 

```
    private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .subscriber(MySubscriber::new)
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

GitHub의 [전체 예제](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java)를 참조하세요.

## Kinesis 데이터 스트림에 데이터 레코드 쓰기
<a name="write-data-records-into-a-kinesis-data-stream"></a>

[KinesisClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisClient.html) 객체를 이용하면 `putRecords` 메서드를 사용해 Kinesis 데이터 스트림에 데이터 레코드를 쓸 수 있습니다. 이 메서드를 성공적으로 호출하려면 [PutRecordsRequest](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/PutRecordsRequest.html) 객체를 만들어야 합니다. 데이터 스트림 이름을 `streamName` 메서드에 전달합니다. 또한 다음 코드 예제와 같이 `putRecords` 메서드를 사용하여 데이터를 전달해야 합니다.

 **가져옵니다**.

```
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
```

다음 Java 코드 예제에서는 **StockTrade** 객체가 Kinesis 데이터 스트림에 쓸 데이터로 사용됩니다. 이 예제를 실행하기 전에 데이터 스트림이 생성되었는지 확인합니다.

 ** 코드** 

```
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;

/**
 * Before running this Java V2 code example, set up your development
 * environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */
public class StockTradesWriter {
    public static void main(String[] args) {
        final String usage = """

                Usage:
                    <streamName>

                Where:
                    streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream)
                """;

        if (args.length != 1) {
            System.out.println(usage);
            System.exit(1);
        }

        String streamName = args[0];
        Region region = Region.US_EAST_1;
        KinesisClient kinesisClient = KinesisClient.builder()
                .region(region)
                .build();

        // Ensure that the Kinesis Stream is valid.
        validateStream(kinesisClient, streamName);
        setStockData(kinesisClient, streamName);
        kinesisClient.close();
    }

    public static void setStockData(KinesisClient kinesisClient, String streamName) {
        try {
            // Repeatedly send stock trades with a 100 milliseconds wait in between.
            StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();

            // Put in 50 Records for this example.
            int index = 50;
            for (int x = 0; x < index; x++) {
                StockTrade trade = stockTradeGenerator.getRandomTrade();
                sendStockTrade(trade, kinesisClient, streamName);
                Thread.sleep(100);
            }

        } catch (KinesisException | InterruptedException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        System.out.println("Done");
    }

    private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();

        // The bytes could be null if there is an issue with the JSON serialization by
        // the Jackson JSON library.
        if (bytes == null) {
            System.out.println("Could not get JSON bytes for stock trade");
            return;
        }

        System.out.println("Putting trade: " + trade);
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in
                                                       // the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();

        try {
            kinesisClient.putRecord(request);
        } catch (KinesisException e) {
            System.err.println(e.getMessage());
        }
    }

    private static void validateStream(KinesisClient kinesisClient, String streamName) {
        try {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamName(streamName)
                    .build();

            DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);

            if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) {
                System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
                System.exit(1);
            }

        } catch (KinesisException e) {
            System.err.println("Error found while describing the stream " + streamName);
            System.err.println(e);
            System.exit(1);
        }
    }
}
```

GitHub의 [전체 예제](https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/StockTradesWriter.java)를 참조하세요.

## 타사 라이브러리 사용
<a name="use-a-third-party-library"></a>

사용자 지정 구독자를 구현하는 대신 타사 라이브러리를 사용할 수 있습니다. 다음은 RxJava 구현을 사용하는 방법에 대한 예입니다. 그렇지만 반응형 스트림 인터페이스를 구현하는 모든 라이브러리를 사용할 수 있습니다. 해당 라이브러리에 대한 자세한 내용은 [GitHub의 RxJava 위키 페이지](https://github.com/ReactiveX/RxJava/wiki)를 참조하십시오.

라이브러리를 사용하려면 종속성으로 추가합니다. 이 예에서는 Maven을 사용하는 경우에 사용할 POM 조각을 알려줍니다.

 **POM 항목** 

```
<dependency>
 <groupId>io.reactivex.rxjava2</groupId>
 <artifactId>rxjava</artifactId>
 <version>2.2.21</version>
</dependency>
```

 **가져옵니다**.

```
import java.net.URI;
import java.util.concurrent.CompletableFuture;

import io.reactivex.Flowable;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.awssdk.utils.AttributeMap;
```

이 예제에서는 `onEventStream` 수명 주기 메서드에서 RxJava를 사용합니다. 이는 게시자에 대한 모든 액세스를 제공하고, 이를 사용하여 Rx Flowable을 생성할 수 있습니다.

 ** 코드** 

```
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
            .builder()
            .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
            .onEventStream(p -> Flowable.fromPublisher(p)
                                        .ofType(SubscribeToShardEvent.class)
                                        .flatMapIterable(SubscribeToShardEvent::records)
                                        .limit(1000)
                                        .buffer(25)
                                        .subscribe(e -> System.out.println("Record batch = " + e)))
            .build();
```

또한 다음과 같이 `publisherTransformer` 게시자를 포함하여 `Flowable` 메서드를 사용할 수 있습니다. 다음 예제에 표시된 것과 같이 `Flowable` 게시자를 *SdkPublisher*로 조정해야 합니다.

 ** 코드** 

```
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
            .builder()
            .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
            .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100)))
            .build();
```

GitHub의 [전체 예제](https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamRxJavaEx.java)를 참조하세요.

## 추가 정보
<a name="more-information"></a>
+  Amazon Kinesis API 참조의 [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)
+  Amazon Kinesis API 참조의 [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)