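In this tutorial, you use the AWS SDK for Python (Boto3) to write simple programs that perform the following ElastiCache operations:
Create ElastiCache for Redis OSS clusters (cluster mode enabled and cluster mode disabled)
Check whether users or user groups exist, and create them if they do not. (This feature is available with Valkey 7.2 and later, and with Redis OSS 6.0 to 7.1.)
Connect to ElastiCache
Perform operations such as setting and getting strings, reading from and writing to streams, and publishing to and subscribing from Pub/Sub channels.
As you work through this tutorial, you can refer to the AWS SDK for Python (Boto) documentation. The following section is specific to ElastiCache: ElastiCache low-level client
Tutorial prerequisites
Set up an AWS access key to use the AWS SDKs. For more information, see Setting up ElastiCache.
Install Python 3.0 or later. For more information, see https://www.python.org/downloads. For instructions, see Quickstart in the Boto 3 documentation.
Tutorial: Creating ElastiCache clusters and users
The following examples use the boto3 SDK for ElastiCache for Redis OSS management operations (creating clusters or users) and redis-py/redis-py-cluster for data handling.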
Creating a cluster mode disabled cluster
Copy the following program and paste it into a file named CreateClusterModeDisabledCluster.py.
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_disabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2, ReplicationGroupDescription='Sample cache cluster', ReplicationGroupId=None):
    """Creates an ElastiCache cluster with cluster mode disabled.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
        If not specified, the cluster will be created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :return: dictionary with the API results, or a dictionary with an 'Error' key if a required parameter is missing
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        NumCacheClusters=NumCacheClusters,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        SnapshotRetentionLimit=30,
    )
    return response

if __name__ == '__main__':
    # Creates a cluster mode disabled cluster running Valkey 8.0 with one primary and two replicas.
    # The default cache.t3.small node type is used; uncomment CacheNodeType to use cache.m6g.large nodes.
    elasticacheResponse = create_cluster_mode_disabled(
        #CacheNodeType='cache.m6g.large',
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104053'
    )
    logging.info(elasticacheResponse)
To run the program, enter the following command:
python CreateClusterModeDisabledCluster.py
For more information, see Managing clusters in ElastiCache.
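The create call returns before the cluster is ready to accept connections. The following is a minimal sketch, not part of the original tutorial, of polling until the replication group reports the available status and then reading the primary endpoint. The helper names, the poll interval, and the attempt limit are assumptions made for illustration; describe_replication_groups and the response keys used here belong to the boto3 ElastiCache client.

```python
import time


def wait_for_replication_group(client, replication_group_id,
                               poll_seconds=30, max_attempts=40, sleep=time.sleep):
    """Polls describe_replication_groups until the group status is 'available'.

    `client` is any object exposing describe_replication_groups, for example
    boto3.client('elasticache'). Returns the replication group description.
    The helper name and default timings are hypothetical.
    """
    for _ in range(max_attempts):
        response = client.describe_replication_groups(
            ReplicationGroupId=replication_group_id)
        group = response['ReplicationGroups'][0]
        if group['Status'] == 'available':
            return group
        # Still creating or modifying: wait before asking again.
        sleep(poll_seconds)
    raise TimeoutError(
        'Replication group {} was not available after {} attempts'.format(
            replication_group_id, max_attempts))


def primary_endpoint(group):
    """Returns (address, port) of the primary endpoint of a cluster mode disabled group."""
    endpoint = group['NodeGroups'][0]['PrimaryEndpoint']
    return endpoint['Address'], endpoint['Port']
```

With a real client, a call such as wait_for_replication_group(boto3.client('elasticache'), 'valkey202104053') would block until the cluster is ready. The boto3 client also offers a built-in waiter, client.get_waiter('replication_group_available'), which may be preferable when custom polling is not needed.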
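Creating a cluster mode disabled cluster with TLS and RBAC
To ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when creating a cluster mode disabled cluster. Unlike Valkey or Redis OSS AUTH, where every authenticated client has full access to the replication group once its token is authenticated, RBAC lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see Role-Based Access Control (RBAC).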
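The difference between AUTH and RBAC shows up in the parameters passed to create_replication_group. The sketch below is illustrative only: the helper name access_control_params is hypothetical, and treating the two modes as mutually exclusive is an assumption of this sketch. AuthToken and UserGroupIds are existing parameters of create_replication_group, and both depend on in-transit encryption being enabled.

```python
def access_control_params(user_group_ids=None, auth_token=None):
    """Builds the access-control keyword arguments for create_replication_group.

    Hypothetical helper: pass either a list of RBAC user group IDs or an AUTH token.
    """
    if user_group_ids and auth_token:
        raise ValueError('Choose either RBAC user groups or an AUTH token, not both')
    if not user_group_ids and not auth_token:
        raise ValueError('Provide RBAC user groups or an AUTH token')

    # Both modes are used together with TLS (in-transit encryption).
    params = {'TransitEncryptionEnabled': True}
    if user_group_ids:
        # RBAC: permissions come from the users inside these groups.
        params['UserGroupIds'] = list(user_group_ids)
    else:
        # AUTH: every client presenting the token gets full access.
        params['AuthToken'] = auth_token
    return params
```

The returned dictionary could be merged into the other arguments of the create call, for example params.update(access_control_params(user_group_ids=['mygroup'])).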
Copy the following program and paste it into a file named ClusterModeDisabledWithRBAC.py.
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2, ReplicationGroupDescription='Sample cache cluster', ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None, CacheSubnetGroupName=None):
    """Creates an ElastiCache cluster with cluster mode disabled and RBAC.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
        If not specified, the cluster will be created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: Mandatory list with the IDs of the user groups to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, the default will be used.
    :param CacheSubnetGroupName: Subnet group where the cluster will be placed. If not defined, the default will be used.
    :return: dictionary with the API results, or a dictionary with an 'Error' key if a required parameter is missing
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds, list):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params = {
        'AutomaticFailoverEnabled': True,
        'CacheNodeType': CacheNodeType,
        'Engine': 'valkey',
        'EngineVersion': EngineVersion,
        'NumCacheClusters': NumCacheClusters,
        'ReplicationGroupDescription': ReplicationGroupDescription,
        'ReplicationGroupId': ReplicationGroupId,
        'SnapshotRetentionLimit': 30,
        'TransitEncryptionEnabled': True,
        'UserGroupIds': UserGroupIds
    }

    # Defaults will be used if CacheSubnetGroupName or SecurityGroupIds are not explicit.
    if isinstance(SecurityGroupIds, list):
        params.update({'SecurityGroupIds': SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName': CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response

if __name__ == '__main__':
    # Creates a cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 8.0, one primary and two replicas.
    # Assigns the existing user group "mygroup" for RBAC authentication.
    response = create_cluster_mode_disabled_rbac(
        CacheNodeType='cache.m6g.large',
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104',
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'
    )
    logging.info(response)
To run the program, enter the following command:
python ClusterModeDisabledWithRBAC.py
For more information, see Managing clusters in ElastiCache.
Creating a cluster mode enabled cluster
Copy the following program and paste it into a file named ClusterModeEnabled.py.
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1, ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled', ReplicationGroupId=None):
    """Creates an ElastiCache cluster with cluster mode enabled.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
        Must be compatible with the parameter group default.valkey7.2.cluster.on used below.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
        If not specified, the cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified, 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :return: dictionary with the API results, or a dictionary with an 'Error' key if a required parameter is missing
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        # Each shard (NumNodeGroups) has 1 primary node (implicit) plus ReplicasPerNodeGroup replicas.
        NumNodeGroups=NumNodeGroups,
        ReplicasPerNodeGroup=ReplicasPerNodeGroup,
        CacheParameterGroupName='default.valkey7.2.cluster.on'
    )
    return response

if __name__ == '__main__':
    # Creates a cluster mode enabled cluster with 2 shards (NumNodeGroups),
    # each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup).
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey20210',
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
    )
    logging.info(response)
To run the program, enter the following command:
python ClusterModeEnabled.py
For more information, see Managing clusters in ElastiCache.
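In a cluster mode enabled cluster, NumNodeGroups sets the number of shards, and each shard holds one implicit primary plus ReplicasPerNodeGroup replicas. The short sketch below works through that arithmetic; the helper name cluster_layout is hypothetical and the range check on replicas reflects the 0 to 5 replicas per shard accepted by the API.

```python
def cluster_layout(num_node_groups, replicas_per_node_group):
    """Returns the node counts implied by NumNodeGroups and ReplicasPerNodeGroup.

    Hypothetical helper used only to illustrate the arithmetic.
    """
    if num_node_groups < 1:
        raise ValueError('A cluster needs at least 1 shard')
    if not 0 <= replicas_per_node_group <= 5:
        raise ValueError('Replicas per shard must be between 0 and 5')

    primaries = num_node_groups                      # one primary per shard
    replicas = num_node_groups * replicas_per_node_group
    return {
        'shards': num_node_groups,
        'primaries': primaries,
        'replicas': replicas,
        'total_nodes': primaries + replicas,
    }
```

For the call in ClusterModeEnabled.py (NumNodeGroups=2, ReplicasPerNodeGroup=1), this gives 2 primaries and 2 replicas, 4 nodes in total.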
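Creating a cluster mode enabled cluster with TLS and RBAC
To ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when creating a cluster mode enabled cluster. Unlike Valkey or Redis OSS AUTH, where every authenticated client has full access to the replication group once its token is authenticated, RBAC lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see Role-Based Access Control (RBAC).
Copy the following program and paste it into a file named ClusterModeEnabledWithRBAC.py.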
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1, ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled', ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None, CacheSubnetGroupName=None, CacheParameterGroupName='default.valkey7.2.cluster.on'):
    """Creates an ElastiCache cluster with cluster mode enabled and RBAC.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
        If not specified, the cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified, 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: Mandatory list with the IDs of the user groups to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, the default will be used.
    :param CacheSubnetGroupName: Subnet group where the cluster will be placed. If not defined, the default will be used.
    :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled.
    :return: dictionary with the API results, or a dictionary with an 'Error' key if a required parameter is missing
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds, list):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params = {
        'AutomaticFailoverEnabled': True,
        'CacheNodeType': CacheNodeType,
        'Engine': 'valkey',
        'EngineVersion': EngineVersion,
        'ReplicationGroupDescription': ReplicationGroupDescription,
        'ReplicationGroupId': ReplicationGroupId,
        'SnapshotRetentionLimit': 30,
        'TransitEncryptionEnabled': True,
        'UserGroupIds': UserGroupIds,
        'NumNodeGroups': NumNodeGroups,
        'ReplicasPerNodeGroup': ReplicasPerNodeGroup,
        'CacheParameterGroupName': CacheParameterGroupName
    }

    # Defaults will be used if CacheSubnetGroupName or SecurityGroupIds are not explicit.
    if isinstance(SecurityGroupIds, list):
        params.update({'SecurityGroupIds': SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName': CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response

if __name__ == '__main__':
    # Creates a cluster mode enabled cluster with 2 shards (NumNodeGroups),
    # each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup).
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey2021',
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'
    )
    logging.info(response)
To run the program, enter the following command:
python ClusterModeEnabledWithRBAC.py
For more information, see Managing clusters in ElastiCache.
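Checking whether users or user groups exist, and creating them if they do not
With RBAC, you create users and assign them specific permissions by using an access string. You assign the users to user groups aligned with a specific role (administrators, human resources) and then deploy those user groups to one or more ElastiCache for Redis OSS replication groups. By doing this, you can establish security boundaries between clients that use the same Valkey or Redis OSS replication group and prevent clients from accessing each other's data. For more information, see Role-Based Access Control (RBAC).
Copy the following program and paste it into a file named UserAndUserGroups.py. Update the mechanism for providing credentials. Credentials in this example are shown as replaceable and have an undeclared item assigned to them. Avoid hard-coding credentials.
This example uses an access string with the permissions for the user. For more information about access strings, see Specifying permissions using an access string.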
import boto3
import logging
from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def check_user_exists(UserId):
    """Checks if UserId exists.

    Returns True if UserId exists, otherwise False.

    :param UserId: ElastiCache user ID
    :return: True|False
    """
    try:
        response = client.describe_users(
            UserId=UserId,
        )
        return response['Users'][0]['UserId'].lower() == UserId.lower()
    except ClientError as e:
        if e.response['Error']['Code'] == 'UserNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise

def check_group_exists(UserGroupId):
    """Checks if UserGroupId exists.

    Returns True if the group ID exists, otherwise False.

    :param UserGroupId: ElastiCache user group ID
    :return: True|False
    """
    try:
        response = client.describe_user_groups(
            UserGroupId=UserGroupId
        )
        return response['UserGroups'][0]['UserGroupId'].lower() == UserGroupId.lower()
    except ClientError as e:
        if e.response['Error']['Code'] == 'UserGroupNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise

def create_user(UserId=None, UserName=None, Password=None, AccessString=None):
    """Creates a new user.

    Returns the ARN for the newly created user or the error message.

    :param UserId: ElastiCache user ID. User IDs must be unique.
    :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name as long as the associated user ID is unique.
    :param Password: Password for the user. Must have at least 16 characters.
    :param AccessString: Access string with the permissions for the user.
    :return: user ARN
    """
    try:
        response = client.create_user(
            UserId=UserId,
            UserName=UserName,
            Engine='Redis',
            Passwords=[Password],
            AccessString=AccessString,
            NoPasswordRequired=False
        )
        return response['ARN']
    except ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']

def create_group(UserGroupId=None, UserIds=None):
    """Creates a new group.

    A default user is required (mandatory) and should be specified in the UserIds list.
    Returns the ARN for the newly created group or the error message.

    :param UserIds: List with user IDs to be associated with the new group. A default user is required.
    :param UserGroupId: The ID (name) for the group
    :return: group ARN
    """
    try:
        response = client.create_user_group(
            UserGroupId=UserGroupId,
            Engine='Redis',
            UserIds=UserIds
        )
        return response['ARN']
    except ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']

if __name__ == '__main__':
    groupName = 'mygroup2'
    userName = 'myuser2'
    userId = groupName + '-' + userName

    # Creates a new user if the user ID does not exist.
    # EXAMPLE is a replaceable placeholder, not a declared name: supply the user name and
    # password through your own credentials mechanism instead of hard-coding them.
    for tmpUserId, tmpUserName in [(userId, userName), (groupName + '-default', 'default')]:
        if not check_user_exists(tmpUserId):
            response = create_user(UserId=tmpUserId, UserName=EXAMPLE, Password=EXAMPLE, AccessString='on ~* +@all')
            logging.info(response)

    # Assigns the new user ID to the user group
    if not check_group_exists(groupName):
        UserIds = [userId, groupName + '-default']
        response = create_group(UserGroupId=groupName, UserIds=UserIds)
        logging.info(response)
To run the program, enter the following command:
python UserAndUserGroups.py
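The access string passed to create_user in the program above, 'on ~* +@all', has three parts: whether the user is enabled (on or off), the key patterns the user may touch (each prefixed with ~), and the commands or command categories that are allowed (+) or denied (-). The sketch below composes such a string from its parts; the helper name build_access_string is hypothetical and it covers only these three kinds of tokens.

```python
def build_access_string(enabled=True, key_patterns=('*',), commands=('+@all',)):
    """Composes an access string such as 'on ~* +@all'.

    Hypothetical helper: key_patterns are given without the leading '~',
    commands must already start with '+' (allow) or '-' (deny).
    """
    parts = ['on' if enabled else 'off']
    parts.extend('~' + pattern for pattern in key_patterns)
    for command in commands:
        if not command.startswith(('+', '-')):
            raise ValueError("Command rules must start with '+' or '-': " + command)
        parts.append(command)
    return ' '.join(parts)
```

For example, build_access_string(key_patterns=['app:*'], commands=['-@all', '+@read']) produces a read-only rule limited to keys that start with app:, which is narrower than the full-access string used in the tutorial.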
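Tutorial: Connecting to ElastiCache
The following examples use a Valkey or Redis OSS client to connect to ElastiCache.
Connecting to a cluster mode disabled cluster
Copy the following program and paste it into a file named ConnectClusterModeDisabled.py. Update the mechanism for providing credentials. Credentials in this example are shown as replaceable and have an undeclared item assigned to them. Avoid hard-coding credentials.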
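from redis import Redis
import logging

logging.basicConfig(level=logging.INFO)

# The host is a placeholder for your primary endpoint. The username and password values
# (example, EXAMPLE) are replaceable placeholders, not declared names: supply them through
# your own credentials mechanism instead of hard-coding them.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True, username=example, password=EXAMPLE)

if redis.ping():
    logging.info("Connected to Redis")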
To run the program, enter the following command:
python ConnectClusterModeDisabled.py
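One way to follow the advice about not hard-coding credentials is to read the connection settings from environment variables. The following is a minimal sketch under that assumption; the helper name and the ELASTICACHE_* variable names are hypothetical and are not defined by ElastiCache or redis-py.

```python
import os


def redis_connection_kwargs(env=None):
    """Builds keyword arguments for redis.Redis from environment variables.

    The ELASTICACHE_* variable names are assumptions made for this sketch.
    """
    env = os.environ if env is None else env
    kwargs = {
        'host': env['ELASTICACHE_HOST'],                 # required
        'port': int(env.get('ELASTICACHE_PORT', '6379')),
        'decode_responses': True,
        # TLS is on unless explicitly disabled.
        'ssl': env.get('ELASTICACHE_TLS', 'true').lower() == 'true',
    }
    # RBAC credentials are optional and only added when present.
    if env.get('ELASTICACHE_USERNAME'):
        kwargs['username'] = env['ELASTICACHE_USERNAME']
    if env.get('ELASTICACHE_PASSWORD'):
        kwargs['password'] = env['ELASTICACHE_PASSWORD']
    return kwargs
```

The result can be passed straight to the client, for example redis = Redis(**redis_connection_kwargs()). A secrets manager would be a stronger choice for production credentials; environment variables are used here only to keep the sketch short.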
Connecting to a cluster mode enabled cluster
Copy the following program and paste it into a file named ConnectClusterModeEnabled.py.
from rediscluster import RedisCluster
import logging

logging.basicConfig(level=logging.INFO)

# The host is a placeholder for the configuration endpoint of your cluster.
redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com", "port": "6379"}], decode_responses=True, skip_full_coverage_check=True)

if redis.ping():
    logging.info("Connected to Redis")
To run the program, enter the following command:
python ConnectClusterModeEnabled.py
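The program above relies on the separate redis-py-cluster package. Cluster support is also built into redis-py itself from version 4.1 onward, as redis.cluster.RedisCluster. The sketch below shows that alternative, assuming a redis-py version of 4.1 or later is installed; the helper name connect_cluster and its client_factory parameter are hypothetical and exist only so the connection logic can be exercised without a live cluster.

```python
def connect_cluster(host, port=6379, client_factory=None, **options):
    """Creates a cluster client for a configuration endpoint.

    Hypothetical helper. By default it uses redis.cluster.RedisCluster
    (redis-py 4.1 or later); client_factory allows a substitute in tests.
    """
    if client_factory is None:
        # Imported lazily so the module loads even without redis-py installed.
        from redis.cluster import RedisCluster
        client_factory = RedisCluster
    # Return str instead of bytes unless the caller decides otherwise.
    options.setdefault('decode_responses', True)
    return client_factory(host=host, port=port, **options)
```

A call such as connect_cluster('xxx.yyy.clustercfg.zzz1.cache.amazonaws.com', ssl=True) would then return a client whose ping, set, and get methods behave like those used in the rest of this tutorial.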
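Usage examples
The following examples use the redis-py client to work with ElastiCache for Redis OSS. Where a program does not create its own connection, it assumes that redis is a connected client created as in the connection examples above.
Setting and getting strings
Copy the following program and paste it into a file named SetAndGetStrings.py.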
import time
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s: %(message)s')

# Assumes `redis` is a connected client, created as shown in
# ConnectClusterModeDisabled.py or ConnectClusterModeEnabled.py.
keyName = 'mykey'
currTime = time.ctime(time.time())

# Set the key 'mykey' with the current date and time as value.
# The key will expire and be removed from the cache in 60 seconds.
redis.set(keyName, currTime, ex=60)

# Sleep just for better illustration of the TTL (expiration) value
time.sleep(5)

# Retrieve the key value and current TTL
keyValue = redis.get(keyName)
keyTTL = redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))
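To run the program, enter the following command:
python SetAndGetStrings.py
Setting and getting a hash with multiple items
Copy the following program and paste it into a file named SetAndGetHash.py.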
import logging
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s: %(message)s')

# Assumes `redis` is a connected client, created as shown in
# ConnectClusterModeDisabled.py or ConnectClusterModeEnabled.py.
keyName = 'mykey'
keyValues = {'datetime': time.ctime(time.time()), 'epochtime': time.time()}

# Set the hash 'mykey' with the current date and time in human-readable format (datetime field) and epoch number (epochtime field).
redis.hset(keyName, mapping=keyValues)

# Set the key to expire and be removed from the cache in 60 seconds.
redis.expire(keyName, 60)

# Sleep just for better illustration of the TTL (expiration) value
time.sleep(5)

# Retrieve all the fields and the current TTL
keyValues = redis.hgetall(keyName)
keyTTL = redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))
To run the program, enter the following command:
python SetAndGetHash.py
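Hash field values are stored as strings, so the epochtime field written as a float by SetAndGetHash.py comes back from hgetall as text when the client uses decode_responses=True. The following sketch converts the fields back to Python types; the helper name parse_timestamp_hash is hypothetical and assumes the two field names used in the program above.

```python
def parse_timestamp_hash(raw):
    """Converts the string values returned by HGETALL back to Python types.

    Hypothetical helper for the 'datetime' and 'epochtime' fields written by
    SetAndGetHash.py; assumes the client was created with decode_responses=True.
    """
    return {
        'datetime': raw['datetime'],           # already human-readable text
        'epochtime': float(raw['epochtime']),  # stored as text, restored to float
    }
```

In the program above, this could be applied as parse_timestamp_hash(redis.hgetall(keyName)) before doing arithmetic on the epoch value.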
Publishing (writing) to and subscribing (reading) from a Pub/Sub channel
Copy the following program and paste it into a file named PubAndSub.py.
from redis import Redis
import logging
import time

def handlerFunction(message):
    """Prints the message received from the Pub/Sub channel to the log output.

    Returns None.

    :param message: message to log
    """
    logging.info(message)

logging.basicConfig(level=logging.INFO)
redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)

# Creates the subscriber connection on "mychannel"
subscriber = redis.pubsub()
subscriber.subscribe(**{'mychannel': handlerFunction})

# Creates a new thread to watch for messages while the main process continues with its routines
thread = subscriber.run_in_thread(sleep_time=0.01)

# Publishes a first message on "mychannel"
redis.publish('mychannel', 'My message')

# Publishes one message per second until interrupted. The subscriber thread reads each message and writes it to the log.
while True:
    redis.publish('mychannel', time.ctime(time.time()))
    time.sleep(1)
To run the program, enter the following command:
python PubAndSub.py
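The handler in PubAndSub.py logs the whole message object. In redis-py, that object is a dictionary with the keys type, pattern, channel, and data. The sketch below pulls out only the channel and payload of published messages; the helper name extract_payload is hypothetical.

```python
def extract_payload(message):
    """Returns (channel, data) for a published message, otherwise None.

    Hypothetical helper. `message` is the dictionary redis-py passes to
    Pub/Sub handlers or returns from get_message().
    """
    if message is None:
        return None
    # Subscribe and unsubscribe confirmations carry a counter, not a payload.
    if message.get('type') not in ('message', 'pmessage'):
        return None
    return message['channel'], message['data']
```

Inside handlerFunction, logging.info(extract_payload(message)) would log only the channel name and the published text. When the program no longer needs the subscriber, the thread returned by run_in_thread can be stopped with thread.stop().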
Writing to and reading from a stream
Copy the following program and paste it into a file named ReadWriteStream.py.
from redis import Redis
import redis.exceptions as exceptions
import logging
import time
import threading

logging.basicConfig(level=logging.INFO)

# The original listing does not create the client. This line is an assumption added so the
# program can run; the host is a placeholder for the primary endpoint of your cluster.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)

def writeMessage(streamName):
    """Starts a loop writing the current time and thread name to 'streamName'.

    :param streamName: Stream (key) name to write messages.
    """
    fieldsDict = {'writerId': threading.current_thread().name, 'myvalue': None}
    while True:
        fieldsDict['myvalue'] = time.ctime(time.time())
        redis.xadd(streamName, fieldsDict)
        time.sleep(1)

def readMessage(groupName=None, streamName=None):
    """Starts a loop reading from 'streamName'.

    Multiple threads read from the same stream consumer group. The consumer group is used to coordinate data distribution.
    Once a thread acknowledges a message, it won't be provided again. If a message wasn't acknowledged, it can be served to another thread.

    :param groupName: Stream consumer group from which multiple threads will read.
    :param streamName: Stream (key) name from which messages will be read.
    """
    readerID = threading.current_thread().name
    while True:
        # Check if the stream has any message
        if redis.xlen(streamName) > 0:
            # Ask for one message that has not yet been delivered to any consumer in the group ('>')
            streamData = redis.xreadgroup(groupName, readerID, {streamName: '>'}, count=1)
            if len(streamData) > 0:
                msgId, message = streamData[0][1][0]
                logging.info("{}: Got {} from ID {}".format(readerID, message, msgId))
                # Do some processing here. If the message has been processed successfully, acknowledge it and (optionally) delete the message.
                redis.xack(streamName, groupName, msgId)
                logging.info("Stream message ID {} read and processed successfully by {}".format(msgId, readerID))
                redis.xdel(streamName, msgId)
        time.sleep(0.5)

# Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read.
try:
    redis.xgroup_create('mystream', 'myworkergroup', mkstream=True)
except exceptions.ResponseError as e:
    logging.info("Consumer group already exists. Will continue despite the error: {}".format(e))

# Starts 5 writer threads.
for writer_no in range(5):
    writerThread = threading.Thread(target=writeMessage, name='writer-' + str(writer_no), args=('mystream',), daemon=True)
    writerThread.start()

# Starts 10 reader threads.
for reader_no in range(10):
    readerThread = threading.Thread(target=readMessage, name='reader-' + str(reader_no), args=('myworkergroup', 'mystream',), daemon=True)
    readerThread.start()

# Keep the code running for 30 seconds
time.sleep(30)
To run the program, enter the following command:
python ReadWriteStream.py
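ReadWriteStream.py reads the result of xreadgroup with streamData[0][1][0], which works because each call asks for a single message from a single stream. The response is a list of [stream name, entries] pairs, where each entry is a (message ID, fields) tuple. The sketch below walks that structure in a general way; the helper name iter_stream_entries is hypothetical.

```python
def iter_stream_entries(stream_data):
    """Yields (stream_name, message_id, fields) for every entry in an
    XREADGROUP or XREAD response as returned by redis-py.

    Hypothetical helper; an empty response yields nothing.
    """
    for stream_name, entries in stream_data:
        for message_id, fields in entries:
            yield stream_name, message_id, fields
```

A reader that requests more than one message per call (count greater than 1) could loop over iter_stream_entries(streamData) and acknowledge each message ID in turn, instead of handling only the first entry.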