Develop a Kinesis Client Library consumer in Python
Note
Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. We recommend migrating to KCL version 3.x, which offers improved performance and new features. For the latest KCL documentation and migration guide, see Use Kinesis Client Library.
You can use the Kinesis Client Library (KCL) to build applications that process data from your Kinesis data streams. The Kinesis Client Library is available in multiple languages. This topic discusses Python.
The KCL is a Java library; support for languages other than Java is provided using a
multi-language interface called the MultiLangDaemon. This daemon is
Java-based and runs in the background when you are using a KCL language other than Java. Therefore, if you install the KCL for Python and write your consumer
app entirely in Python, you still need Java installed on your system because of the
MultiLangDaemon. Further, MultiLangDaemon has some default settings you may need to
customize for your use case, for example, the AWS Region that it connects to. For more
information about the MultiLangDaemon on GitHub, go to the KCL MultiLangDaemon project
To download the Python KCL from GitHub, go to Kinesis Client Library (Python)
You must complete the following tasks when implementing a KCL consumer application in Python:
Implement the RecordProcessor class methods
The RecordProcess
class must extend the
RecordProcessorBase
class to implement the following
methods:
initialize process_records shutdown_requested
This sample provides implementations that you can use as a starting point.
#!/usr/bin/env python # Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Amazon Software License (the "License"). # You may not use this file except in compliance with the License. # A copy of the License is located at # # http://aws.amazon.com/asl/ # # or in the "license" file accompanying this file. This file is distributed # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either # express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import print_function import sys import time from amazon_kclpy import kcl from amazon_kclpy.v3 import processor class RecordProcessor(processor.RecordProcessorBase): """ A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern: * initialize will be called once * process_records will be called zero or more times * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due a scaling change. """ def __init__(self): self._SLEEP_SECONDS = 5 self._CHECKPOINT_RETRIES = 5 self._CHECKPOINT_FREQ_SECONDS = 60 self._largest_seq = (None, None) self._largest_sub_seq = None self._last_checkpoint_time = None def log(self, message): sys.stderr.write(message) def initialize(self, initialize_input): """ Called once by a KCLProcess before any calls to process_records :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record processor has been assigned. """ self._largest_seq = (None, None) self._last_checkpoint_time = time.time() def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None): """ Checkpoints with retries on retryable exceptions. :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records or shutdown :param str or None sequence_number: the sequence number to checkpoint at. :param int or None sub_sequence_number: the sub sequence number to checkpoint at. """ for n in range(0, self._CHECKPOINT_RETRIES): try: checkpointer.checkpoint(sequence_number, sub_sequence_number) return except kcl.CheckpointError as e: if 'ShutdownException' == e.value: # # A ShutdownException indicates that this record processor should be shutdown. This is due to # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard. # print('Encountered shutdown exception, skipping checkpoint') return elif 'ThrottlingException' == e.value: # # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many # dynamo writes. We will sleep temporarily to let it recover. # if self._CHECKPOINT_RETRIES - 1 == n: sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n)) return else: print('Was throttled while checkpointing, will attempt again in {s} seconds' .format(s=self._SLEEP_SECONDS)) elif 'InvalidStateException' == e.value: sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n') else: # Some other error sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e)) time.sleep(self._SLEEP_SECONDS) def process_record(self, data, partition_key, sequence_number, sub_sequence_number): """ Called for each record that is passed to process_records. :param str data: The blob of data that was contained in the record. :param str partition_key: The key associated with this recod. :param int sequence_number: The sequence number associated with this record. :param int sub_sequence_number: the sub sequence number associated with this record. """ #################################### # Insert your processing logic here #################################### self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}" .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data))) def should_update_sequence(self, sequence_number, sub_sequence_number): """ Determines whether a new larger sequence number is available :param int sequence_number: the sequence number from the current record :param int sub_sequence_number: the sub sequence number from the current record :return boolean: true if the largest sequence should be updated, false otherwise """ return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \ (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1]) def process_records(self, process_records_input): """ Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers from the records to indicate where in the stream to checkpoint. :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the records. """ try: for record in process_records_input.records: data = record.binary_data seq = int(record.sequence_number) sub_seq = record.sub_sequence_number key = record.partition_key self.process_record(data, key, seq, sub_seq) if self.should_update_sequence(seq, sub_seq): self._largest_seq = (seq, sub_seq) # # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds # if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS: self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1]) self._last_checkpoint_time = time.time() except Exception as e: self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e)) def lease_lost(self, lease_lost_input): self.log("Lease has been lost") def shard_ended(self, shard_ended_input): self.log("Shard has ended checkpointing") shard_ended_input.checkpointer.checkpoint() def shutdown_requested(self, shutdown_requested_input): self.log("Shutdown has been requested, checkpointing.") shutdown_requested_input.checkpointer.checkpoint() if __name__ == "__main__": kcl_process = kcl.KCLProcess(RecordProcessor()) kcl_process.run()
Modify the configuration properties
The sample provides default values for the configuration properties, as shown in the following script. You can override any of these properties with your own values.
# The script that abides by the multi-language protocol. This script will # be executed by the MultiLangDaemon, which will communicate with this script # over STDIN and STDOUT according to the multi-language protocol. executableName = sample_kclpy_app.py # The name of an Amazon Kinesis stream to process. streamName = words # Used by the KCL as the name of this application. Will be used as the name # of an Amazon DynamoDB table which will store the lease and checkpoint # information for workers with this application name applicationName = PythonKCLSample # Users can change the credentials provider the KCL will use to retrieve credentials. # The DefaultAWSCredentialsProviderChain checks several other providers, which is # described here: # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Appended to the user agent of the KCL. Does not impact the functionality of the # KCL in any other way. processingLanguage = python/2.7 # Valid options at TRIM_HORIZON or LATEST. # See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax initialPositionInStream = TRIM_HORIZON # The following properties are also available for configuring the KCL Worker that is created # by the MultiLangDaemon. # The KCL defaults to us-east-1 #regionName = us-east-1 # Fail over time in milliseconds. A worker which does not renew it's lease within this time interval # will be regarded as having problems and it's shards will be assigned to other workers. # For applications that have a large number of shards, this msy be set to a higher number to reduce # the number of DynamoDB IOPS required for tracking leases #failoverTimeMillis = 10000 # A worker id that uniquely identifies this worker among all workers using the same applicationName # If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. #workerId = # Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. #shardSyncIntervalMillis = 60000 # Max records to fetch from Kinesis in a single GetRecords call. #maxRecords = 10000 # Idle time between record reads in milliseconds. #idleTimeBetweenReadsInMillis = 1000 # Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) #callProcessRecordsEvenForEmptyRecordList = false # Interval in milliseconds between polling to check for parent shard completion. # Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on # completion of parent shards). #parentShardPollIntervalMillis = 10000 # Cleanup leases upon shards completion (don't wait until they expire in Kinesis). # Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try # to delete the ones we don't need any longer. #cleanupLeasesUponShardCompletion = true # Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). #taskBackoffTimeMillis = 500 # Buffer metrics for at most this long before publishing to CloudWatch. #metricsBufferTimeMillis = 10000 # Buffer at most this many metrics before publishing to CloudWatch. #metricsMaxQueueSize = 10000 # KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls # to RecordProcessorCheckpointer#checkpoint(String) by default. #validateSequenceNumberBeforeCheckpointing = true # The maximum number of active threads for the MultiLangDaemon to permit. # If a value is provided then a FixedThreadPool is used with the maximum # active threads set to the provided value. If a non-positive integer or no # value is provided a CachedThreadPool is used. #maxActiveThreads = 0
Application name
The KCL requires an application name that is unique among your applications and among Amazon DynamoDB tables in the same Region. It uses the application name configuration value in the following ways:
-
All workers that are associated with this application name are assumed to be working together on the same stream. These workers can be distributed across multiple instances. If you run an additional instance of the same application code, but with a different application name, the KCL treats the second instance as an entirely separate application that is also operating on the same stream.
-
The KCL creates a DynamoDB table with the application name and uses the table to maintain state information (such as checkpoints and worker-shard mapping) for the application. Each application has its own DynamoDB table. For more information, see Use a lease table to track the shards processed by the KCL consumer application.
Credentials
You must make your AWS credentials available to one of the credential
providers in the default credential providers chain. You can you use the
AWSCredentialsProvider
property to set a credentials provider.
If you run your consumer application on an Amazon EC2 instance, we recommend that you
configure the instance with an IAM role. AWS credentials that reflect the
permissions associated with this IAM role are made available to applications
on the instance through its instance metadata. This is the most secure way to
manage credentials for a consumer application running on an EC2 instance.