翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
重要
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にend-of-supportとなります。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを強くお勧めします。最新の KCL バージョンを確認するには、GitHub Amazon Kinesis Client Library」ページ
Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。
KCL は Java ライブラリであり、Java 以外の言語のサポートは、MultiLangDaemon と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、KCL MultiLangDaemon project
GitHub から Python KCL をダウンロードするには、Kinesis Client Library (Python)
Python で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。
RecordProcessor クラスのメソッドを実装する
RecordProcess
クラスでは、RecordProcessorBase
クラスを拡張して次のメソッドを実装する必要があります。
initialize process_records shutdown_requested
このサンプルでは、開始点として使用できる実装を提供しています。
#!/usr/bin/env python
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import print_function
import sys
import time
from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor
class RecordProcessor(processor.RecordProcessorBase):
"""
A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:
* initialize will be called once
* process_records will be called zero or more times
* shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
a scaling change.
"""
def __init__(self):
self._SLEEP_SECONDS = 5
self._CHECKPOINT_RETRIES = 5
self._CHECKPOINT_FREQ_SECONDS = 60
self._largest_seq = (None, None)
self._largest_sub_seq = None
self._last_checkpoint_time = None
def log(self, message):
sys.stderr.write(message)
def initialize(self, initialize_input):
"""
Called once by a KCLProcess before any calls to process_records
:param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
processor has been assigned.
"""
self._largest_seq = (None, None)
self._last_checkpoint_time = time.time()
def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
"""
Checkpoints with retries on retryable exceptions.
:param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
or shutdown
:param str or None sequence_number: the sequence number to checkpoint at.
:param int or None sub_sequence_number: the sub sequence number to checkpoint at.
"""
for n in range(0, self._CHECKPOINT_RETRIES):
try:
checkpointer.checkpoint(sequence_number, sub_sequence_number)
return
except kcl.CheckpointError as e:
if 'ShutdownException' == e.value:
#
# A ShutdownException indicates that this record processor should be shutdown. This is due to
# some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
#
print('Encountered shutdown exception, skipping checkpoint')
return
elif 'ThrottlingException' == e.value:
#
# A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
# dynamo writes. We will sleep temporarily to let it recover.
#
if self._CHECKPOINT_RETRIES - 1 == n:
sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
return
else:
print('Was throttled while checkpointing, will attempt again in {s} seconds'
.format(s=self._SLEEP_SECONDS))
elif 'InvalidStateException' == e.value:
sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
else: # Some other error
sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
time.sleep(self._SLEEP_SECONDS)
def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
"""
Called for each record that is passed to process_records.
:param str data: The blob of data that was contained in the record.
:param str partition_key: The key associated with this recod.
:param int sequence_number: The sequence number associated with this record.
:param int sub_sequence_number: the sub sequence number associated with this record.
"""
####################################
# Insert your processing logic here
####################################
self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
.format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))
def should_update_sequence(self, sequence_number, sub_sequence_number):
"""
Determines whether a new larger sequence number is available
:param int sequence_number: the sequence number from the current record
:param int sub_sequence_number: the sub sequence number from the current record
:return boolean: true if the largest sequence should be updated, false otherwise
"""
return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
(sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])
def process_records(self, process_records_input):
"""
Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
from the records to indicate where in the stream to checkpoint.
:param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
records.
"""
try:
for record in process_records_input.records:
data = record.binary_data
seq = int(record.sequence_number)
sub_seq = record.sub_sequence_number
key = record.partition_key
self.process_record(data, key, seq, sub_seq)
if self.should_update_sequence(seq, sub_seq):
self._largest_seq = (seq, sub_seq)
#
# Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
#
if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
self._last_checkpoint_time = time.time()
except Exception as e:
self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))
def lease_lost(self, lease_lost_input):
self.log("Lease has been lost")
def shard_ended(self, shard_ended_input):
self.log("Shard has ended checkpointing")
shard_ended_input.checkpointer.checkpoint()
def shutdown_requested(self, shutdown_requested_input):
self.log("Shutdown has been requested, checkpointing.")
shutdown_requested_input.checkpointer.checkpoint()
if __name__ == "__main__":
kcl_process = kcl.KCLProcess(RecordProcessor())
kcl_process.run()
設定プロパティを変更する
このサンプルでは、次のスクリプトに示すように、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます。
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py
# The name of an Amazon Kinesis stream to process.
streamName = words
# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample
# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7
# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON
# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.
# The KCL defaults to us-east-1
#regionName = us-east-1
# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000
# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId =
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000
# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000
# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000
# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true
# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
アプリケーション名
KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。
-
このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンス間に分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
-
KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、「リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する」を参照してください。
認証情報
デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーの 1 つが認証情報を利用できるようにする必要があります。AWSCredentialsProvider
プロパティを使用して認証情報プロバイダーを設定できます。Amazon EC2 インスタンスでコンシューマーアプリケーションを実行する場合は、この IAM ロールに関連付けられたアクセス許可を反映する IAM role. AWS credentials を使用してインスタンスを設定することをお勧めします。この IAM ロールは、インスタンスメタデータを介してインスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。