Python and ElastiCache - Amazon ElastiCache


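Python and ElastiCache

In this tutorial, you use the AWS SDK for Python (Boto3) to write simple programs that perform the following ElastiCache operations:

  • Create ElastiCache for Redis OSS clusters (cluster mode enabled and cluster mode disabled)

  • Check whether users or user groups exist, and create them if they do not. (This feature is available with Valkey 7.2 and later, and with Redis OSS 6.0 to 7.1.)

  • Connect to ElastiCache

  • Perform operations such as setting and getting strings, reading from and writing to streams, and publishing to and subscribing from Pub/Sub channels.

As you work through this tutorial, you can refer to the AWS SDK for Python (Boto) documentation. The following section is specific to ElastiCache: ElastiCache low-level client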

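Tutorial prerequisites

Tutorial: Creating ElastiCache clusters and users

The following examples use the boto3 SDK for ElastiCache management operations (cluster or user creation) and redis-py or redis-py-cluster for data handling.

Creating a cluster mode disabled cluster

Copy the following program and paste it into a file named CreateClusterModeDisabledCluster.py.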

import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_disabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2,
                                 ReplicationGroupDescription='Sample cache cluster', ReplicationGroupId=None):
    """Creates an ElastiCache Cluster with cluster mode disabled

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
    If not specified, cluster will be created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Name for the cluster
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        NumCacheClusters=NumCacheClusters,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        SnapshotRetentionLimit=30,
    )
    return response


if __name__ == '__main__':
    # Creates a cluster mode disabled cluster with Valkey 8.0, one primary and two replicas.
    # Uncomment CacheNodeType to use cache.m6g.large nodes instead of the default cache.t3.small.
    elasticacheResponse = create_cluster_mode_disabled(
        # CacheNodeType='cache.m6g.large',
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104053'
    )

    logging.info(elasticacheResponse)

To run the program, enter the following command:

python CreateClusterModeDisabledCluster.py

For more information, see Managing clusters in ElastiCache.
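A create_replication_group call returns while the cluster is still being created, so a program usually has to wait before it connects. The following is a minimal sketch of a polling loop and is not part of the original tutorial. The helper name wait_until_available and its parameters are hypothetical. get_status is any callable that returns the replication group status string; with boto3 it could wrap client.describe_replication_groups(ReplicationGroupId=...)['ReplicationGroups'][0]['Status'], which is assumed here to report 'creating' and then 'available'.

```python
import time


def wait_until_available(get_status, timeout=900, interval=15, sleep=time.sleep):
    """Polls get_status() until it returns 'available' or the timeout elapses.

    get_status is a caller-supplied callable (hypothetical helper argument).
    Returns True when the status is 'available', False on timeout.
    """
    waited = 0
    while waited <= timeout:
        status = get_status()
        if status == 'available':
            return True
        if status == 'create-failed':
            raise RuntimeError('Replication group creation failed')
        sleep(interval)
        waited += interval
    return False


# Stand-in for the real API call, used only to show the control flow.
fake_statuses = iter(['creating', 'creating', 'available'])
result = wait_until_available(lambda: next(fake_statuses), timeout=60, interval=1,
                              sleep=lambda seconds: None)
print(result)
```

boto3 also provides a replication_group_available waiter on the ElastiCache client, which may be a simpler choice when the default polling behavior is acceptable.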

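Creating a cluster mode disabled cluster with TLS and RBAC

To ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when you create a cluster mode disabled cluster. With Valkey or Redis OSS AUTH, every client has full access to the replication group once its token is authenticated. RBAC, by contrast, lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see Role-Based Access Control (RBAC).

Copy the following program and paste it into a file named ClusterModeDisabledWithRBAC.py.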

import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2,
                                      ReplicationGroupDescription='Sample cache cluster', ReplicationGroupId=None,
                                      UserGroupIds=None, SecurityGroupIds=None, CacheSubnetGroupName=None):
    """Creates an ElastiCache Cluster with cluster mode disabled and RBAC

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
    If not specified, cluster will be created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: List with the IDs of the user groups to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used
    :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used.
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds, list):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params = {'AutomaticFailoverEnabled': True,
              'CacheNodeType': CacheNodeType,
              'Engine': 'valkey',
              'EngineVersion': EngineVersion,
              'NumCacheClusters': NumCacheClusters,
              'ReplicationGroupDescription': ReplicationGroupDescription,
              'ReplicationGroupId': ReplicationGroupId,
              'SnapshotRetentionLimit': 30,
              'TransitEncryptionEnabled': True,
              'UserGroupIds': UserGroupIds
              }

    # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit.
    if isinstance(SecurityGroupIds, list):
        params.update({'SecurityGroupIds': SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName': CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response


if __name__ == '__main__':
    # Creates an ElastiCache cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 8.0, one primary and two replicas.
    # Assigns the existing user group "mygroup" for RBAC authentication
    response = create_cluster_mode_disabled_rbac(
        CacheNodeType='cache.m6g.large',
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104',
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'
    )

    logging.info(response)

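To run the program, enter the following command:

python ClusterModeDisabledWithRBAC.py

For more information, see Managing clusters in ElastiCache.

Creating a cluster mode enabled cluster

Copy the following program and paste it into a file named ClusterModeEnabled.py.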

import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1,
                                ReplicasPerNodeGroup=1,
                                ReplicationGroupDescription='Sample cache with cluster mode enabled',
                                ReplicationGroupId=None):
    """Creates an ElastiCache Cluster with cluster mode enabled

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
    If not specified, cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Name for the cluster
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        # Each shard (NumNodeGroups) gets 1 primary node (implicit) plus ReplicasPerNodeGroup replicas
        NumNodeGroups=NumNodeGroups,
        ReplicasPerNodeGroup=ReplicasPerNodeGroup,
        CacheParameterGroupName='default.valkey7.2.cluster.on'
    )

    return response


if __name__ == '__main__':
    # Creates a cluster mode enabled cluster. The engine version matches the Valkey 7.2 parameter group used above.
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey20210',
        # 2 shards (NumNodeGroups), each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup)
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
    )

    logging.info(response)

To run the program, enter the following command:

python ClusterModeEnabled.py

For more information, see Managing clusters in ElastiCache.
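The relationship between NumNodeGroups and ReplicasPerNodeGroup is easy to misread, so the arithmetic may be worth spelling out. The sketch below is illustrative only; the function name total_nodes is hypothetical. It relies on the fact that each shard (node group) has one implicit primary plus its replicas.

```python
def total_nodes(num_node_groups, replicas_per_node_group):
    """Returns the number of nodes in a cluster mode enabled cluster.

    Each shard has 1 primary (implicit) plus replicas_per_node_group replicas.
    """
    return num_node_groups * (1 + replicas_per_node_group)


# Values passed by the program above: NumNodeGroups=2, ReplicasPerNodeGroup=1
print(total_nodes(2, 1))
```

With the values passed by the program above, the cluster has two shards, each with one primary and one replica, for four nodes in total.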

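Creating a cluster mode enabled cluster with TLS and RBAC

To ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when you create a cluster mode enabled cluster. With Valkey or Redis OSS AUTH, every client has full access to the replication group once its token is authenticated. RBAC, by contrast, lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see Role-Based Access Control (RBAC).

Copy the following program and paste it into a file named ClusterModeEnabledWithRBAC.py.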

import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1,
                                ReplicasPerNodeGroup=1,
                                ReplicationGroupDescription='Sample cache with cluster mode enabled',
                                ReplicationGroupId=None, UserGroupIds=None,
                                SecurityGroupIds=None, CacheSubnetGroupName=None,
                                CacheParameterGroupName='default.valkey7.2.cluster.on'):
    """Creates an ElastiCache Cluster with cluster mode enabled and RBAC

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
    If not specified, cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Name for the cluster.
    :param UserGroupIds: List with the IDs of the user groups to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used
    :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used.
    :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled.
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds, list):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params = {'AutomaticFailoverEnabled': True,
              'CacheNodeType': CacheNodeType,
              'Engine': 'valkey',
              'EngineVersion': EngineVersion,
              'ReplicationGroupDescription': ReplicationGroupDescription,
              'ReplicationGroupId': ReplicationGroupId,
              'SnapshotRetentionLimit': 30,
              'TransitEncryptionEnabled': True,
              'UserGroupIds': UserGroupIds,
              'NumNodeGroups': NumNodeGroups,
              'ReplicasPerNodeGroup': ReplicasPerNodeGroup,
              'CacheParameterGroupName': CacheParameterGroupName
              }

    # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit.
    if isinstance(SecurityGroupIds, list):
        params.update({'SecurityGroupIds': SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName': CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response


if __name__ == '__main__':
    # Creates a cluster mode enabled cluster
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey2021',
        # 2 shards (NumNodeGroups), each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup)
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'
    )

    logging.info(response)

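To run the program, enter the following command:

python ClusterModeEnabledWithRBAC.py

For more information, see Managing clusters in ElastiCache.

Checking whether users and user groups exist, and creating them if they do not

With RBAC, you create users and assign them specific permissions by using an access string. You assign the users to user groups aligned with a specific role (administrators, human resources), and the user groups are then deployed to one or more ElastiCache for Redis OSS replication groups. By doing this, you can establish security boundaries between clients that use the same Valkey or Redis OSS replication group and prevent clients from accessing each other's data. For more information, see Role-Based Access Control (RBAC).

Copy the following program and paste it into a file named UserAndUserGroups.py. Update the mechanism for providing credentials. Credentials in this example are shown as replaceable and are assigned an undeclared item. Avoid hard-coding credentials.

This example uses an access string with the permissions for the user. For more information on access strings, see Specifying permissions using an access string.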

import boto3
import logging
from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def check_user_exists(UserId):
    """Checks if UserId exists

    Returns True if UserId exists, otherwise False
    :param UserId: ElastiCache User ID
    :return: True|False
    """
    try:
        response = client.describe_users(
            UserId=UserId,
        )
        if response['Users'][0]['UserId'].lower() == UserId.lower():
            return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'UserNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise
    return False


def check_group_exists(UserGroupId):
    """Checks if UserGroupId exists

    Returns True if the user group ID exists, otherwise False
    :param UserGroupId: ElastiCache user group ID
    :return: True|False
    """
    try:
        response = client.describe_user_groups(
            UserGroupId=UserGroupId
        )
        if response['UserGroups'][0]['UserGroupId'].lower() == UserGroupId.lower():
            return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'UserGroupNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise
    return False


def create_user(UserId=None, UserName=None, Password=None, AccessString=None):
    """Creates a new user

    Returns the ARN for the newly created user or the error message
    :param UserId: ElastiCache user ID. User IDs must be unique
    :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name as long as the associated user ID is unique.
    :param Password: Password for user. Must have at least 16 chars.
    :param AccessString: Access string with the permissions for the user.
    :return: user ARN
    """
    try:
        response = client.create_user(
            UserId=UserId,
            UserName=UserName,
            Engine='Redis',
            Passwords=[Password],
            AccessString=AccessString,
            NoPasswordRequired=False
        )
        return response['ARN']
    except ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']


def create_group(UserGroupId=None, UserIds=None):
    """Creates a new group.

    A default user is required (mandatory) and should be specified in the UserIds list
    :param UserIds: List with user IDs to be associated with the new group. A default user is required
    :param UserGroupId: The ID (name) for the group
    :return: Group ARN
    """
    try:
        response = client.create_user_group(
            UserGroupId=UserGroupId,
            Engine='Redis',
            UserIds=UserIds
        )
        return response['ARN']
    except ClientError as e:
        logging.info(e.response['Error'])


if __name__ == '__main__':
    groupName = 'mygroup2'
    userName = 'myuser2'
    userId = groupName + '-' + userName

    # Creates a new user if the user ID does not exist.
    # EXAMPLE is an undeclared placeholder; supply the password from your own credential mechanism.
    for tmpUserId, tmpUserName in [(userId, userName), (groupName + '-default', 'default')]:
        if not check_user_exists(tmpUserId):
            response = create_user(UserId=tmpUserId, UserName=tmpUserName, Password=EXAMPLE,
                                   AccessString='on ~* +@all')
            logging.info(response)

    # assigns the new user ID to the user group
    if not check_group_exists(groupName):
        UserIds = [userId, groupName + '-default']
        response = create_group(UserGroupId=groupName, UserIds=UserIds)
        logging.info(response)

To run the program, enter the following command:

python UserAndUserGroups.py
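The program above passes the access string 'on ~* +@all', which enables the user, allows every key pattern, and allows every command category. The following sketch shows how such strings can be composed from their parts. It is illustrative only: the helper build_access_string and the 'app:*' key pattern are hypothetical, and the syntax is assumed to follow the Valkey and Redis OSS ACL rule format (on/off, ~pattern, +@category, -@category).

```python
def build_access_string(enabled, key_patterns, rules):
    """Joins the parts of an access string with single spaces.

    enabled: True for 'on', False for 'off'
    key_patterns: key patterns without the leading '~'
    rules: command rules such as '+@all', '-@all', or '+@read'
    """
    state = 'on' if enabled else 'off'
    keys = ['~' + pattern for pattern in key_patterns]
    return ' '.join([state] + keys + list(rules))


# Same string the program above uses
full_access = build_access_string(True, ['*'], ['+@all'])

# Hypothetical narrower user: read commands only, on keys starting with 'app:'
read_only = build_access_string(True, ['app:*'], ['-@all', '+@read'])

print(full_access)
print(read_only)
```

Narrower access strings like the second one are what make user groups useful as a security boundary between clients that share a replication group.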

Tutorial: Connecting to ElastiCache

The following examples use the Valkey or Redis OSS client to connect to ElastiCache.
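The connection programs in this section need an endpoint address. One way to obtain it is from the response of the boto3 describe_replication_groups call. The sketch below is not part of the original tutorial: the helper name endpoint_from_replication_group is hypothetical, and the sample dictionaries are hand-written stand-ins for one entry of the response's ReplicationGroups list. It assumes that cluster mode enabled groups report a ConfigurationEndpoint, and that cluster mode disabled groups report a PrimaryEndpoint in their first node group.

```python
def endpoint_from_replication_group(group):
    """Returns (address, port) for one entry of response['ReplicationGroups'].

    Prefers the configuration endpoint (cluster mode enabled) and falls back
    to the primary endpoint of the first node group (cluster mode disabled).
    """
    config = group.get('ConfigurationEndpoint')
    if config:
        return config['Address'], config['Port']
    primary = group['NodeGroups'][0]['PrimaryEndpoint']
    return primary['Address'], primary['Port']


# Hand-written sample data with placeholder host names, for illustration only
sample_disabled = {
    'ReplicationGroupId': 'valkey202104053',
    'NodeGroups': [{'PrimaryEndpoint': {'Address': 'primary.example.invalid', 'Port': 6379}}],
}
sample_enabled = {
    'ReplicationGroupId': 'valkey20210',
    'ConfigurationEndpoint': {'Address': 'clustercfg.example.invalid', 'Port': 6379},
    'NodeGroups': [],
}

print(endpoint_from_replication_group(sample_disabled))
print(endpoint_from_replication_group(sample_enabled))
```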

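Connecting to a cluster mode disabled cluster

Copy the following program and paste it into a file named ConnectClusterModeDisabled.py. Update the mechanism for providing credentials. Credentials in this example are shown as replaceable and are assigned an undeclared item. Avoid hard-coding credentials.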

from redis import Redis
import logging

logging.basicConfig(level=logging.INFO)

# example and EXAMPLE are undeclared placeholders; supply the user name and password from your own credential mechanism.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True,
              username=example, password=EXAMPLE)

if redis.ping():
    logging.info("Connected to Redis")

To run the program, enter the following command:

python ConnectClusterModeDisabled.py
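The connection program above leaves the user name and password as undeclared placeholders. One possible way to supply them without hard-coding is to read them from environment variables, as in the following sketch. The variable names ELASTICACHE_USERNAME and ELASTICACHE_PASSWORD and the helper load_credentials are assumptions made for illustration; a secrets manager would serve the same purpose.

```python
import os


def load_credentials(env=os.environ):
    """Reads the user name and password from a mapping of environment variables.

    The variable names are hypothetical; use whatever your deployment defines.
    Raises RuntimeError if either value is missing or empty.
    """
    username = env.get('ELASTICACHE_USERNAME')
    password = env.get('ELASTICACHE_PASSWORD')
    if not username or not password:
        raise RuntimeError('ELASTICACHE_USERNAME and ELASTICACHE_PASSWORD must be set')
    return username, password


# Demonstration with an explicit mapping instead of the real environment.
# The password is a placeholder, not a real credential.
demo_env = {'ELASTICACHE_USERNAME': 'myuser2', 'ELASTICACHE_PASSWORD': 'placeholder-password-0001'}
username, password = load_credentials(demo_env)
print(username)
```

The returned values could then be passed as the username and password arguments of the Redis constructor.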

Connecting to a cluster mode enabled cluster

Copy the following program and paste it into a file named ConnectClusterModeEnabled.py.

from rediscluster import RedisCluster
import logging

logging.basicConfig(level=logging.INFO)

redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com", "port": "6379"}],
                     decode_responses=True, skip_full_coverage_check=True)

if redis.ping():
    logging.info("Connected to Redis")

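To run the program, enter the following command:

python ConnectClusterModeEnabled.py

Usage examples

The following examples use the boto3 SDK for ElastiCache to work with ElastiCache (Redis OSS).

Setting and getting strings

Copy the following program and paste it into a file named SetAndGetStrings.py.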

import time
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s: %(message)s')

# Assumes redis is a connected client, created as in ConnectClusterModeDisabled.py.
keyName = 'mykey'
currTime = time.ctime(time.time())

# Set the key 'mykey' with the current date and time as value.
# The key will expire and be removed from the cache in 60 seconds.
redis.set(keyName, currTime, ex=60)

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieve the key value and current TTL
keyValue = redis.get(keyName)
keyTTL = redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))

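To run the program, enter the following command:

python SetAndGetStrings.py

Setting and getting a hash with multiple items

Copy the following program and paste it into a file named SetAndGetHash.py.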

import logging
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s: %(message)s')

# Assumes redis is a connected client, created as in ConnectClusterModeDisabled.py.
keyName = 'mykey'
keyValues = {'datetime': time.ctime(time.time()), 'epochtime': time.time()}

# Set the hash 'mykey' with the current date and time in human readable format (datetime field)
# and epoch number (epochtime field).
redis.hset(keyName, mapping=keyValues)

# Set the key to expire and be removed from the cache in 60 seconds.
redis.expire(keyName, 60)

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieves all the fields and current TTL
keyValues = redis.hgetall(keyName)
keyTTL = redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))

To run the program, enter the following command:

python SetAndGetHash.py
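Hash field values are stored as strings, so the epochtime field written as a float by the program above comes back from hgetall as text. The following sketch converts it back to a number. The helper name parse_hash_fields is hypothetical, and the sample dictionary is a hand-written stand-in with illustrative values for what hgetall is assumed to return when decode_responses=True.

```python
def parse_hash_fields(fields):
    """Returns a copy of the hash fields with 'epochtime' converted to float."""
    parsed = dict(fields)
    parsed['epochtime'] = float(fields['epochtime'])
    return parsed


# Hand-written sample; the values are illustrative, not real output
sample = {'datetime': 'Mon Apr  5 10:00:00 2021', 'epochtime': '1617616800.25'}
parsed = parse_hash_fields(sample)
print(type(parsed['epochtime']).__name__)
```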

Publishing (writing) to and subscribing (reading) from a Pub/Sub channel

Copy the following program and paste it into a file named PubAndSub.py.

from redis import Redis
import logging
import time


def handlerFunction(message):
    """Prints message got from PubSub channel to the log output

    Return None
    :param message: message to log
    """
    logging.info(message)


logging.basicConfig(level=logging.INFO)
redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)

# Creates the subscriber connection on "mychannel"
subscriber = redis.pubsub()
subscriber.subscribe(**{'mychannel': handlerFunction})

# Creates a new thread to watch for messages while the main process continues with its routines
thread = subscriber.run_in_thread(sleep_time=0.01)

# Publishes a first message on "mychannel"
redis.publish('mychannel', 'My message')

# Publishes a message every second until interrupted. The subscriber thread will read and log each one.
while True:
    redis.publish('mychannel', time.ctime(time.time()))
    time.sleep(1)

To run the program, enter the following command:

python PubAndSub.py
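The handlerFunction in the program above logs the whole message object. With redis-py, that object is a dictionary, so a handler can pick out just the channel and the payload. The sketch below is illustrative: the helper name describe_message is hypothetical, and the sample dictionary is a hand-written stand-in for what redis-py is assumed to pass to a handler when decode_responses=True.

```python
def describe_message(message):
    """Returns 'channel: data' for a published message, or None for other message types."""
    if message.get('type') != 'message':
        return None
    return "{}: {}".format(message['channel'], message['data'])


# Hand-written sample of a message delivered to a channel handler
sample_message = {'type': 'message', 'pattern': None, 'channel': 'mychannel', 'data': 'My message'}
print(describe_message(sample_message))
```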

Writing to and reading from a stream

Copy the following program and paste it into a file named ReadWriteStream.py.

from redis import Redis
import redis.exceptions as exceptions
import logging
import time
import threading

logging.basicConfig(level=logging.INFO)

# Assumption: the original listing never creates the client. This connection mirrors PubAndSub.py;
# replace the endpoint with your own.
redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)


def writeMessage(streamName):
    """Starts a loop writing the current time and thread name to 'streamName'

    :param streamName: Stream (key) name to write messages.
    """
    fieldsDict = {'writerId': threading.current_thread().name, 'myvalue': None}
    while True:
        fieldsDict['myvalue'] = time.ctime(time.time())
        redis.xadd(streamName, fieldsDict)
        time.sleep(1)


def readMessage(groupName=None, streamName=None):
    """Starts a loop reading from 'streamName'

    Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution.
    Once a thread acknowledges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread.
    :param groupName: stream group where multiple threads will read.
    :param streamName: Stream (key) name where messages will be read.
    """
    readerID = threading.current_thread().name
    while True:
        # Check if the stream has any message
        if redis.xlen(streamName) > 0:
            # '>' asks for messages never delivered to another consumer in the group
            streamData = redis.xreadgroup(groupName, readerID, {streamName: '>'}, count=1)
            if len(streamData) > 0:
                msgId, message = streamData[0][1][0]
                logging.info("{}: Got {} from ID {}".format(readerID, message, msgId))
                # Do some processing here. If the message has been processed successfully,
                # acknowledge it and (optional) delete the message.
                redis.xack(streamName, groupName, msgId)
                logging.info("Stream message ID {} read and processed successfully by {}".format(msgId, readerID))
                redis.xdel(streamName, msgId)
        time.sleep(0.5)


# Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read.
try:
    redis.xgroup_create('mystream', 'myworkergroup', mkstream=True)
except exceptions.ResponseError as e:
    logging.info("Consumer group already exists. Will continue despite the error: {}".format(e))

# Starts 5 writer threads.
for writer_no in range(5):
    writerThread = threading.Thread(target=writeMessage, name='writer-' + str(writer_no),
                                    args=('mystream',), daemon=True)
    writerThread.start()

# Starts 10 reader threads
for reader_no in range(10):
    readerThread = threading.Thread(target=readMessage, name='reader-' + str(reader_no),
                                    args=('myworkergroup', 'mystream',), daemon=True)
    readerThread.start()

# Keep the code running for 30 seconds
time.sleep(30)

To run the program, enter the following command:

python ReadWriteStream.py
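The expression streamData[0][1][0] in the program above is compact, so it may help to see the nesting it walks through. The sketch below unpacks a hand-written sample shaped like the reply redis-py is assumed to return from xreadgroup when decode_responses=True: a list of [stream name, list of (message ID, fields)] pairs. The helper name unpack_first_entry and the sample values are hypothetical.

```python
def unpack_first_entry(reply):
    """Returns (stream_name, message_id, fields) for the first entry, or None if the reply is empty."""
    if not reply:
        return None
    stream_name, entries = reply[0]
    if not entries:
        return None
    message_id, fields = entries[0]
    return stream_name, message_id, fields


# Hand-written sample reply with illustrative values
sample_reply = [
    ['mystream', [('1617616800000-0', {'writerId': 'writer-0', 'myvalue': 'Mon Apr  5 10:00:00 2021'})]]
]

print(unpack_first_entry(sample_reply))
```

Unpacking by name in this way also guards against an empty entry list, which the direct index expression does not.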