Python と ElastiCache - Amazon ElastiCache

Python と ElastiCache

このチュートリアルでは、AWS SDK for Python (Boto3) を使用して次の ElastiCache (Redis OSS) オペレーションを実行するシンプルなプログラムを記述します。

  • ElastiCache (Redis OSS) クラスターを作成する (クラスターモードが有効およびクラスターモードが無効)

  • ユーザー/ユーザーグループが存在するかどうかを確認し、存在しない場合は作成します (この機能は、Valkey 7.2 以降、および Redis OSS 6.0 以降で使用できます)。

  • ElastiCache に接続する

  • 文字列の設定と取得、ストリームからの読み取りと書き込み、Pub/Sub チャンネルからの公開と登録などのオペレーションを実行します。

このチュートリアルは、AWS SDK for Python (Boto) ドキュメントを参照しながら実行できます。以下のセクションは ElastiCache の低レベルのクライアント に固有のものです。

チュートリアルの前提条件

チュートリアル: ElastiCache クラスターとユーザーの作成

次の例では、ElastiCache (Redis OSS) 管理オペレーション (クラスターまたはユーザーの作成) で boto3 SDK を使用し、データ処理で redis-py/redis-py-cluster を使用します。

クラスタモードが無効のクラスターを作成する

次のプログラムを CreateClusterModeDisabledCluster.py というファイルにコピーアンドペーストします。

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_disabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumCacheClusters=2,ReplicationGroupDescription='Sample cache cluster',ReplicationGroupId=None): """Creates an ElastiCache Cluster with cluster mode disabled Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximun 6 (1 primary and 5 replicas). If not specified, cluster will be created with 1 primary and 1 replica. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' response = client.create_replication_group( AutomaticFailoverEnabled=True, CacheNodeType=CacheNodeType, Engine='valkey', EngineVersion=EngineVersion, NumCacheClusters=NumCacheClusters, ReplicationGroupDescription=ReplicationGroupDescription, ReplicationGroupId=ReplicationGroupId, SnapshotRetentionLimit=30, ) return response if __name__ == '__main__': # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 7.2, one primary and two replicas elasticacheResponse = create_cluster_mode_disabled( #CacheNodeType='cache.m6g.large', EngineVersion='7.2', NumCacheClusters=3, ReplicationGroupDescription='Valkey cluster mode disabled with replicas', ReplicationGroupId='valkey202104053' ) logging.info(elasticacheResponse)

このプログラムを実行するには、次のコマンドを入力します。

python CreateClusterModeDisabledCluster.py

詳細については、「ElastiCache でのクラスターの管理」を参照してください。

TLS と RBAC を用いてクラスターモードが無効のクラスターを作成する

セキュリティを確保するために、クラスタモードが無効のクラスターを作成するときに、Transport Layer Security (TLS) とロールベースのアクセスコントロール (RBAC) を使用できます。Valkey または Redis OSS AUTH では、トークンが認証されていれば認証済みのすべてのクライアントにレプリケーショングループへのフルアクセスが認められますが、RBAC はこれとは異なり、クラスターへのアクセスをユーザーグループを介して制御できます。これらのユーザーグループは、レプリケーショングループへのアクセスを分類する方法として設計されています。詳細については、「ロールベースのアクセスコントロール (RBAC)」を参照してください。

次のプログラムを ClusterModeDisabledWithRBAC.py というファイルにコピーアンドペーストします。

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumCacheClusters=2,ReplicationGroupDescription='Sample cache cluster',ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None): """Creates an ElastiCache Cluster with cluster mode disabled and RBAC Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximun 6 (1 primary and 5 replicas). If not specified, cluster will be created with 1 primary and 1 replica. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Mandatory name for the cluster. :param UserGroupIds: The ID of the user group to be assigned to the cluster. :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used. :return: dictionary with the API results """ if not ReplicationGroupId: return {'Error': 'ReplicationGroupId parameter is required'} elif not isinstance(UserGroupIds,(list)): return {'Error': 'UserGroupIds parameter is required and must be a list'} params={'AutomaticFailoverEnabled': True, 'CacheNodeType': CacheNodeType, 'Engine': 'valkey', 'EngineVersion': EngineVersion, 'NumCacheClusters': NumCacheClusters, 'ReplicationGroupDescription': ReplicationGroupDescription, 'ReplicationGroupId': ReplicationGroupId, 'SnapshotRetentionLimit': 30, 'TransitEncryptionEnabled': True, 'UserGroupIds':UserGroupIds } # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit. if isinstance(SecurityGroupIds,(list)): params.update({'SecurityGroupIds':SecurityGroupIds}) if CacheSubnetGroupName: params.update({'CacheSubnetGroupName':CacheSubnetGroupName}) response = client.create_replication_group(**params) return response if __name__ == '__main__': # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 7.2, one primary and two replicas. # Assigns the existent user group "mygroup" for RBAC authentication response=create_cluster_mode_disabled_rbac( CacheNodeType='cache.m6g.large', EngineVersion='7.2', NumCacheClusters=3, ReplicationGroupDescription='Valkey cluster mode disabled with replicas', ReplicationGroupId='valkey202104', UserGroupIds=[ 'mygroup' ], SecurityGroupIds=[ 'sg-7cc73803' ], CacheSubnetGroupName='default' ) logging.info(response)

このプログラムを実行するには、次のコマンドを入力します。

python ClusterModeDisabledWithRBAC.py

詳細については、「ElastiCache でのクラスターの管理」を参照してください。

クラスタモードが有効のクラスターの作成

次のプログラムを ClusterModeEnabled.py というファイルにコピーアンドペーストします。

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None): """Creates an ElastiCache Cluster with cluster mode enabled Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximun 90. If not specified, cluster will be created with 1 shard. :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' response = client.create_replication_group( AutomaticFailoverEnabled=True, CacheNodeType=CacheNodeType, Engine='valkey', EngineVersion=EngineVersion, ReplicationGroupDescription=ReplicationGroupDescription, ReplicationGroupId=ReplicationGroupId, # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary node (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=NumNodeGroups, ReplicasPerNodeGroup=ReplicasPerNodeGroup, CacheParameterGroupName='default.valkey7.2.cluster.on' ) return response # Creates a cluster mode enabled response = create_cluster_mode_enabled( CacheNodeType='cache.m6g.large', EngineVersion='6.0', ReplicationGroupDescription='Valkey cluster mode enabled with replicas', ReplicationGroupId='valkey20210', # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=2, ReplicasPerNodeGroup=1, ) logging.info(response)

このプログラムを実行するには、次のコマンドを入力します。

python ClusterModeEnabled.py

詳細については、「ElastiCache でのクラスターの管理」を参照してください。

TLS および RBAC を用いたクラスターモードが有効のクラスターの作成

セキュリティを確保するために、クラスタモードが有効のクラスターを作成するときに、Transport Layer Security (TLS) とロールベースのアクセスコントロール (RBAC) を使用できます。Valkey または Redis OSS AUTH では、トークンが認証されていれば認証済みのすべてのクライアントにレプリケーショングループへのフルアクセスが認められますが、RBAC はこれとは異なり、クラスターへのアクセスをユーザーグループを介して制御できます。これらのユーザーグループは、レプリケーショングループへのアクセスを分類する方法として設計されています。詳細については、「ロールベースのアクセスコントロール (RBAC)」を参照してください。

次のプログラムを ClusterModeEnabledWithRBAC.py というファイルにコピーアンドペーストします。

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None,UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None,CacheParameterGroupName='default.valkey7.2.cluster.on'): """Creates an ElastiCache Cluster with cluster mode enabled and RBAC Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximun 90. If not specified, cluster will be created with 1 shard. :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster. :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled. :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' elif not isinstance(UserGroupIds,(list)): return {'Error': 'UserGroupIds parameter is required and must be a list'} params={'AutomaticFailoverEnabled': True, 'CacheNodeType': CacheNodeType, 'Engine': 'valkey', 'EngineVersion': EngineVersion, 'ReplicationGroupDescription': ReplicationGroupDescription, 'ReplicationGroupId': ReplicationGroupId, 'SnapshotRetentionLimit': 30, 'TransitEncryptionEnabled': True, 'UserGroupIds':UserGroupIds, 'NumNodeGroups': NumNodeGroups, 'ReplicasPerNodeGroup': ReplicasPerNodeGroup, 'CacheParameterGroupName': CacheParameterGroupName } # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit. if isinstance(SecurityGroupIds,(list)): params.update({'SecurityGroupIds':SecurityGroupIds}) if CacheSubnetGroupName: params.update({'CacheSubnetGroupName':CacheSubnetGroupName}) response = client.create_replication_group(**params) return response if __name__ == '__main__': # Creates a cluster mode enabled cluster response = create_cluster_mode_enabled( CacheNodeType='cache.m6g.large', EngineVersion='7.2', ReplicationGroupDescription='Valkey cluster mode enabled with replicas', ReplicationGroupId='valkey2021', # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=2, ReplicasPerNodeGroup=1, UserGroupIds=[ 'mygroup' ], SecurityGroupIds=[ 'sg-7cc73803' ], CacheSubnetGroupName='default' ) logging.info(response)

このプログラムを実行するには、次のコマンドを入力します。

python ClusterModeEnabledWithRBAC.py

詳細については、「ElastiCache でのクラスターの管理」を参照してください。

ユーザー/ユーザーグループが存在するかどうかを確認し、そうでない場合は作成する

RBAC では、ユーザーを作成し、アクセス文字列を使用して特定のアクセス許可を割り当てます。特定の役割 (管理者、人事) に対応したユーザーグループにユーザーを割り当て、その後、ユーザーグループが 1 つ以上の ElastiCache (Redis OSS) レプリケーショングループにデプロイされます。これにより、同じ Valkey または Redis OSS レプリケーショングループを使用するクライアント間にセキュリティ境界を設定し、クライアントが互いのデータにアクセスできないようにすることができます。詳細については、「ロールベースのアクセスコントロール (RBAC)」を参照してください。

次のプログラムを UserAndUserGroups.py というファイルにコピーアンドペーストします。認証情報を提供するメカニズムを更新します。この例の認証情報は交換可能と表示され、宣言されていない項目が割り当てられています。認証情報をハードコーディングすることは避けてください。

この例では、ユーザーのアクセス許可を持つアクセス文字列を使用します。アクセス文字列の詳細については、「アクセス文字列を使用したアクセス許可の指定」を参照してください。

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def check_user_exists(UserId): """Checks if UserId exists Returns True if UserId exists, otherwise False :param UserId: ElastiCache User ID :return: True|False """ try: response = client.describe_users( UserId=UserId, ) if response['Users'][0]['UserId'].lower() == UserId.lower(): return True except Exception as e: if e.response['Error']['Code'] == 'UserNotFound': logging.info(e.response['Error']) return False else: raise def check_group_exists(UserGroupId): """Checks if UserGroupID exists Returns True if Group ID exists, otherwise False :param UserGroupId: ElastiCache User ID :return: True|False """ try: response = client.describe_user_groups( UserGroupId=UserGroupId ) if response['UserGroups'][0]['UserGroupId'].lower() == UserGroupId.lower(): return True except Exception as e: if e.response['Error']['Code'] == 'UserGroupNotFound': logging.info(e.response['Error']) return False else: raise def create_user(UserId=None,UserName=None,Password=None,AccessString=None): """Creates a new user Returns the ARN for the newly created user or the error message :param UserId: ElastiCache user ID. User IDs must be unique :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name as long as the associated user ID is unique. :param Password: Password for user. Must have at least 16 chars. :param AccessString: Access string with the permissions for the user. :return: user ARN """ try: response = client.create_user( UserId=UserId, UserName=UserName, Engine='Redis', Passwords=[Password], AccessString=AccessString, NoPasswordRequired=False ) return response['ARN'] except Exception as e: logging.info(e.response['Error']) return e.response['Error'] def create_group(UserGroupId=None, UserIds=None): """Creates a new group. A default user is required (mandatory) and should be specified in the UserIds list Return: Group ARN :param UserIds: List with user IDs to be associated with the new group. A default user is required :param UserGroupId: The ID (name) for the group :return: Group ARN """ try: response = client.create_user_group( UserGroupId=UserGroupId, Engine='Redis', UserIds=UserIds ) return response['ARN'] except Exception as e: logging.info(e.response['Error']) if __name__ == '__main__': groupName='mygroup2' userName = 'myuser2' userId=groupName+'-'+userName # Creates a new user if the user ID does not exist. for tmpUserId,tmpUserName in [ (userId,userName), (groupName+'-default','default')]: if not check_user_exists(tmpUserId): response=create_user(UserId=tmpUserId, UserName=EXAMPLE,Password=EXAMPLE,AccessString='on ~* +@all') logging.info(response) # assigns the new user ID to the user group if not check_group_exists(groupName): UserIds = [ userId , groupName+'-default'] response=create_group(UserGroupId=groupName,UserIds=UserIds) logging.info(response)

このプログラムを実行するには、次のコマンドを入力します。

python UserAndUserGroups.py

チュートリアル: ElastiCache への接続

次の例では、Valkey または Redis OSS クライアントを使用して ElastiCache に接続します。

クラスタモードが無効のクラスターへの接続

次のプログラムを ConnectClusterModeDisabled.py というファイルにコピーアンドペーストします。認証情報を提供するメカニズムを更新します。この例の認証情報は交換可能と表示され、宣言されていない項目が割り当てられています。認証情報をハードコーディングすることは避けてください。

from redis import Redis import logging logging.basicConfig(level=logging.INFO) redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True, username=example, password=EXAMPLE) if redis.ping(): logging.info("Connected to Redis")

このプログラムを実行するには、次のコマンドを入力します。

python ConnectClusterModeDisabled.py

クラスタモードが有効のクラスターへの接続

次のプログラムを ConnectClusterModeEnabled.py というファイルにコピーアンドペーストします。

from rediscluster import RedisCluster import logging logging.basicConfig(level=logging.INFO) redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com","port": "6379"}], decode_responses=True,skip_full_coverage_check=True) if redis.ping(): logging.info("Connected to Redis")

このプログラムを実行するには、次のコマンドを入力します。

python ConnectClusterModeEnabled.py

使用例

次の例では、ElastiCache 用の boto3 SDK を使用して ElastiCache (Redis OSS) を操作します。

文字列の設定と取得

次のプログラムを SetAndGetStrings.py というファイルにコピーアンドペーストします。

import time import logging logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s') keyName='mykey' currTime=time.ctime(time.time()) # Set the key 'mykey' with the current date and time as value. # The Key will expire and removed from cache in 60 seconds. redis.set(keyName, currTime, ex=60) # Sleep just for better illustration of TTL (expiration) value time.sleep(5) # Retrieve the key value and current TTL keyValue=redis.get(keyName) keyTTL=redis.ttl(keyName) logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))

このプログラムを実行するには、次のコマンドを入力します。

python SetAndGetStrings.py

複数の項目があるハッシュを設定して取得する

次のプログラムを SetAndGetHash.py というファイルにコピーアンドペーストします。

import logging import time logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s') keyName='mykey' keyValues={'datetime': time.ctime(time.time()), 'epochtime': time.time()} # Set the hash 'mykey' with the current date and time in human readable format (datetime field) and epoch number (epochtime field). redis.hset(keyName, mapping=keyValues) # Set the key to expire and removed from cache in 60 seconds. redis.expire(keyName, 60) # Sleep just for better illustration of TTL (expiration) value time.sleep(5) # Retrieves all the fields and current TTL keyValues=redis.hgetall(keyName) keyTTL=redis.ttl(keyName) logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))

このプログラムを実行するには、次のコマンドを入力します。

python SetAndGetHash.py

Pub/Sub チャンネルから公開 (書き込み) および登録 (読み取り) する

次のプログラムを PubAndSub.py というファイルにコピーアンドペーストします。

import logging import time def handlerFunction(message): """Prints message got from PubSub channel to the log output Return None :param message: message to log """ logging.info(message) logging.basicConfig(level=logging.INFO) redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True) # Creates the subscriber connection on "mychannel" subscriber = redis.pubsub() subscriber.subscribe(**{'mychannel': handlerFunction}) # Creates a new thread to watch for messages while the main process continues with its routines thread = subscriber.run_in_thread(sleep_time=0.01) # Creates publisher connection on "mychannel" redis.publish('mychannel', 'My message') # Publishes several messages. Subscriber thread will read and print on log. while True: redis.publish('mychannel',time.ctime(time.time())) time.sleep(1)

このプログラムを実行するには、次のコマンドを入力します。

python PubAndSub.py

ストリームからの書き込みおよび読み取り

次のプログラムを ReadWriteStream.py というファイルにコピーアンドペーストします。

from redis import Redis import redis.exceptions as exceptions import logging import time import threading logging.basicConfig(level=logging.INFO) def writeMessage(streamName): """Starts a loop writting the current time and thread name to 'streamName' :param streamName: Stream (key) name to write messages. """ fieldsDict={'writerId':threading.currentThread().getName(),'myvalue':None} while True: fieldsDict['myvalue'] = time.ctime(time.time()) redis.xadd(streamName,fieldsDict) time.sleep(1) def readMessage(groupName=None,streamName=None): """Starts a loop reading from 'streamName' Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution. Once a thread acknowleges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread. :param groupName: stream group were multiple threads will read. :param streamName: Stream (key) name where messages will be read. """ readerID=threading.currentThread().getName() while True: try: # Check if the stream has any message if redis.xlen(streamName)>0: # Check if if the messages are new (not acknowledged) or not (already processed) streamData=redis.xreadgroup(groupName,readerID,{streamName:'>'},count=1) if len(streamData) > 0: msgId,message = streamData[0][1][0] logging.info("{}: Got {} from ID {}".format(readerID,message,msgId)) #Do some processing here. If the message has been processed sucessfuly, acknowledge it and (optional) delete the message. redis.xack(streamName,groupName,msgId) logging.info("Stream message ID {} read and processed successfuly by {}".format(msgId,readerID)) redis.xdel(streamName,msgId) else: pass except: raise time.sleep(0.5) # Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read. try: redis.xgroup_create('mystream','myworkergroup',mkstream=True) except exceptions.ResponseError as e: logging.info("Consumer group already exists. Will continue despite the error: {}".format(e)) except: raise # Starts 5 writer threads. for writer_no in range(5): writerThread = threading.Thread(target=writeMessage, name='writer-'+str(writer_no), args=('mystream',),daemon=True) writerThread.start() # Starts 10 reader threads for reader_no in range(10): readerThread = threading.Thread(target=readMessage, name='reader-'+str(reader_no), args=('myworkergroup','mystream',),daemon=True) readerThread.daemon = True readerThread.start() # Keep the code running for 30 seconds time.sleep(30)

このプログラムを実行するには、次のコマンドを入力します。

python ReadWriteStream.py