在 Python 中開發 Kinesis Client Library 消費者 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Python 中開發 Kinesis Client Library 消費者

注意

Kinesis Client Library (KCL) 1.x 和 2.x 版已過期。我們建議您遷移至 KCL 3.x 版,這可提供更好的效能和新功能。如需最新的 KCL 文件和遷移指南,請參閱 使用 Kinesis 用戶端程式庫

您可以使用 Kinesis Client Library (KCL) 建置應用程式,處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Python。

KCL 是一種 Java 程式庫,使用稱為 MultiLangDaemon 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎,並在您使用 Java 以外的 KCL 語言時在背景執行。因此,若您安裝了適用於 Python 的 KCL 並完全以 Python 撰寫取用者應用程式,則由於 MultiLangDaemon 的緣故,您的系統仍需要安裝 Java。此外,MultiLangDaemon 有一些預設設定,您可能需要針對您的使用案例進行自訂,例如,其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊,請前往 GitHub 上的 KCL MultiLangDaemon 專案頁面。

若要從 GitHub 下載 Python KCL,請前往 Kinesis Client Library (Python)。如需下載 Python KCL 取用者應用程式的範本程式碼,請至 GitHub 前往適用於 Python 的 KCL 範例專案頁面。

以 Python 實作 KCL 取用者應用程式時,您必須完成以下任務:

實作 RecordProcessor 類別方法

RecordProcess 類別必須擴充 RecordProcessorBase 類別以實作下列方法。

initialize process_records shutdown_requested

範例提供的實作可讓您用於做為起點。

#!/usr/bin/env python # Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Amazon Software License (the "License"). # You may not use this file except in compliance with the License. # A copy of the License is located at # # http://aws.amazon.com/asl/ # # or in the "license" file accompanying this file. This file is distributed # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either # express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import print_function import sys import time from amazon_kclpy import kcl from amazon_kclpy.v3 import processor class RecordProcessor(processor.RecordProcessorBase): """ A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern: * initialize will be called once * process_records will be called zero or more times * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due a scaling change. """ def __init__(self): self._SLEEP_SECONDS = 5 self._CHECKPOINT_RETRIES = 5 self._CHECKPOINT_FREQ_SECONDS = 60 self._largest_seq = (None, None) self._largest_sub_seq = None self._last_checkpoint_time = None def log(self, message): sys.stderr.write(message) def initialize(self, initialize_input): """ Called once by a KCLProcess before any calls to process_records :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record processor has been assigned. """ self._largest_seq = (None, None) self._last_checkpoint_time = time.time() def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None): """ Checkpoints with retries on retryable exceptions. :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records or shutdown :param str or None sequence_number: the sequence number to checkpoint at. :param int or None sub_sequence_number: the sub sequence number to checkpoint at. """ for n in range(0, self._CHECKPOINT_RETRIES): try: checkpointer.checkpoint(sequence_number, sub_sequence_number) return except kcl.CheckpointError as e: if 'ShutdownException' == e.value: # # A ShutdownException indicates that this record processor should be shutdown. This is due to # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard. # print('Encountered shutdown exception, skipping checkpoint') return elif 'ThrottlingException' == e.value: # # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many # dynamo writes. We will sleep temporarily to let it recover. # if self._CHECKPOINT_RETRIES - 1 == n: sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n)) return else: print('Was throttled while checkpointing, will attempt again in {s} seconds' .format(s=self._SLEEP_SECONDS)) elif 'InvalidStateException' == e.value: sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n') else: # Some other error sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e)) time.sleep(self._SLEEP_SECONDS) def process_record(self, data, partition_key, sequence_number, sub_sequence_number): """ Called for each record that is passed to process_records. :param str data: The blob of data that was contained in the record. :param str partition_key: The key associated with this recod. :param int sequence_number: The sequence number associated with this record. :param int sub_sequence_number: the sub sequence number associated with this record. """ #################################### # Insert your processing logic here #################################### self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}" .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data))) def should_update_sequence(self, sequence_number, sub_sequence_number): """ Determines whether a new larger sequence number is available :param int sequence_number: the sequence number from the current record :param int sub_sequence_number: the sub sequence number from the current record :return boolean: true if the largest sequence should be updated, false otherwise """ return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \ (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1]) def process_records(self, process_records_input): """ Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers from the records to indicate where in the stream to checkpoint. :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the records. """ try: for record in process_records_input.records: data = record.binary_data seq = int(record.sequence_number) sub_seq = record.sub_sequence_number key = record.partition_key self.process_record(data, key, seq, sub_seq) if self.should_update_sequence(seq, sub_seq): self._largest_seq = (seq, sub_seq) # # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds # if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS: self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1]) self._last_checkpoint_time = time.time() except Exception as e: self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e)) def lease_lost(self, lease_lost_input): self.log("Lease has been lost") def shard_ended(self, shard_ended_input): self.log("Shard has ended checkpointing") shard_ended_input.checkpointer.checkpoint() def shutdown_requested(self, shutdown_requested_input): self.log("Shutdown has been requested, checkpointing.") shutdown_requested_input.checkpointer.checkpoint() if __name__ == "__main__": kcl_process = kcl.KCLProcess(RecordProcessor()) kcl_process.run()

