本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在 Java 中使用 KCL 開發消費者
先決條件
開始使用 KCL 3.x 之前,請確定您擁有下列項目:
-
Java 開發套件 (JDK) 8 或更新版本
-
AWS SDK for Java 2.x
-
Maven 或 Gradle 用於相依性管理
KCL 會從工作者正在執行的運算主機收集 CPU 使用率指標,例如 CPU 使用率,以平衡負載,實現工作者之間的平均資源使用率水準。若要讓 KCL 從工作者收集 CPU 使用率指標,您必須符合下列先決條件:
Amazon Elastic Compute Cloud(Amazon EC2)
-
您的作業系統必須是 Linux 作業系統。
-
您必須在 ECIMDSv22。 EC2
Amazon EC2 上的 Amazon EC2)
-
您的作業系統必須是 Linux 作業系統。
-
您必須啟用 ECS 任務中繼資料端點第 4 版。
-
您的 Amazon ECS 容器代理程式版本必須為 1.39.0 或更新版本。
上的 Amazon ECS AWS Fargate
-
您必須啟用 Fargate 任務中繼資料端點第 4 版。如果您使用 Fargate 平台 1.4.0 版或更新版本,預設會啟用此功能。
-
Fargate 平台 1.4.0 版或更新版本。
Amazon EC2 上的 Amazon Elastic Kubernetes Service (Amazon EKS)
-
您的作業系統必須是 Linux 作業系統。
上的 Amazon EKS AWS Fargate
-
Fargate 平台 1.3.0 或更新版本。
重要
如果 KCL 無法從工作者收集 CPU 使用率指標,KCL 會回復為使用每個工作者的輸送量來指派租用,並平衡機群中工作者的負載。如需詳細資訊,請參閱KCL 如何指派租用給工作者並平衡負載。
安裝和新增相依性
如果您使用的是 Maven,請將下列相依性新增至 pom.xml
檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。
<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>
如果您使用的是 Gradle,請將下列內容新增至您的 build.gradle
檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
您可以在 Maven Central Repository
實作消費者
KCL 消費者應用程式包含下列關鍵元件:
RecordProcessor
RecordProcessor 是處理 Kinesis 資料串流記錄之業務邏輯所在的核心元件。它定義您的應用程式如何處理從 Kinesis 串流接收到的資料。
關鍵責任:
-
初始化碎片的處理
-
從 Kinesis 串流處理記錄批次
-
碎片的關閉處理 (例如,碎片分割或合併時,或租約移交給其他主機時)
-
處理檢查點以追蹤進度
以下顯示實作範例:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }
以下是範例中使用的每個方法的詳細說明:
initialize(InitializationInput initializationInput)
-
目的:設定處理記錄所需的任何資源或狀態。
-
呼叫時:一次,當 KCL 將碎片指派給此記錄處理器時。
-
重點:
-
initializationInput.shardId()
:此處理器將處理的碎片 ID。 -
initializationInput.extendedSequenceNumber()
:開始處理的序號。
-
processRecords(ProcessRecordsInput processRecordsInput)
-
目的:處理傳入的記錄和選擇性檢查點進度。
-
呼叫時:重複,只要記錄處理器保留碎片的租用即可。
-
重點:
-
processRecordsInput.records()
:要處理的記錄清單。 -
processRecordsInput.checkpointer()
:用來檢查進度。 -
請確定您在處理期間處理任何例外狀況,以防止 KCL 失敗。
-
此方法應該是等冪的,因為在某些情況下,相同的記錄可能會處理多次,例如在意外工作者當機或重新啟動之前尚未檢查點的資料。
-
在檢查點之前,請務必清除任何緩衝的資料,以確保資料一致性。
-
leaseLost(LeaseLostInput leaseLostInput)
-
目的:清除處理此碎片的任何特定資源。
-
呼叫時:當另一個排程器接管此碎片的租用時。
-
重點:
-
此方法不允許檢查點。
-
shardEnded(ShardEndedInput shardEndedInput)
-
目的:完成此碎片和檢查點的處理。
-
呼叫時:碎片分割或合併時,表示已處理此碎片的所有資料。
-
重點:
-
shardEndedInput.checkpointer()
:用於執行最終檢查點。 -
此方法的檢查點為必要項目,才能完成處理。
-
未排清此處的資料和檢查點,可能會在碎片重新開啟時導致資料遺失或重複處理。
-
shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)
-
目的:KCL 關閉時檢查和清除資源。
-
呼叫時:KCL 關閉時,例如應用程式終止時)。
-
重點:
-
shutdownRequestedInput.checkpointer()
:用於在關機前執行檢查點。 -
請確定您在 方法中實作檢查點,以便在應用程式停止之前儲存進度。
-
未排清此處的資料和檢查點,可能會導致應用程式重新啟動時資料遺失或重新處理記錄。
-
重要
KCL 3.x 可在前一個工作者關閉之前進行檢查點,確保從一個工作者將租用交付給另一個工作者時,減少資料重新處理。如果您未在 shutdownRequested()
方法中實作檢查點邏輯,則不會看到此利益。請確定您已在 shutdownRequested()
方法中實作檢查點邏輯。
RecordProcessorFactory
RecordProcessorFactory 負責建立新的 RecordProcessor 執行個體。KCL 使用此工廠為每個應用程式需要處理的碎片建立新的 RecordProcessor。
關鍵責任:
-
視需要建立新的 RecordProcessor 執行個體
-
確定每個 RecordProcessor 都已正確初始化
以下是實作範例:
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }
在此範例中,每次呼叫 shardRecordProcessor() 時,工廠都會建立新的 SampleRecordProcessor。 shardRecordProcessor 您可以將此延伸到包含任何必要的初始化邏輯。
排程器
排程器是高階元件,可協調 KCL 應用程式的所有活動。它負責資料處理的整體協調。
關鍵責任:
-
管理 RecordProcessors 的生命週期
-
處理碎片的租用管理
-
座標檢查點
-
在應用程式的多個工作者之間平衡碎片處理負載
-
處理正常關機和應用程式終止訊號
排程器通常是在主要應用程式中建立和啟動。您可以在下一節主要消費者應用程式中查看排程器的實作範例。
主要消費者應用程式
主要消費者應用程式將所有元件關聯在一起。它負責設定 KCL 取用者、建立必要的用戶端、設定排程器,以及管理應用程式的生命週期。
關鍵責任:
-
設定 AWS 服務用戶端 (Kinesis、DynamoDB、CloudWatch)
-
設定 KCL 應用程式
-
建立和啟動排程器
-
處理應用程式關閉
以下是實作範例:
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }
KCL 預設會建立具有專用輸送量的增強型扇出 (EFO) 取用者。如需增強型扇出的詳細資訊,請參閱開發具有專用輸送量的增強型扇出消費者。如果您的取用者少於 2 個,或不需要 200 毫秒以下的讀取傳播延遲,則必須在排程器物件中設定下列組態,以使用共用輸送量取用者:
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
下列程式碼是建立排程器物件的範例,該物件使用共用輸送量取用者:
匯入:
import software.amazon.kinesis.retrieval.polling.PollingConfig;
程式碼:
Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/