Develop enhanced fan-out consumers using KCL 2.x in Java - Amazon Kinesis Data Streams

Develop enhanced fan-out consumers using KCL 2.x in Java

Note

Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. We recommend migrating to KCL version 3.x, which offers improved performance and new features. For the latest KCL documentation and migration guide, see Use Kinesis Client Library.

You can use version 2.0 or later of the Kinesis Client Library (KCL) to develop applications in Amazon Kinesis Data Streams to receive data from streams using enhanced fan-out. The following code shows an example implementation in Java of ProcessorFactory and RecordProcessor.

It is recommended that you use KinesisClientUtil to create KinesisAsyncClient and to configure maxConcurrency in KinesisAsyncClient.

Important

The Amazon Kinesis Client might see significantly increased latency, unless you configure KinesisAsyncClient to have a maxConcurrency high enough to allow all leases plus additional usages of KinesisAsyncClient.

/* * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/asl/ * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ /* * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://www.apache.org/licenses/LICENSE-2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleSingle { private static final Logger log = LoggerFactory.getLogger(SampleSingle.class); public static void main(String... args) { if (args.length < 1) { log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument."); System.exit(1); } String streamName = args[0]; String region = null; if (args.length > 1) { region = args[1]; } new SampleSingle(streamName, region).run(); } private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; private SampleSingle(String streamName, String region) { this.streamName = streamName; this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2")); this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } private void run() { ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); System.out.println("Press enter to shutdown"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { reader.readLine(); } catch (IOException ioex) { log.error("Caught exception while waiting for confirm. Shutting down.", ioex); } log.info("Cancelling producer, and shutting down executor."); producerFuture.cancel(true); producerExecutor.shutdownNow(); Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown(); log.info("Waiting up to 20 seconds for shutdown to complete."); try { gracefulShutdownFuture.get(20, TimeUnit.SECONDS); } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException e) { log.error("Exception while executing graceful shutdown.", e); } catch (TimeoutException e) { log.error("Timeout while waiting for shutdown. Scheduler may not have exited."); } log.info("Completed, shutting down now."); } private void publishRecord() { PutRecordRequest request = PutRecordRequest.builder() .partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) .streamName(streamName) .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10))) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { log.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { log.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } } private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } } private static class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting."); Runtime.getRuntime().halt(1); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } } }