

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Amazon Kinesis Data Streams API 搭配 開發生產者 適用於 Java 的 AWS SDK
<a name="developing-producers-with-sdk"></a>

您可以使用 Amazon Kinesis Data Streams API 搭配適用於 Java 的 AWS SDK 來開發生產者。如果您是 Kinesis Data Streams 的新手，請先熟悉一下 [什麼是 Amazon Kinesis Data Streams？](introduction.md) 及 [使用 AWS CLI 執行 Amazon Kinesis Data Streams 操作](getting-started.md) 所介紹的概念和術語。

本文範例會討論 [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) 並使用[適用於 Java 的AWS SDK](https://aws.amazon.com/sdk-for-java/) 將資料加入 (放入) 串流。不過，對於大部分的使用案例，則應使用 Kinesis Data Streams KPL 程式庫為宜。如需詳細資訊，請參閱[使用 Amazon Kinesis Producer Library (KPL) 開發生產者](developing-producers-with-kpl.md)。

本章的 Java 範例程式碼示範如何執行基本的 Kinesis Data Streams API 操作，並依操作類型按照邏輯進行劃分。這些範例不代表可立即生產的程式碼，無法檢查出所有可能的例外狀況，也不可視為任何潛在安全或效能疑慮的原因。此外，您亦可使用其他程式設計語言呼叫 [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/)。如需所有 AWS SDKs的詳細資訊，請參閱[使用 Amazon Web Services 開始開發](https://aws.amazon.com/developers/getting-started/)。

每項任務皆有其先決條件；例如，若要加入資料至串流，您必須先建立串流，而建立串流則需事先建立用戶端。如需詳細資訊，請參閱[建立和管理 Kinesis 資料串流](working-with-streams.md)。

**Topics**
+ [將資料新增至串流](#kinesis-using-sdk-java-add-data-to-stream)
+ [使用 AWS Glue 結構描述登錄檔與資料互動](kinesis-integration-glue-schema-registry.md)

## 將資料新增至串流
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

一旦建立了串流之後，您即可將資料以記錄的形式加入至該串流。記錄是一種資料結構，其中包含所要處理的資料 Blob 形式的資料。當您將資料存放於記錄後，Kinesis Data Streams 即絲毫不會檢查、解譯或變更該資料。每筆記錄也各有其相關聯的序號和分割區索引鍵。

Kinesis Data Streams API 提供兩種不同的操作可新增資料至串流：[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 和 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html)。`PutRecords` 操作會透過 HTTP 請求將多筆記錄傳送至您的串流，而單數的 `PutRecord` 操作則是一次傳送一筆記錄至您的串流 (需針對每筆記錄發出單獨的 HTTP 請求)。大多數應用程式均應使用 `PutRecords` 為宜，因為這將使每個資料生產者達到更高的傳輸量。如需上述各項操作的詳細資訊，請分別參閱以下各小節。

**Topics**
+ [使用 PutRecords 新增多筆記錄](#kinesis-using-sdk-java-putrecords)
+ [使用 PutRecord 新增單一記錄](#kinesis-using-sdk-java-putrecord)

務請切記，若您的來源應用程式使用 Kinesis Data Streams API 加入資料至串流，很可能會有一個或多個取用者應用程式同時處理從串流取出的資料。如需有關取用者如何使用 Kinesis Data Streams API 取得資料的詳細資訊，請參閱 [從串流取得資料](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data)。

**重要**  
[變更資料保留期間](kinesis-extended-retention.md)

### 使用 PutRecords 新增多筆記錄
<a name="kinesis-using-sdk-java-putrecords"></a>

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 操作會透過單次請求將多筆記錄傳送至 Kinesis Data Streams。藉由使用 `PutRecords`，生產者傳送資料至其 Kinesis 資料串流時將可達到更高的輸送量。每個 `PutRecords` 請求最高可支援 500 筆記錄。請求中的每筆記錄最大可為 1 MB，整個請求的最高限制為 5 MB，包括分區索引鍵。如同以下所述的單數 `PutRecord` 操作，`PutRecords` 也使用序號和分割區索引鍵。不過，`PutRecord` 的 `SequenceNumberForOrdering` 參數並未包含在 `PutRecords` 呼叫中。`PutRecords` 操作將嘗試依照請求的自然順序處理所有記錄。

每筆資料記錄都有獨一無二的序號。此序號是由 Kinesis Data Streams 在您呼叫 `client.putRecords` 將資料記錄加入至串流後所指派。同一分割區索引鍵的序號通常會隨著時間而增加；逐次 `PutRecords` 請求的間隔期間愈長，序號將變得愈大。

**注意**  
序號不能用做為同一串流中各資料集的索引。若要按照邏輯分隔資料集，請使用分割區索引鍵或為每個資料集建立個別串流。

`PutRecords` 請求可以附上具有不同分割區索引鍵的記錄。請求是以整個串流當成範圍；每次請求均能附上任意組合的分割區索引鍵和記錄，總數最多可達到請求的限額。使用許多不同的分割區索引鍵對具有許多不同碎片的串流發出請求，通常會比使用少量的分割區索引鍵對少量的碎片發出請求更快。分割區索引鍵數目應該要比碎片數目大得多，以減少延遲並達到最高的傳輸量。

#### PutRecords 範例
<a name="kinesis-using-sdk-java-putrecords-example"></a>

以下程式碼會建立 100 筆具有循序分割區索引鍵的資料記錄，並將其放入名為 `DataStream` 的串流。

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

`PutRecords` 回應包含回應 `Records` 的陣列。回應陣列中的每筆記錄依照自然順序 (請求和回應的內容由上而下) 與請求陣列中的某一記錄直接相關。回應 `Records` 陣列一律包含與請求陣列相同數目的記錄。

#### 使用 PutRecords 時處理失敗
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

預設情況下，請求中個別記錄的失敗不會造成停止處理 `PutRecords` 請求中的後續記錄。這表示回應 `Records` 陣列包含已成功處理和未成功處理的記錄。您必須偵測未成功處理的記錄，並將其納入到後續呼叫中。

成功的記錄包含 `SequenceNumber` 和 `ShardID` 值，未成功的記錄則包含 `ErrorCode` 和 `ErrorMessage` 值。`ErrorCode` 參數反映錯誤類型，且可能是下列值的其中之一：`ProvisionedThroughputExceededException` 或 `InternalFailure`。`ErrorMessage` 提供關於`ProvisionedThroughputExceededException` 例外的更詳細資訊，包括帳戶 ID、串流名稱、以及遭節制的記錄之碎片 ID。以下範例所示的 `PutRecords` 請求中有三筆記錄。第二筆記錄發生失敗，會反映在回應中。

**Example PutRecords 請求語法**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords 回應語法**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

未成功處理的記錄可納入到後續的 `PutRecords` 請求中。首先，查看 `putRecordsResult` 中的 `FailedRecordCount` 參數以確認請求中是否有失敗的記錄。若有，即應將 `putRecordsEntry` 非 `ErrorCode` 的每個 `null` 加入至後續的請求。如需此處理常式類型的範例，請參閱以下程式碼。

**Example PutRecords 失敗處理常式**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### 使用 PutRecord 新增單一記錄
<a name="kinesis-using-sdk-java-putrecord"></a>

逐次呼叫 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) 對單一記錄進行操作。`PutRecords`所述的 [使用 PutRecords 新增多筆記錄](#kinesis-using-sdk-java-putrecords) 操作方為首選，除非您的應用程式具體需要始終透過單次請求傳送單一記錄，或因其他緣故無法使用 `PutRecords`。

每筆資料記錄都有獨一無二的序號。此序號是由 Kinesis Data Streams 在您呼叫 `client.putRecord` 將資料記錄加入至串流後所指派。同一分割區索引鍵的序號通常會隨著時間而增加；逐次 `PutRecord` 請求的間隔期間愈長，序號將變得愈大。

 快速連續進行放置操作時，不保證傳回的序號會增加，因為放置操作對 Kinesis Data Streams 基本上是同時發生。為保證同一分割區索引鍵的序號嚴格遞增，請使用 `SequenceNumberForOrdering` 參數，如 [PutRecord 範例](#kinesis-using-sdk-java-putrecord-example)的程式碼範例所示。

 無論您是否使用 `SequenceNumberForOrdering`，Kinesis Data Streams 透過 `GetRecords` 呼叫接收的記錄都將依照序號嚴格排序。

**注意**  
序號不能用做為同一串流中各資料集的索引。若要按照邏輯分隔資料集，請使用分割區索引鍵或為每個資料集建立個別串流。

分割區索引鍵用於將串流中的資料分組。資料記錄是根據其分割區索引鍵指派給串流中的碎片。具體而言，Kinesis Data Streams 使用分割區索引鍵做為雜湊函數的輸入，由該函數將分割區索引鍵 (和相關聯的資料) 對應到特定碎片。

 經過此雜湊處理機制，具有相同分割區索引鍵的所有資料記錄會對應到串流中的同一碎片。然而，若分割區索引鍵數目多過碎片數目，某些碎片即必定包含具有不同分割區索引鍵的記錄。從設計的角度來看，為確保您的所有碎片獲得充分利用，碎片數目 (由 `setShardCount` 的 `CreateStreamRequest` 方法指定) 應遠少於獨一分割區索引鍵的數目，且流向單一分割區索引鍵的資料量應遠少於碎片容量。

#### PutRecord 範例
<a name="kinesis-using-sdk-java-putrecord-example"></a>

以下程式碼會建立 10 筆跨兩個分割區索引鍵分佈的資料記錄，並將其放入名為 `myStreamName` 的串流。

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

上述程式碼範例使用 `setSequenceNumberForOrdering` 保證每個分割區索引鍵內的順序嚴格遞增。為求有效使用此參數，將目前記錄 `SequenceNumberForOrdering` (記錄 *n*) 設為前一記錄 (記錄 *n-1*) 的序號。為取得已加入至串流的記錄其序號，則對 `getSequenceNumber` 的結果呼叫 `putRecord`。

`SequenceNumberForOrdering` 參數可確保嚴格遞增分割區索引鍵的序號。`SequenceNumberForOrdering` 不提供跨多個分割區索引鍵的記錄排序。

# 使用 AWS Glue 結構描述登錄檔與資料互動
<a name="kinesis-integration-glue-schema-registry"></a>

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述，同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue 結構描述登錄檔可讓您改善串流應用程式中end-to-end資料品質和資料控管。如需詳細資訊，請參閱 [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。設定此整合的其中一個方法是透過 AWS Java SDK 中提供的 `PutRecords` 和 `PutRecord` Kinesis Data Streams API。

如需如何使用 PutRecords 和 PutRecord Kinesis Data Streams APIs 設定 Kinesis Data Streams 與結構描述登錄整合的詳細說明，請參閱[使用案例：整合 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)檔中的「使用 Kinesis Data Streams APIs 與資料互動」一節。