Java KCLで を使用してコンシューマーを開発する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Java KCLで を使用してコンシューマーを開発する

前提条件

3.x KCL の使用を開始する前に、以下があることを確認してください。

  • Java 開発キット (JDK) 8 以降

  • AWS SDK for Java 2.x

  • Maven または Gradle による依存関係管理

KCL は、ワーカーが実行しているコンピューティングホストからCPU使用率などのCPU使用率メトリクスを収集して負荷のバランスを取り、ワーカー全体で均等なリソース使用率レベルを実現します。がワーカーからCPU使用率メトリクスKCLを収集できるようにするには、次の前提条件を満たす必要があります。

Amazon Elastic Compute Cloud(Amazon EC2)

  • オペレーティングシステムは Linux OS である必要があります。

  • EC2 インスタンスIMDSv2で を有効にする必要があります。

Amazon 上の Amazon Elastic Container Service (Amazon ECS) EC2

Amazon ECS での AWS Fargate

Amazon 上の Amazon Elastic Kubernetes Service (Amazon EKS) EC2

  • オペレーティングシステムは Linux OS である必要があります。

Amazon EKS の AWS Fargate

  • Fargate プラットフォーム 1.3.0 以降。

重要

がワーカーからCPU使用率メトリクスを収集KCLできない場合、 KCLはワーカーあたりのスループットを使用してリースを割り当て、フリート内のワーカー間で負荷のバランスを取ります。詳細については、「がリースをワーカーにKCL割り当て、負荷のバランスをとる方法」を参照してください。

依存関係をインストールして追加する

Maven を使用している場合は、次の依存関係をpom.xmlファイルに追加します。3.x.x をKCL最新バージョンに置き換えたことを確認してください。

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Gradle を使用している場合は、build.gradleファイルに以下を追加します。3.x.x をKCL最新バージョンに置き換えたことを確認してください。

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

Maven Central Repository KCLで の最新バージョンを確認できます。

コンシューマーを実装する

KCL コンシューマーアプリケーションは、次のキーコンポーネントで構成されます。

RecordProcessor

RecordProcessor は、Kinesis データストリームレコードを処理するためのビジネスロジックが存在するコアコンポーネントです。これは、アプリケーションが Kinesis ストリームから受信したデータを処理する方法を定義します。

主な責任:

  • シャードの処理を初期化する

  • Kinesis ストリームからレコードのバッチを処理する

  • シャードのシャットダウン処理 (シャードが分割またはマージされた場合、リースが別のホストに引き渡された場合など)

  • チェックポイントを処理して進捗状況を追跡する

実装例を次に示します。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

以下は、この例で使用されている各メソッドの詳細な説明です。

initialize(InitializationInput initializationInput)

  • 目的: レコードを処理するために必要なリソースまたは状態を設定します。

  • 呼び出されたとき: 1 回、 がこのレコードプロセッサにシャードKCLを割り当てるとき。

  • キーポイント:

    • initializationInput.shardId(): このプロセッサが処理するシャードの ID。

    • initializationInput.extendedSequenceNumber(): 処理を開始するシーケンス番号。

processRecords(ProcessRecordsInput processRecordsInput)

  • 目的: 受信レコードを処理し、オプションでチェックポイントの進行状況を処理します。

  • 呼び出されたとき: レコードプロセッサがシャードのリースを保持する限り、繰り返し。

  • キーポイント:

    • processRecordsInput.records(): 処理するレコードのリスト。

    • processRecordsInput.checkpointer(): 進捗状況をチェックポイントするために使用されます。

    • 処理中に例外を処理して、障害KCLが発生しないようにしてください。

    • この方法は、予期しないワーカーのクラッシュや再起動の前にチェックポイントされていないデータなど、一部のシナリオでは同じレコードが複数回処理される可能性があるため、冪力があるはずです。

    • データの整合性を確保するために、チェックポイントの前にバッファリングされたデータを必ずフラッシュしてください。

leaseLost(LeaseLostInput leaseLostInput)

  • 目的: このシャードの処理に固有のリソースをクリーンアップします。

  • 呼び出し時: 別のスケジューラがこのシャードのリースを引き継ぐとき。

  • キーポイント:

    • この方法ではチェックポイントは許可されません。

