

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# で Amazon Kinesis Data Streams API を使用してプロデューサーを開発する AWS SDK for Java
<a name="developing-producers-with-sdk"></a>

 AWS SDK for Java で Amazon Kinesis Data Streams API を使用してプロデューサーを開発できます。Kinesis Data Streams を初めて利用する場合は、[Amazon Kinesis Data Streams とは](introduction.md)および[AWS CLI を使用して Amazon Kinesis Data Streams オペレーションを実行する](getting-started.md)で説明されている概念と用語について理解することから始めてください。

以下の例では、[Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) について説明し、[AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) を使用してストリームにデータを追加 (入力) します。ただし、ほとんどのユースケースでは、Kinesis Data Streams KPL ライブラリを使用します。詳細については、[Amazon Kinesis Producer Library (KPL) を使用してプロデューサーを開発する](developing-producers-with-kpl.md)を参照してください。

この章で紹介する Java サンプルコードは、基本的な Kinesis Data Streams API オペレーションを実行する方法を示しており、オペレーションタイプ別に論理的に分割されています。これらのサンプルは、すべての例外を確認しているわけではなく、すべてのセキュリティやパフォーマンスの側面を考慮しているわけでもない点で、本稼働環境に使用できるコードを表すものではありません。また、他のプログラミング言語を使用して [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) を呼び出すこともできます。利用可能なすべての AWS SDKs[「アマゾン ウェブ サービスの開発を開始する](https://aws.amazon.com/developers/getting-started/)」を参照してください。

各タスクには前提条件があります。たとえば、ストリームを作成するまではストリームにデータを追加できず、ストリームを作成するにはクライアントを作成する必要があります。詳細については、「[Kinesis Data Streams を作成して管理する](working-with-streams.md)」を参照してください。

**Topics**
+ [ストリームにデータを追加する](#kinesis-using-sdk-java-add-data-to-stream)
+ [AWS Glue Schema Registry を使用してデータを操作する](kinesis-integration-glue-schema-registry.md)

## ストリームにデータを追加する
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

ストリームを作成したら、レコードの形式でストリームにデータを追加できます。レコードはデータ BLOB の形式で処理するデータを格納するデータ構造です。データをレコードに保存した後、Kinesis Data Streams ではいずれの方法でもデータが検査、解釈、または変更されることはありません。各レコードにはシーケンス番号とパーティションキーも関連付けられます。

Kinesis Data Streams API には、ストリームにデータを追加するオペレーションとして [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) と [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) の 2 つの異なるオペレーションがあります。`PutRecords` オペレーションは HTTP リクエストごとストリームに複数のレコードを送信し、単数形の `PutRecord` オペレーションは一度に 1 つずつストリームにレコードを送信します (各レコードについて個別の HTTP リクエストが必要です)。データプロデューサーあたりのスループットが向上するため、ほとんどのアプリケーションでは `PutRecords` を使用してください。これらの各オペレーションの詳細については、後のそれぞれのサブセクションを参照してください。

**Topics**
+ [PutRecords を使用して複数のレコードを追加する](#kinesis-using-sdk-java-putrecords)
+ [PutRecord を使用して単一レコードを追加する](#kinesis-using-sdk-java-putrecord)

ソースアプリケーションは Kinesis Data Streams API を使用してストリームにデータを追加するため、1 つ以上のコンシューマーアプリケーションが同時にストリームからデータを取得して処理する可能性があることを常に念頭に置いてください。コンシューマーが Kinesis Data Streams API を使用してデータを取得する方法の詳細については、[ストリームからのデータを取得する](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data)を参照してください。

**重要**  
[データ保持期間を変更する](kinesis-extended-retention.md)

### PutRecords を使用して複数のレコードを追加する
<a name="kinesis-using-sdk-java-putrecords"></a>

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) オペレーションは、1 つのリクエストで Kinesis Data Streams に複数のレコードを送信します。`PutRecords` を使用することによって、プロデューサーは Kinesis Data Streams にデータを送信するときに高スループットを実現できます。各`PutRecords` リクエストは、最大 500 レコードをサポートできます。リクエストに含まれる各レコードは 1 MB、リクエスト全体の上限はパーティションキーを含めて最大 5 MB。後で説明する単一の `PutRecord` オペレーションと同様に、`PutRecords` はシーケンス番号とパーティションキーを使用します。ただし、`PutRecord` の `SequenceNumberForOrdering` パラメータは、`PutRecords` の呼び出しには含まれません。`PutRecords` オペレーションでは、リクエストの自然な順序ですべてのレコードを処理するよう試みます。

各データレコードには一意のシーケンス番号があります。シーケンス番号は、`client.putRecords` を呼び出してストリームにデータレコードを追加した後に、Kinesis Data Streams によって割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。`PutRecords`リクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

**注記**  
シーケンス番号は、同じストリーム内の一連のデータのインデックスとして使用することはできません。一連のデータを論理的に区別するには、パーティションキーを使用するか、データセットごとに個別のストリームを作成します。

`PutRecords` リクエストには、異なるパーティションキーのレコードを含めることができます。リクエストのスコープはストリームです。各リクエストには、リクエストの制限まで、パーティションキーとレコードのあらゆる組み合わせを含めることができます。複数の異なるパーティションキーを使用して、複数の異なるシャードを含むストリームに対して実行されたリクエストは、少数のパーティションキーを使用して少数のシャードに対して実行されたリクエストよりも一般的に高速です。レイテンシーを低減し、スループットを最大化するには、パーティションキーの数をシャードの数よりも大きくする必要があります。

#### PutRecords の例
<a name="kinesis-using-sdk-java-putrecords-example"></a>

次のコードでは、シーケンシャルなパーティションキーを持つ 100 件のデータレコードを作成し、`DataStream` という名前のストリームに格納しています。

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

`PutRecords` のレスポンスには、レスポンスの `Records` の配列が含まれます。レスポンス配列の各レコードは、リクエスト配列内のレコードと自然な順序 (リクエストやレスポンスの上から下へ) で直接相互に関連付けられます。レスポンスの `Records` 配列には、常にリクエスト配列と同じ数のレコードが含まれます。

#### PutRecords 使用時のエラーを処理する
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

デフォルトでは、リクエスト内の個々のレコードでエラーが発生しても、`PutRecords` リクエスト内のそれ以降のレコードの処理は停止されません。つまり、レスポンスの `Records` 配列には、正常に処理されたレコードと、正常に処理されなかったレコードの両方が含まれていることを意味します。正常に処理されなかったレコードを検出し、それ以降の呼び出しに含める必要があります。

正常に処理されたレコードには `SequenceNumber` 値と `ShardID` 値が、正常に処理されなかったレコードには `ErrorCode` 値と `ErrorMessage` 値が含まれます。`ErrorCode` パラメータはエラーのタイプを反映し、`ProvisionedThroughputExceededException` または `InternalFailure` のいずれかの値になります。`ErrorMessage`は、`ProvisionedThroughputExceededException` 例外に関するより詳細な情報として、スロットリングされたレコードのアカウント ID、ストリーム名、シャード ID などを示します。次の例では、`PutRecords` リクエストに 3 つのレコードがあります。2 番目のレコードは失敗し、レスポンスに反映されます。

**Example PutRecords リクエストの構文**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords レスポンスの構文**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

正常に処理されなかったレコードは、以降の `PutRecords` リクエストに含めることができます。最初に、`FailedRecordCount` の `putRecordsResult` パラメータを調べて、リクエスト内にエラーとなったレコードがあるかどうかを確認します。このようなレコードがある場合は、`putRecordsEntry` が `ErrorCode` 以外である各 `null` を、以降のリクエストに追加してください。このタイプのハンドラーの例については、次のコードを参照してください。

**Example PutRecords エラーハンドラー**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### PutRecord を使用して単一レコードを追加する
<a name="kinesis-using-sdk-java-putrecord"></a>

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) の各呼び出しは、1 つのレコードに対して動作します。アプリケーションで常にリクエストごとに 1 つのレコードを送信する必要がある場合や、`PutRecords` を使用できないその他の理由がある場合を除いて、[PutRecords を使用して複数のレコードを追加する](#kinesis-using-sdk-java-putrecords)で説明している `PutRecords` オペレーションを使用します。

各データレコードには一意のシーケンス番号があります。シーケンス番号は、`client.putRecord` を呼び出してストリームにデータレコードを追加した後に、Kinesis Data Streams によって割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。`PutRecord`リクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

 入力が立て続けに行われた場合、返されるシーケンス番号は大きくなるとは限りません。入力オペレーションが基本的に Kinesis Data Streams に対して同時に実行されるためです。同じパーティションキーに対して厳密にシーケンス番号が大きくなるようにするには、[PutRecord の例](#kinesis-using-sdk-java-putrecord-example)のサンプルコードに示しているように、`SequenceNumberForOrdering` パラメータを使用します。

 `SequenceNumberForOrdering` を使用するかどうかにかかわらず、inesis Data Streams が `GetRecords` の呼び出しを通じて受け取るレコードは厳密にシーケンス番号順になります。

**注記**  
シーケンス番号は、同じストリーム内の一連のデータのインデックスとして使用することはできません。一連のデータを論理的に区別するには、パーティションキーを使用するか、データセットごとに個別のストリームを作成します。

パーティションキーはストリーム内のデータをグループ化するために使用されます。データレコードはそのパーティションキーに基づいてストリーム内でシャードに割り当てられます。具体的には、Kinesis Data Streams ではパーティションキー (および関連するデータ) を特定のシャードにマッピングするハッシュ関数への入力として、パーティションキーを使用します。

 このハッシュメカニズムの結果として、パーティションキーが同じすべてのデータレコードは、ストリーム内で同じシャードにマッピングされます。ただし、パーティションキーの数がシャードの数を超えている場合、一部のシャードにパーティションキーが異なるレコードが格納されることがあります。設計の観点から、すべてのシャードが適切に使用されるようにするには、シャードの数 (`setShardCount` の `CreateStreamRequest` メソッドで指定) を一意のパーティションキーの数よりも大幅に少なくする必要があります。また、1 つのパーティションキーへのデータの流量をシャードの容量より大幅に小さくする必要があります。

#### PutRecord の例
<a name="kinesis-using-sdk-java-putrecord-example"></a>

以下のコードでは、2 つのパーティションキーに配分される 10 件のデータレコードを作成し、`myStreamName` という名前のストリームに格納しています。

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

上記のコード例では、`setSequenceNumberForOrdering` を使用して、各パーティションキー内で順番が厳密に増えるようにしています。このパラメータを効果的に使用するには、現在のレコードの `SequenceNumberForOrdering` (レコード *n*) を前のレコード (レコード *n-1*) のシーケンス番号に設定します。ストリームに追加されたレコードのシーケンス番号を取得するには、`getSequenceNumber` の結果に対して `putRecord` を呼び出します。

`SequenceNumberForOrdering` パラメーターを指定すると、同じパーティションキーのシーケンス番号が厳密に大きくなります。`SequenceNumberForOrdering`では、複数のパーティションキーにわたるレコードの順序付けは用意されていません。

# AWS Glue Schema Registry を使用してデータを操作する
<a name="kinesis-integration-glue-schema-registry"></a>

Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマ は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glue スキーマレジストリを使用すると、ストリーミングアプリケーション内のend-to-endのデータ品質とデータガバナンスを改善できます。詳細については、[AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)を参照してください。この統合を設定する方法の 1 つは、 AWS Java SDK で利用可能な `PutRecords` および `PutRecord` Kinesis Data Streams API を使用することです。

PutRecords および PutRecord Kinesis Data Streams APIs、ユースケース: Amazon Kinesis Data Streams と Glue スキーマレジストリの統合の「Kinesis Data Streams APIs を使用したデータの操作」セクションを参照してください。 [ Amazon Kinesis AWS](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)