

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將消費者從 KCL 1.x 遷移至 KCL 2.x
<a name="kcl-migration"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

此主題說明 Kinesis Client Library (KCL) 版本 1.x 和 2.x 之間的差異。她還說明如何將取用者從 KCL 的版本 1.x 遷移至版本 2.x。遷移用戶端後，該用戶端會從前一個檢查點的位置開始處理記錄。

2.0 版 KCL 引進了以下的介面變更：


**KCL 介面變更**  

| KCL 1.x 介面 | KCL 2.0 介面 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | 整併至 software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [遷移記錄處理器](#recrod-processor-migration)
+ [遷移記錄處理器工廠](#recrod-processor-factory-migration)
+ [遷移工作者](#worker-migration)
+ [設定 Amazon Kinesis 用戶端](#client-configuration)
+ [閒置時間移除](#idle-time-removal)
+ [用戶端組態移除](#client-configuration-removals)

## 遷移記錄處理器
<a name="recrod-processor-migration"></a>

以下範例顯示基於 KCL 1.x 所實作的記錄處理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**遷移記錄處理器類別**

1. 將介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改為 `software.amazon.kinesis.processor.ShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. 更新 `import` 和 `initialize` 方法的 `processRecords` 陳述式。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. 將 `shutdown` 方法取代為以下的新方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

記錄處理器類別經更新後的版本如下。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## 遷移記錄處理器工廠
<a name="recrod-processor-factory-migration"></a>

記錄處理器處理站負責在取得租用時建立記錄處理器。以下是 KCL 1.x 處理站的範例。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**遷移記錄處理器處理站**

1. 將實作的介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改為 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. 更改 `createProcessor` 的傳回簽章。

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

以下是 2.0 版記錄處理器處理站的範例：

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## 遷移工作者
<a name="worker-migration"></a>

在 KCL 的版本 2.0，名為 `Scheduler`​ 的新類別會取代 ​`Worker` 類別。以下是 KCL 1.x 工作者的範例。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**移轉至工作者**

1. 將 `Worker` 類別的 `import` 陳述式變更為 `Scheduler` 和 `ConfigsBuilder` 類別的匯入陳述式。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 建立 `ConfigsBuilder` 和 `Scheduler`，如下列範例所示：

   建議您在 `KinesisAsyncClient` 中，使用 `KinesisClientUtil` 來建立 `KinesisAsyncClient` 及設定 `maxConcurrency`。
**重要**  
Amazon Kinesis Client 可能會明顯發生延遲，除非您設定 `KinesisAsyncClient` 的 `maxConcurrency` 夠高，足以運作所有的租賃服務，並可額外使用 `KinesisAsyncClient`。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## 設定 Amazon Kinesis 用戶端
<a name="client-configuration"></a>

隨著 2.0 版 Kinesis Client Library 的推出，用戶端組態已從單一組態類別 (`KinesisClientLibConfiguration`) 進展為六個組態類別。下表說明遷移情形。


**組態欄位及其新類別**  

| 原始欄位 | 新的組態類別 | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | 此 KCL 應用程式的名稱。用做為 tableName 和 consumerName 的預設值。 | 
| tableName | ConfigsBuilder | 允許覆寫用於 Amazon DynamoDB 租用資料表的資料表名稱。 | 
| streamName | ConfigsBuilder | 此應用程式從中處理其記錄的串流名稱。 | 
| kinesisEndpoint | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| dynamoDBEndpoint | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| initialPositionInStreamExtended | RetrievalConfig | KCL 開始擷記錄所在碎片中的位置，透過應用程式初次執行開始。 | 
| kinesisCredentialsProvider | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| dynamoDBCredentialsProvider | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| cloudWatchCredentialsProvider | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| failoverTimeMillis | LeaseManagementConfig | 將租用擁有者視為失敗前必須經過的毫秒數。 | 
| workerIdentifier | ConfigsBuilder | 代表應用程式處理器本項實例的唯一識別符。其必須獨一無二。 | 
| shardSyncIntervalMillis | LeaseManagementConfig | 碎片同步呼叫的時間。 | 
| maxRecords | PollingConfig | 允許設定 Kinesis 傳回的記錄數上限。 | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | 此選項已經移除。請參閱「閒置時間移除項目」一節。 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | 設定時，即使 Kinesis 無提供任何記錄也會呼叫記錄處理器。 | 
| parentShardPollIntervalMillis | CoordinatorConfig | 記錄處理器應該輪詢以檢查父碎片是否已完成的頻率。 | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | 設定時，只要已開始處理子租用就會隨即移除租用。 | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | 設定時，會忽略具有開放碎片的子碎片。此項主要用於 DynamoDB Streams。 | 
| kinesisClientConfig | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| dynamoDBClientConfig | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| cloudWatchClientConfig | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| taskBackoffTimeMillis | LifecycleConfig | 重試失敗任務的等待時間。 | 
| metricsBufferTimeMillis | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| metricsMaxQueueSize | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| metricsLevel | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| metricsEnabledDimensions | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | 此選項已經移除。請參閱「檢查點序號驗證」一節。 | 
| regionName | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| maxLeasesForWorker | LeaseManagementConfig | 應用程式的單一執行個體應該接受的租用數上限。 | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | 應用程式一次應該嘗試挪用的租用數上限。 | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Kinesis Client Library 需要建立新的 DynamoDB 租用資料表時所使用的 DynamoDB 讀取 IOP。 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Kinesis Client Library 需要建立新的 DynamoDB 租用資料表時所使用的 DynamoDB 讀取 IOP。 | 
| initialPositionInStreamExtended | LeaseManagementConfig | 應用程式應該在串流中開始的初始位置。這僅在初次建立租用時使用。 | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | 如果租用資料表包含現有的租用，即停用同步處理碎片資料。TODO：KinesisEco-438 | 
| shardPrioritization | CoordinatorConfig | 要使用哪些碎片優先順序 | 
| shutdownGraceMillis | N/A | 此選項已經移除。請參閱「MultiLang 移除項目」一節。 | 
| timeoutInSeconds | N/A | 此選項已經移除。請參閱「MultiLang 移除項目」一節。 | 
| retryGetRecordsInSeconds | PollingConfig | 為故障設定 GetRecords 嘗試間的延遲。 | 
| maxGetRecordsThreadPool | PollingConfig | 用於 GetRecords 的執行緒集區大小。 | 
| maxLeaseRenewalThreads | LeaseManagementConfig | 控制租用續約執行緒集區的大小。應用程式可容納的租用數愈多，此集區就應該愈大。 | 
| recordsFetcherFactory | PollingConfig | 允許將工廠進行替換，該工廠會用於建立擷取程式，而該擷取程式會從串流進行擷取。 | 
| logWarningForTaskAfterMillis | LifecycleConfig | 任務未完成的情況下要等待多久的時間才記錄警告。 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 呼叫 ListShards 發生錯誤時將等待的間隔毫秒數。 | 
| maxListShardsRetryAttempts | RetrievalConfig | ListShards 在放棄之前重試的次數上限。 | 

## 閒置時間移除
<a name="idle-time-removal"></a>

1.x 版 KCL 的 `idleTimeBetweenReadsInMillis` 對應於兩種計量：
+ 任務分派檢查的間隔時間量。您現在可以透過設定 `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`，設定各任務的此一間隔時間。
+ 當 Kinesis Data Streams 未傳回任何記錄時將休眠的時間量。在 2.0 版中，具強化廣發功能的記錄是自其各自的擷取器推送。僅當推送的請求送達時，碎片消費者才會發生活動。

## 用戶端組態移除
<a name="client-configuration-removals"></a>

在 2.0 版中，KCL 不再建立用戶端。其端賴使用者提供有效的用戶端。基於此項變更，所有控制用戶端建立的組態參數皆已移除。若您需要這類參數，可以先就用戶端進行所需設定再將用戶端提供予 `ConfigsBuilder`。


****  

| 已移除的欄位 | 等效組態 | 
| --- | --- | 
| kinesisEndpoint | 使用慣用的端點設定開發套件 KinesisAsyncClient：KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()。 | 
| dynamoDBEndpoint | 使用慣用的端點設定開發套件 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()。 | 
| kinesisClientConfig | 使用所需的組態設定開發套件 KinesisAsyncClient：KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| dynamoDBClientConfig | 使用所需的組態設定開發套件 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| cloudWatchClientConfig | 使用所需的組態設定開發套件 CloudWatchAsyncClient：CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| regionName | 使用慣用的區域設定開發套件。所有開發套件用戶端的做法皆相同。例如 KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build()。 | 