

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Java 中使用 KCL 開發消費者
<a name="develop-kcl-consumers-java"></a>

## 先決條件
<a name="develop-kcl-consumers-java-prerequisites"></a>

開始使用 KCL 3.x 之前，請確定您有下列項目：
+ Java 開發套件 (JDK) 8 或更新版本
+ 適用於 Java 的 AWS SDK 2.x
+ Maven 或 Gradle 用於相依性管理

KCL 會從工作者正在執行的運算主機收集 CPU 使用率指標，例如 CPU 使用率，以平衡負載，實現工作者之間的平均資源使用率層級。若要讓 KCL 從工作者收集 CPU 使用率指標，您必須符合下列先決條件：

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ 您的作業系統必須是 Linux 作業系統。
+ 您必須在 EC[IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)2。 EC2 

 **Amazon EC2 上的 Amazon EC2)**
+ 您的作業系統必須是 Linux 作業系統。
+ 您必須啟用 [ECS 任務中繼資料端點第 4 版](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html)。
+ 您的 Amazon ECS 容器代理程式版本必須為 1.39.0 或更新版本。

 **上的 Amazon ECS AWS Fargate**
+ 您必須啟用 [Fargate 任務中繼資料端點第 4 版](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html)。如果您使用 Fargate 平台 1.4.0 版或更新版本，預設會啟用此功能。
+ Fargate 平台 1.4.0 版或更新版本。

 **Amazon EC2 上的 Amazon Elastic Kubernetes Service (Amazon EKS)** 
+ 您的作業系統必須是 Linux 作業系統。

 **上的 Amazon EKS AWS Fargate**
+ Fargate 平台 1.3.0 或更新版本。

**重要**  
如果 KCL 無法從工作者收集 CPU 使用率指標，KCL 會回復為使用每個工作者的輸送量來指派租用，並平衡機群中工作者的負載。如需詳細資訊，請參閱[KCL 如何指派租用給工作者並平衡負載](kcl-dynamoDB.md#kcl-assign-leases)。

## 安裝和新增相依性
<a name="develop-kcl-consumers-java-installation"></a>

如果您使用的是 Maven，請將下列相依性新增至 `pom.xml` 檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

如果您使用的是 Gradle，請將以下內容新增至您的 `build.gradle` 檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

您可以在 [Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client) 上檢查 KCL 的最新版本。

## 實作消費者
<a name="develop-kcl-consumers-java-implemetation"></a>

KCL 取用者應用程式包含下列重要元件：

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [排程器](#implementation-scheduler)
+ [主要消費者應用程式](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor 是處理 Kinesis 資料串流記錄之業務邏輯所在的核心元件。它定義您的應用程式如何處理從 Kinesis 串流接收到的資料。

主要責任：
+ 初始化碎片的處理
+ 處理來自 Kinesis 串流的記錄批次
+ 碎片的關閉處理 （例如，當碎片分割或合併時，或將租用移交給另一個主機時）
+ 處理檢查點以追蹤進度

以下顯示實作範例：

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

以下是範例中使用的每個方法的詳細說明：

**initialize(InitializationInput initializationInput)**
+ 目的：設定處理記錄所需的任何資源或狀態。
+ 呼叫時：一次，當 KCL 將碎片指派給此記錄處理器時。
+ 重點：
  + `initializationInput.shardId()`：此處理器將處理的碎片 ID。
  + `initializationInput.extendedSequenceNumber()`：開始處理的序號。

**processRecords(ProcessRecordsInput processRecordsInput)**
+ 目的：處理傳入的記錄和選擇性檢查點進度。
+ 呼叫時：重複，只要記錄處理器保留碎片的租用即可。
+ 重點：
  + `processRecordsInput.records()`：要處理的記錄清單。
  + `processRecordsInput.checkpointer()`：用於檢查點進度。
  + 請務必在處理期間處理任何例外狀況，以防止 KCL 失敗。
  + 此方法應該是等冪的，因為在某些情況下，相同的記錄可能會處理多次，例如在意外工作者當機或重新啟動之前尚未檢查點的資料。
  + 在檢查點之前，請務必清除任何緩衝的資料，以確保資料一致性。

**leaseLost(LeaseLostInput leaseLostInput)**
+ 目的：清除處理此碎片的任何特定資源。
+ 呼叫時：當另一個排程器接管此碎片的租用時。
+ 重點：
  + 此方法不允許檢查點。

**shardEnded(ShardEndedInput shardEndedInput)**
+ 目的：完成此碎片和檢查點的處理。
+ 當呼叫時：當碎片分割或合併時，表示已處理此碎片的所有資料。
+ 重點：
  + `shardEndedInput.checkpointer()`：用來執行最終檢查點。
  + 若要完成處理，必須使用此方法中的檢查點。
  + 若未在此排清資料和檢查點，可能會在碎片重新開啟時導致資料遺失或重複處理。

**shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)**
+ 目的：KCL 關閉時檢查和清除資源。
+ 呼叫時：KCL 關閉時，例如應用程式終止時）。
+ 重點：
  + `shutdownRequestedInput.checkpointer()`：用於在關機前執行檢查點。
  + 請務必在 方法中實作檢查點，以便在應用程式停止之前儲存進度。
  + 若未在此排清資料和檢查點，可能會導致應用程式重新啟動時資料遺失或重新處理記錄。

**重要**  
KCL 3.x 透過在前一個工作者關閉之前進行檢查點，確保將租用從一個工作者交付到另一個工作者時，資料重新處理次數更少。如果您未在 `shutdownRequested()`方法中實作檢查點邏輯，則不會看到此好處。請確定您已在 `shutdownRequested()`方法中實作檢查點邏輯。

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory 負責建立新的 RecordProcessor 執行個體。KCL 使用此工廠為應用程式需要處理的每個碎片建立新的 RecordProcessor。

主要責任：
+ 視需要建立新的 RecordProcessor 執行個體
+ 確定每個 RecordProcessor 都已正確初始化

以下是實作範例：

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

在此範例中，每次呼叫 shardRecordProcessor() 時，工廠都會建立新的 SampleRecordProcessor。 shardRecordProcessor 您可以將此延伸到包含任何必要的初始化邏輯。

### 排程器
<a name="implementation-scheduler"></a>

排程器是一種高階元件，可協調 KCL 應用程式的所有活動。它負責資料處理的整體協調。

主要責任：
+ 管理 RecordProcessors 的生命週期
+ 處理碎片的租用管理
+ 座標檢查點
+ 在應用程式的多個工作者之間平衡碎片處理負載
+ 處理正常關機和應用程式終止訊號

排程器通常會在主要應用程式中建立和啟動。您可以在下一節主要消費者應用程式中查看排程器的實作範例。

### 主要消費者應用程式
<a name="implementation-main"></a>

主要消費者應用程式將所有元件連結在一起。它負責設定 KCL 取用者、建立必要的用戶端、設定排程器，以及管理應用程式的生命週期。

主要責任：
+ 設定 AWS 服務用戶端 (Kinesis、DynamoDB、CloudWatch)
+ 設定 KCL 應用程式
+ 建立和啟動排程器
+ 處理應用程式關閉

以下是實作範例：

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL 預設會建立具有專用輸送量的增強型廣發 (EFO) 取用者。如需增強型廣發功能的詳細資訊，請參閱[開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。如果您的取用者少於 2 個，或不需要 200 毫秒以下的讀取傳播延遲，您必須在排程器物件中設定下列組態，以使用共用輸送量取用者：

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

下列程式碼是建立使用共用輸送量取用者之排程器物件的範例：

**匯入**：

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**程式碼**：

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```