shardEnded(ShardEndedInput shardEndedInput)

  • 目的: このシャードとチェックポイントの処理を完了します。

  • 呼び出されたとき: シャードが分割またはマージされると、このシャードのすべてのデータが処理されたことを示します。

  • キーポイント:

    • shardEndedInput.checkpointer(): 最終チェックポイントの実行に使用されます。

    • 処理を完了するには、この方法のチェックポイントが必要です。

    • ここでデータとチェックポイントをフラッシュしないと、シャードが再開されたときにデータが失われたり、処理が重複したりする可能性があります。

shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)

  • 目的: KCLがシャットダウンするときにリソースをチェックポイントしてクリーンアップします。

  • 呼び出されるとき: アプリケーションが終了する場合など、 KCL がシャットダウンするとき)。

  • キーポイント:

    • shutdownRequestedInput.checkpointer(): シャットダウン前にチェックポイントを実行するために使用されます。

    • アプリケーションが停止する前に進行状況が保存されるように、 メソッドでチェックポイントを実装していることを確認してください。

    • ここでデータとチェックポイントをフラッシュしないと、アプリケーションの再起動時にデータの損失やレコードの再処理が発生する可能性があります。

重要

KCL 3.x は、前のワーカーがシャットダウンする前にチェックポイントを行うことで、リースが 1 人のワーカーから別のワーカーに引き渡されたときのデータ再処理を減らします。shutdownRequested() メソッドにチェックポイントロジックを実装しない場合、この利点は表示されません。shutdownRequested() メソッド内にチェックポイントロジックを実装していることを確認します。

RecordProcessorFactory

RecordProcessorFactory は、新しい RecordProcessorインスタンスの作成を担当します。KCL は、このファクトリーを使用して、アプリケーションが処理する必要があるシャード RecordProcessor ごとに新しい を作成します。

主な責任:

  • オンデマンドで新しい RecordProcessor インスタンスを作成する

  • それぞれ RecordProcessor が適切に初期化されていることを確認します

以下は実装例です。

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

この例では、ファクトリーは shardRecordProcessor() が呼び出 SampleRecordProcessor されるたびに新しい を作成します。これを拡張して、必要な初期化ロジックを含めることができます。

スケジューラー

スケジューラは、KCLアプリケーションのすべてのアクティビティを調整する高レベルのコンポーネントです。データ処理の全体的なオーケストレーションを担当します。

主な責任:

  • のライフサイクルを管理する RecordProcessors

  • シャードのリース管理を処理する

  • チェックポイントの調整

  • アプリケーションの複数のワーカー間でシャード処理負荷のバランスをとる

  • 正常なシャットダウンとアプリケーション終了のシグナルを処理する

スケジューラは通常、メインアプリケーションで作成および開始されます。スケジューラの実装例は、次のセクションの Main Consumer Application で確認できます。

メインコンシューマーアプリケーション

メインコンシューマーアプリケーションは、すべてのコンポーネントを結び付けます。KCL コンシューマーの設定、必要なクライアントの作成、スケジューラの設定、アプリケーションのライフサイクルの管理を担当します。

主な責任:

  • AWS サービスクライアントの設定 (Kinesis、DynamoDB 、 CloudWatch)

  • KCL アプリケーションを設定する

  • スケジューラを作成して起動する

  • アプリケーションのシャットダウンを処理する

以下は実装例です。

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

KCL は、デフォルトで専用スループットを持つ拡張ファンアウト (EFO) コンシューマーを作成します。拡張ファンアウトの詳細については、「」を参照してください専用スループットによる拡張ファンアウトコンシューマーの開発。コンシューマーが 2 つ未満の場合、または 200 ミリ秒未満の読み取り伝播遅延が不要の場合は、共有スループットコンシューマーを使用するようにスケジューラオブジェクトで次の設定を設定する必要があります。

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

次のコードは、共有スループットコンシューマーを使用するスケジューラオブジェクトを作成する例です。

インポート:

import software.amazon.kinesis.retrieval.polling.PollingConfig;

コード:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/