

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 開發消費者 適用於 Java 的 AWS SDK
<a name="develop-consumers-sdk"></a>

 您可以使用 Amazon Kinesis Data Streams APIs 開發自訂消費者。本節說明搭配 使用 Kinesis Data Streams APIs 適用於 Java 的 AWS SDK。

**重要**  
若要開發具有共用輸送量的自訂 Kinesis Data Streams 取用者，建議使用方法為使用 Kinesis Client Library (KCL)。KCL 會處理與分散式運算相關的許多複雜任務，協助您取用和處理 Kinesis 資料串流中的資料。如需詳細資訊，請參閱[在 Java 中使用 KCL 開發消費者](develop-kcl-consumers-java.md)。

**Topics**
+ [使用 開發共用輸送量消費者 適用於 Java 的 AWS SDK](developing-consumers-with-sdk.md)
+ [使用 開發增強型廣發消費者 適用於 Java 的 AWS SDK](building-enhanced-consumers-api.md)
+ [使用 AWS Glue 結構描述登錄檔與資料互動](building-enhanced-consumers-glue-schema-registry.md)

# 使用 開發共用輸送量消費者 適用於 Java 的 AWS SDK
<a name="developing-consumers-with-sdk"></a>

開發具有 共用的自訂 Kinesis Data Streams 取用者的方法之一是搭配 使用 Amazon Kinesis Data Streams APIs 適用於 Java 的 AWS SDK。本節說明搭配 使用 Kinesis Data Streams APIs 適用於 Java 的 AWS SDK。此外，您亦可使用其他程式設計語言呼叫 Kinesis Data Streams API。如需所有 AWS SDKs的詳細資訊，請參閱[使用 Amazon Web Services 開始開發](https://aws.amazon.com/developers/getting-started/)。

本節中的 Java 範例程式碼示範如何執行基本 Kinesis Data Streams API 操作，並以邏輯方式依操作類型分割。這些範例不代表可立即生產的程式碼，無法檢查出所有可能的例外狀況，也不可視為任何潛在安全或效能疑慮的原因。

**Topics**
+ [從串流取得資料](#kinesis-using-sdk-java-get-data)
+ [使用碎片疊代運算](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [使用 GetRecords](#kinesis-using-sdk-java-get-data-getrecords)
+ [適應重新碎片](#kinesis-using-sdk-java-get-data-reshard)

## 從串流取得資料
<a name="kinesis-using-sdk-java-get-data"></a>

Kinesis Data Streams API 包括可調用以從資料串流擷取記錄的 `getShardIterator` 和 `getRecords` 方法。此為提取模型，將由您的程式碼直接從資料串流中的碎片取出資料記錄。

**重要**  
建議您使用 KCL 所提供的記錄處理器支援，從資料串流擷取資料。此為推送模型，需由您實作程式碼以處理資料。KCL 會從資料串流擷取資料記錄並將其傳遞至您應用程式的程式碼。此外，KCL 另有提供容錯移轉、復原及負載平衡功能。如需詳細資訊，請參閱[使用 KCL 開發具有共用輸送量的自訂取用者](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)。

不過，在某些情況下，您可能會更喜歡使用 Kinesis Data Streams API。例如，您要實作自訂工具以用於對資料串流進行監控或偵錯。

**重要**  
Kinesis Data Streams 支援對資料串流變更資料記錄保留期間。如需詳細資訊，請參閱[變更資料保留期間](kinesis-extended-retention.md)。

## 使用碎片疊代運算
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

您將以碎片為基本單位，從串流擷取記錄。針對每個碎片，以及從該碎片擷取的各個批次的記錄，您必須取得*碎片疊代運算*。碎片疊代運算是供 `getRecordsRequest` 物件用於指定從中擷取記錄的碎片位置。與碎片疊代運算相關聯的類型決定了應從碎片中的哪個點擷取記錄 (詳細資訊請參閱本節稍後說明)。您必須先擷取碎片，才能使用碎片疊代運算。如需詳細資訊，請參閱[列出碎片](kinesis-using-sdk-java-list-shards.md)。

使用 `getShardIterator` 方法可取得初始碎片疊代運算。如欲就額外各個批次的記錄取得碎片疊代運算，請使用 `getNextShardIterator` 方法所傳回 `getRecordsResult` 物件的 `getRecords` 方法。碎片疊代運算的有效期為 5 分鐘。若您使用了仍在有效期內的碎片疊代運算，則會進行一次新的疊代運算。每一次碎片疊代運算將於 5 分鐘內維持有效，即便其已使用過亦然。

若要取得初始碎片疊代運算，請執行個體化 `GetShardIteratorRequest` 並將其傳遞給 `getShardIterator` 方法。若要設定請求，則指定串流和碎片 ID。如需有關如何在 AWS 帳戶中取得串流的資訊，請參閱 [列出串流](kinesis-using-sdk-java-list-streams.md)。如需如何取得串流中各個碎片的相關資訊，請參閱[列出碎片](kinesis-using-sdk-java-list-shards.md)。

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

此範本程式碼在取得初始碎片疊代運算時指定 `TRIM_HORIZON` 做為疊代運算類型。這種疊代運算類型表示應從加入至碎片的第一筆記錄開始傳回各記錄，而不是從最近加入的記錄 (又稱為*頂端*) 開始。可用的疊代運算類型如下：
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

如需詳細資訊，請參閱 [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType)。

某些疊代運算類型除了表明類型外還需要指定序號，例如：

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

一旦使用 `getRecords` 取得記錄後，您即可呼叫該記錄的 `getSequenceNumber` 方法以取得該記錄的序號。

```
record.getSequenceNumber()
```

此外，加入記錄至資料串流的程式碼也可透過對 `getSequenceNumber` 的結果呼叫 `putRecord` 取得所加入某一筆記錄的序號。

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

使用序號可以保證各個記錄的順序嚴格遞增。如需詳細資訊，請參閱 [PutRecord 範例](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example)所提供的程式碼範例。

## 使用 GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

取得碎片疊代運算之後，執行個體化 `GetRecordsRequest` 物件。使用 `setShardIterator` 方法指定請求所用的疊代運算。

您也可以使用 `setLimit` 方法，選擇性設定欲擷取的記錄數目。由 `getRecords` 傳回的記錄數目一定等於或少於此限制。如果您未指定此限制，`getRecords` 將傳回已擷取的 10 MB 記錄。以下範本程式碼將此限制設為 25 筆記錄。

如果沒有傳回任何記錄，即表示此碎片在碎片疊代運算所參考的序號位置目前無資料記錄可用。在這種情況下，您的應用程式應該等待一段適當時間處理串流的資料來源。接著再次使用前次呼叫 `getRecords` 所傳回的碎片疊代運算，嘗試從碎片取得資料。

將 `getRecordsRequest` 傳遞給 `getRecords` 方法，並且擷取 `getRecordsResult` 物件形式的傳回值。若要取得資料記錄，請對 `getRecords` 物件呼叫 `getRecordsResult` 方法。

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

為了準備對 `getRecords` 進行另一次呼叫，請透過 `getRecordsResult` 取得下一碎片疊代運算。

```
shardIterator = getRecordsResult.getNextShardIterator();
```

為了獲得最佳結果，逐次呼叫 `getRecords` 間隔期間應至少休眠 1 秒 (1,000 毫秒) 以免超出 `getRecords` 頻率的限制。

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

一般而言，您應於迴圈中呼叫 `getRecords`，即便是在測試情況下擷取單一記錄亦然。僅呼叫一次 `getRecords` 可能會傳回空的記錄清單，就算是碎片在後續的序號位置包含更多記錄也不免如此。若發生這種情況，便會傳回 `NextShardIterator` 且空的記錄清單將參考碎片中的某一後續序號，因而連續呼叫 `getRecords` 最終將傳回記錄。以下範例示範迴圈的用法。

**範例：getRecords**  
以下程式碼範例反映了本節所述的 `getRecords` 技巧，包括在迴圈中發出呼叫。

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

如果您是使用 Kinesis Client Library，可能要進行多次呼叫後才會傳回資料。此行為是依據設計，並不表示 KCL 或您的資料有問題。

## 適應重新碎片
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 如果 `getRecordsResult.getNextShardIterator` 傳回 `null`，則表示發生了涉及此碎片的碎片分割或合併。此碎片現在處於 `CLOSED` 狀態，並且您已從此碎片讀取所有可用的資料記錄。

 在這個案例中，您可以使用 `getRecordsResult.childShards` 來了解分割或合併所建立之正在處理之碎片的新子碎片。如需詳細資訊，請參閱 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。

 若是發生分割，兩個新碎片的 `parentShardId` 將與您先前處理的碎片其碎片 ID 相同。該兩個碎片的 `adjacentParentShardId` 值皆為 `null`。

 若是發生合併，經由合併所建立的單個新碎片其 `parentShardId` 將等於其中一個父碎片的碎片 ID，且 `adjacentParentShardId` 等於另一個父碎片的碎片 ID。您的應用程式已從上述其中一個碎片讀取了所有資料。對於該碎片，`getRecordsResult.getNextShardIterator` 已傳回 `null`。如果資料的順序對您的應用程式很重要，請確保您亦已從另一個父碎片讀取了所有資料，而後再從經由合併所建立的子碎片讀取任何新資料。

 若您使用多個處理器從串流擷取資料 (假設每個碎片各一個處理器)，則一旦發生碎片分割或合併時，請調升或調降處理器數目以適應碎片數目的變更。

 如需重新分片的詳細資訊，包括碎片狀態 (如 `CLOSED`) 方面的討論，請參閱[重新分片串流](kinesis-using-sdk-java-resharding.md)。

# 使用 開發增強型廣發消費者 適用於 Java 的 AWS SDK
<a name="building-enhanced-consumers-api"></a>

*增強型散發功能*是 Amazon Kinesis Data Streams 的一項功能，使取用者從資料串流接收記錄時，專用輸送量可高達每個碎片每秒 2 MB 的資料。使用強化廣發功能的消費者不必與其他從串流接收資料的消費者競爭。如需詳細資訊，請參閱[開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。

您可以使用 API 操作，為 Kinesis Data Streams 建置使用增強型散發功能的取用者。

**使用 Kinesis Data Streams API 註冊具有增強型散發功能的取用者**

1. 呼叫 [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html) 將您的應用程式註冊為使用增強型散發功能的取用者。Kinesis Data Streams 會為該取用者產生 Amazon Resource Name (ARN) 並隨回應傳回其值。

1. 若要開始接聽特定碎片，請呼叫 [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) 並傳遞取用者 ARN。Kinesis Data Streams 隨後會透過 HTTP/2 連線，開始從該碎片將記錄以 [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html) 類型的事件形式推送給您。此連線將保持開啟長達 5 分鐘。若您希望於 [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) 呼叫所傳回的 `future` 正常或異常完成後繼續從該碎片接收記錄，請再次呼叫 [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)。
**注意**  
當到達當前碎片的末尾時，`SubscribeToShard` API 還返回當前碎片的子碎片清單。

1. 若要將使用強化廣發功能的消費者取消註冊，請呼叫 [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)。

以下範例程式碼示範如何為消費者訂閱碎片、定期續約訂閱和處理事件。

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 如果 `event.ContinuationSequenceNumber` 傳回 `null`，則表示發生了涉及此碎片的碎片分割或合併。此碎片現在處於 `CLOSED` 狀態，並且您已從此碎片讀取所有可用的資料記錄。在這個案例中，根據上述範例，您可以使用 `event.childShards` 來了解分割或合併所建立之正在處理之碎片的新子碎片。如需詳細資訊，請參閱 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。

# 使用 AWS Glue 結構描述登錄檔與資料互動
<a name="building-enhanced-consumers-glue-schema-registry"></a>

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述，同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue 結構描述登錄檔可讓您改善串流應用程式中end-to-end資料品質和資料控管。如需詳細資訊，請參閱 [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。設定此整合的其中一種方法是透過 AWS Java 開發套件中提供的 `GetRecords` Kinesis Data Streams API。

如需如何使用 Kinesis Data Streams APIs 設定 `GetRecords` Kinesis Data Streams 與結構描述登錄檔整合的詳細說明，請參閱[使用案例：整合 Amazon Kinesis Data Streams 與 Glue 結構描述登錄檔中的「使用 Kinesis Data Streams AWS](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) APIs 與資料互動」一節。