

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Kinesis を使用する
<a name="examples-kinesis"></a>

このセクションでは、AWS SDK for Java 2.x を使用して [Amazon Kinesis](https://docs.aws.amazon.com/kinesis/) をプログラムする例を示します。

Kinesis の詳細については、「[Amazon Kinesis デベロッパーガイド](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)」を参照してください。

次の例には各手法を示すのに必要なコードのみが含まれます。[完全なサンプルコードは GitHub で入手できます](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2)。そこから、単一のソースファイルをダウンロードするかリポジトリをローカルにクローン作成して、ビルドし実行するためのすべての例を取得できます。

**Topics**
+ [Amazon Kinesis Data Streams​ へのサブスクライブ](examples-kinesis-stream.md)

# Amazon Kinesis Data Streams​ へのサブスクライブ
<a name="examples-kinesis-stream"></a>

以下の例では、Amazon Kinesis メソッドを使用し、`subscribeToShard` Data Streams からデータを取得して処理する方法について説明します。Kinesis Data Streams では、強化されたファンアウト機能と低レイテンシーの HTTP/2 データ取得 API が導入されました。これにより、デベロッパーは複数の低レイテンシーで高パフォーマンスのアプリケーションを、同じ Kinesis Data Streams でより簡単に実行できます。

## セットアップ
<a name="set-up"></a>

まず、非同期 Kinesisクライアントと [SubscribeToShardRequest](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/SubscribeToShardRequest.html) オブジェクトを作成します。これらのオブジェクトは、Kinesis イベントにサブスクライブする次のそれぞれの例で使用されます。

 **インポート** 

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
```

 **コード** 

```
        Region region = Region.US_EAST_1;
        KinesisAsyncClient client = KinesisAsyncClient.builder()
        .region(region)
        .build();

        SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                .consumerARN(CONSUMER_ARN)
                .shardId("arn:aws:kinesis:us-east-1:111122223333:stream/StockTradeStream")
                .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
```

## ビルダーインターフェイスを使用する
<a name="use-the-builder-interface"></a>

`builder` メソッドを使用して、[SubscribeToShardResponseHandler](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/SubscribeToShardResponseHandler.html) の作成をシンプルにできます。

ビルダーを使用して、完全なインターフェイスを実装する代わりにメソッド呼び出しで各ライフサイクルのコールバックを設定できます。

 **コード** 

```
    private static CompletableFuture<Void> responseHandlerBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .onComplete(() -> System.out.println("All records stream successfully"))
                // Must supply some type of subscriber
                .subscriber(e -> System.out.println("Received event - " + e))
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

公開者をさらに制御するには、`publisherTransformer` メソッドを使用して公開者をカスタマイズできます。

 **Code** 

```
    private static CompletableFuture<Void> responseHandlerBuilderPublisherTransformer(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .publisherTransformer(p -> p.filter(e -> e instanceof SubscribeToShardEvent).limit(100))
                .subscriber(e -> System.out.println("Received event - " + e))
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

[GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java) で完全な例をご覧ください。

## カスタムレスポンスハンドラを使用する
<a name="use-a-custom-response-handler"></a>

サブスクライバーとパブリッシャーの完全なコントロールのためには、`SubscribeToShardResponseHandler` インターフェイスを実装します。

この例では、`onEventStream` メソッドを実装します。これにより、公開者へのフルアクセスが許可されます。このデモでは、受信者による表示のために公開者をイベントレコードに変換する方法を示します。

 **Code** 

```
    private static CompletableFuture<Void> responseHandlerBuilderClassic(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = new SubscribeToShardResponseHandler() {

            @Override
            public void responseReceived(SubscribeToShardResponse response) {
                System.out.println("Receieved initial response");
            }

            @Override
            public void onEventStream(SdkPublisher<SubscribeToShardEventStream> publisher) {
                publisher
                        // Filter to only SubscribeToShardEvents
                        .filter(SubscribeToShardEvent.class)
                        // Flat map into a publisher of just records
                        .flatMapIterable(SubscribeToShardEvent::records)
                        // Limit to 1000 total records
                        .limit(1000)
                        // Batch records into lists of 25
                        .buffer(25)
                        // Print out each record batch
                        .subscribe(batch -> System.out.println("Record Batch - " + batch));
            }

            @Override
            public void complete() {
                System.out.println("All records stream successfully");
            }

            @Override
            public void exceptionOccurred(Throwable throwable) {
                System.err.println("Error during stream - " + throwable.getMessage());
            }
        };
        return client.subscribeToShard(request, responseHandler);
    }
```

[GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java) で完全な例をご覧ください。

## 訪問者インターフェイスを使用する
<a name="use-the-visitor-interface"></a>

[Visitor](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/SubscribeToShardResponseHandler.Visitor.html) オブジェクトを使用して、監視する特定のイベントにサブスクライブできます。

 **Code** 

```
    private static CompletableFuture<Void> responseHandlerBuilderVisitorBuilder(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler.Visitor visitor = SubscribeToShardResponseHandler.Visitor
                .builder()
                .onSubscribeToShardEvent(e -> System.out.println("Received subscribe to shard event " + e))
                .build();
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .subscriber(visitor)
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

[GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java) で完全な例をご覧ください。

## カスタム受信者を使用する
<a name="use-a-custom-subscriber"></a>

独自のカスタム受信者を実装して、ストリームにサブスクライブすることもできます。

このコードスニペットでは、サブスクライバーの例を示しています。

 **コード** 

```
    private static class MySubscriber implements Subscriber<SubscribeToShardEventStream> {

        private Subscription subscription;
        private AtomicInteger eventCount = new AtomicInteger(0);

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(1);
        }

        @Override
        public void onNext(SubscribeToShardEventStream shardSubscriptionEventStream) {
            System.out.println("Received event " + shardSubscriptionEventStream);
            if (eventCount.incrementAndGet() >= 100) {
                // You can cancel the subscription at any time if you wish to stop receiving events.
                subscription.cancel();
            }
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Error occurred while stream - " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Finished streaming all events");
        }
    }
```

次のコードスニペットに示すように、カスタムサブスクライバーを `subscribe` メソッドに渡すことができます。

 **Code** 

```
    private static CompletableFuture<Void> responseHandlerBuilderSubscriber(KinesisAsyncClient client, SubscribeToShardRequest request) {
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                .builder()
                .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                .subscriber(MySubscriber::new)
                .build();
        return client.subscribeToShard(request, responseHandler);
    }
```

[GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/blob/ac748d8ef99cd17e297cb74fe13aa671e2679088/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java) で完全な例をご覧ください。

## Kinesis データストリームにデータレコードを書き込む
<a name="write-data-records-into-a-kinesis-data-stream"></a>

[KinesisClient](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/KinesisClient.html) オブジェクトで `putRecords` メソッドを使用して、データレコードを Kinesis データストリームに書き込むことができます。このメソッドを正常に呼び出すには、[PutRecordsRequest](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/kinesis/model/PutRecordsRequest.html) オブジェクトを作成します。データストリームの名前を `streamName` メソッドに渡します。また、`putRecords` メソッドを使用してデータを渡す必要があります (次のコード例に示します)。

 **インポート**。

```
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
```

次の Java コード例では、**StockTrade** オブジェクトが Kinesis データストリームに書き込むデータとして使用されています。この例を実行する前に、データストリームを作成したことを確認します。

 **Code** 

```
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;

/**
 * Before running this Java V2 code example, set up your development
 * environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */
public class StockTradesWriter {
    public static void main(String[] args) {
        final String usage = """

                Usage:
                    <streamName>

                Where:
                    streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream)
                """;

        if (args.length != 1) {
            System.out.println(usage);
            System.exit(1);
        }

        String streamName = args[0];
        Region region = Region.US_EAST_1;
        KinesisClient kinesisClient = KinesisClient.builder()
                .region(region)
                .build();

        // Ensure that the Kinesis Stream is valid.
        validateStream(kinesisClient, streamName);
        setStockData(kinesisClient, streamName);
        kinesisClient.close();
    }

    public static void setStockData(KinesisClient kinesisClient, String streamName) {
        try {
            // Repeatedly send stock trades with a 100 milliseconds wait in between.
            StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();

            // Put in 50 Records for this example.
            int index = 50;
            for (int x = 0; x < index; x++) {
                StockTrade trade = stockTradeGenerator.getRandomTrade();
                sendStockTrade(trade, kinesisClient, streamName);
                Thread.sleep(100);
            }

        } catch (KinesisException | InterruptedException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        System.out.println("Done");
    }

    private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient,
            String streamName) {
        byte[] bytes = trade.toJsonAsBytes();

        // The bytes could be null if there is an issue with the JSON serialization by
        // the Jackson JSON library.
        if (bytes == null) {
            System.out.println("Could not get JSON bytes for stock trade");
            return;
        }

        System.out.println("Putting trade: " + trade);
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in
                                                       // the Supplemental Information section below.
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(bytes))
                .build();

        try {
            kinesisClient.putRecord(request);
        } catch (KinesisException e) {
            System.err.println(e.getMessage());
        }
    }

    private static void validateStream(KinesisClient kinesisClient, String streamName) {
        try {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamName(streamName)
                    .build();

            DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);

            if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) {
                System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
                System.exit(1);
            }

        } catch (KinesisException e) {
            System.err.println("Error found while describing the stream " + streamName);
            System.err.println(e);
            System.exit(1);
        }
    }
}
```

[GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/StockTradesWriter.java) で完全な例をご覧ください。

## サードパーティーライブラリを使用する
<a name="use-a-third-party-library"></a>

カスタムの受信者を実装せずに、その他のサードパーティーのライブラリを使用することができます。この例では、RxJava の実装を例に挙げていますが、リアクティブなストリームのインターフェイスを実装するライブラリを使用することもできます。上記ライブラリの詳細については、「[Github の RxJava wiki ページ](https://github.com/ReactiveX/RxJava/wiki)」を参照してください。

このライブラリを使用するには、依存関係として追加します。使用する POM スニペットの例を示します (Maven を使用している場合)。

 **POM エントリ** 

```
<dependency>
 <groupId>io.reactivex.rxjava2</groupId>
 <artifactId>rxjava</artifactId>
 <version>2.2.21</version>
</dependency>
```

 **インポート**。

```
import java.net.URI;
import java.util.concurrent.CompletableFuture;

import io.reactivex.Flowable;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.awssdk.utils.AttributeMap;
```

この例では、`onEventStream` ライフサイクルメソッドの RxJava を使用します。これにより、発行者へのフルアクセスが付与され、これを使用して Rx Flowable を作成できます。

 **コード** 

```
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
            .builder()
            .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
            .onEventStream(p -> Flowable.fromPublisher(p)
                                        .ofType(SubscribeToShardEvent.class)
                                        .flatMapIterable(SubscribeToShardEvent::records)
                                        .limit(1000)
                                        .buffer(25)
                                        .subscribe(e -> System.out.println("Record batch = " + e)))
            .build();
```

`publisherTransformer` 発行者を指定して、`Flowable` メソッドを使用することもできます。次の例に示すように、`Flowable` 発行者を *SdkPublisher* に適用する必要があります。

 **Code** 

```
        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
            .builder()
            .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
            .publisherTransformer(p -> SdkPublisher.adapt(Flowable.fromPublisher(p).limit(100)))
            .build();
```

[GitHub](https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamRxJavaEx.java) で完全な例をご覧ください。

## 詳細情報
<a name="more-information"></a>
+  Amazon Kinesis API リファレンスの [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)
+  Amazon Kinesis API リファレンスの [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)