修改組態屬性

範例提供了組態屬性的預設值,如下列指令碼所示。您可使用自訂值覆寫任何這些屬性。

# The script that abides by the multi-language protocol. This script will # be executed by the MultiLangDaemon, which will communicate with this script # over STDIN and STDOUT according to the multi-language protocol. executableName = sample_kclpy_app.py # The name of an Amazon Kinesis stream to process. streamName = words # Used by the KCL as the name of this application. Will be used as the name # of an Amazon DynamoDB table which will store the lease and checkpoint # information for workers with this application name applicationName = PythonKCLSample # Users can change the credentials provider the KCL will use to retrieve credentials. # The DefaultAWSCredentialsProviderChain checks several other providers, which is # described here: # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Appended to the user agent of the KCL. Does not impact the functionality of the # KCL in any other way. processingLanguage = python/2.7 # Valid options at TRIM_HORIZON or LATEST. # See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax initialPositionInStream = TRIM_HORIZON # The following properties are also available for configuring the KCL Worker that is created # by the MultiLangDaemon. # The KCL defaults to us-east-1 #regionName = us-east-1 # Fail over time in milliseconds. A worker which does not renew it's lease within this time interval # will be regarded as having problems and it's shards will be assigned to other workers. # For applications that have a large number of shards, this msy be set to a higher number to reduce # the number of DynamoDB IOPS required for tracking leases #failoverTimeMillis = 10000 # A worker id that uniquely identifies this worker among all workers using the same applicationName # If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. #workerId = # Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. #shardSyncIntervalMillis = 60000 # Max records to fetch from Kinesis in a single GetRecords call. #maxRecords = 10000 # Idle time between record reads in milliseconds. #idleTimeBetweenReadsInMillis = 1000 # Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) #callProcessRecordsEvenForEmptyRecordList = false # Interval in milliseconds between polling to check for parent shard completion. # Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on # completion of parent shards). #parentShardPollIntervalMillis = 10000 # Cleanup leases upon shards completion (don't wait until they expire in Kinesis). # Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try # to delete the ones we don't need any longer. #cleanupLeasesUponShardCompletion = true # Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). #taskBackoffTimeMillis = 500 # Buffer metrics for at most this long before publishing to CloudWatch. #metricsBufferTimeMillis = 10000 # Buffer at most this many metrics before publishing to CloudWatch. #metricsMaxQueueSize = 10000 # KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls # to RecordProcessorCheckpointer#checkpoint(String) by default. #validateSequenceNumberBeforeCheckpointing = true # The maximum number of active threads for the MultiLangDaemon to permit. # If a value is provided then a FixedThreadPool is used with the maximum # active threads set to the provided value. If a non-positive integer or no # value is provided a CachedThreadPool is used. #maxActiveThreads = 0

應用程式名稱

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下:

  • 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱,KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。

  • KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊,請參閱使用租用資料表追蹤 KCL 消費者應用程式處理的碎片

登入資料

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 AWSCredentialsProvider 屬性,設定登入資料供應者。如果您在 Amazon EC2 執行個體上執行取用者應用程式,建議您使用 IAM 角色來設定執行個體。 AWS 反映與此 IAM 角色相關聯許可的憑證,可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。