Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

Java での Kinesis クライアントライブラリコンシューマーを開発する

フォーカスモード
Java での Kinesis クライアントライブラリコンシューマーを開発する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

重要

Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にend-of-supportとなります。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを強くお勧めします。最新の KCL バージョンを確認するには、GitHub Amazon Kinesis Client Library」ページを参照してください。最新の KCL バージョンについては、「」を参照してくださいKinesis Client Library を使用する。KCL 1.x から KCL 3.x への移行については、「」を参照してくださいKCL 1.x から KCL 3.x への移行

次のコードは、ProcessorFactory および RecordProcessor の Java のサンプル実装を示しています。拡張ファンアウト機能を活用する方法については、拡張ファンアウトでコンシューマーを使用するを参照してください。

/* * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/asl/ * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ /* * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://www.apache.org/licenses/LICENSE-2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.polling.PollingConfig; /** * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data. * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK. */ public class SampleSingle { private static final Logger log = LoggerFactory.getLogger(SampleSingle.class); /** * Invoke the main method with 2 args: the stream name and (optionally) the region. * Verifies valid inputs and then starts running the app. */ public static void main(String... args) { if (args.length < 1) { log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument."); System.exit(1); } String streamName = args[0]; String region = null; if (args.length > 1) { region = args[1]; } new SampleSingle(streamName, region).run(); } private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; /** * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis. * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used * indirectly by the KCL to handle the consumption of the data. */ private SampleSingle(String streamName, String region) { this.streamName = streamName; this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2")); this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } private void run() { /** * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL */ ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); /** * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private * class below. */ DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); /** * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This * instance is configured with defaults provided by the ConfigsBuilder. */ Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) ); /** * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely * until an exit is triggered. */ Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); /** * Allows termination of app by pressing Enter. */ System.out.println("Press enter to shutdown"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { reader.readLine(); } catch (IOException ioex) { log.error("Caught exception while waiting for confirm. Shutting down.", ioex); } /** * Stops sending dummy data. */ log.info("Cancelling producer and shutting down executor."); producerFuture.cancel(true); producerExecutor.shutdownNow(); /** * Stops consuming data. Finishes processing the current batch of data already received from Kinesis * before shutting down. */ Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown(); log.info("Waiting up to 20 seconds for shutdown to complete."); try { gracefulShutdownFuture.get(20, TimeUnit.SECONDS); } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException e) { log.error("Exception while executing graceful shutdown.", e); } catch (TimeoutException e) { log.error("Timeout while waiting for shutdown. Scheduler may not have exited."); } log.info("Completed, shutting down now."); } /** * Sends a single record of dummy data to Kinesis. */ private void publishRecord() { PutRecordRequest request = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) .streamName(streamName) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { log.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { log.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } } private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } } /** * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives. * In this example all we do to 'process' is log info about the records. */ private static class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; /** * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via * processRecords). In this example we do nothing except some logging. * * @param initializationInput Provides information related to initialization. */ public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } /** * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver * data records to the application. In this example we simply log our records. * * @param processRecordsInput Provides the records to be processed as well as information and capabilities * related to them (e.g. checkpointing). */ public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting."); Runtime.getRuntime().halt(1); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } /** Called when the lease tied to this record processor has been lost. Once the lease has been lost, * the record processor can no longer checkpoint. * * @param leaseLostInput Provides access to functions and data related to the loss of the lease. */ public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } /** * Called when all data on this shard has been processed. Checkpointing must occur in the method for record * processing to be considered complete; an exception will be thrown otherwise. * * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard. */ public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } /** * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing * Enter). Checkpoints and logs the data a final time. * * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint * before the shutdown is completed. */ public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } } }
プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.