本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
以下指南将演示如何在 Redis OSS 7.0 集群上启用传输中加密,该集群最初是在禁用传输中加密的情况下创建的。在此过程中,TCP 和 TLS 客户端将继续与集群通信,无需停机。
Boto3 将从环境变量中获取它所需的凭证(aws_access_key_id
、aws_secret_access_key
和 aws_session_token
)。这些凭证将提前粘贴到一个 bash 终端中,我们将在该同一个终端中运行 python3
来处理本指南中显示的 Python 代码。以下示例中的代码是在将用于在其中创建 ElastiCache Redis OSS 集群的同一 VPC 中启动的 EC2 实例中的进程。
注意
-
以下示例使用 boto3 SDK 进行 ElastiCache 管理操作(创建集群或用户),使用 redis-p redis-py-cluster y/ 进行数据处理。
-
您必须至少使用 boto3 版本 (=~) 1.26.39,才能通过集群修改 API 使用在线 TLS 迁移。
-
ElastiCache 仅支持 Valkey 版本 7.2 及以上或 Redis OSS 版本 7.0 或更高版本的集群的在线 TLS 迁移。因此,如果您的集群运行的是 7.0 之前的 Redis OSS 版本,则需要升级集群的 Redis OSS 版本。有关版本差异的更多信息,请参阅主要引擎版本行为和与 Redis OSS 的兼容性差异。
主题
定义将启动 ElastiCache Valkey 或 Redis OSS 集群的字符串常量
首先,让我们定义一些简单的 Python 字符串常量,用于保存创建 ElastiCache 集群所需的 AWS 实体的名称security-group
,例如Cache Subnet group
、和 a default parameter group
。所有这些 AWS 实体都必须事先在您愿意使用的区域的 AWS 账户中创建。
#Constants definitions
SECURITY_GROUP = "sg-0492aa0a29c558427"
CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide"
EC_SUBNET_GROUP = "client-testing"
DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"
为集群配置定义类
现在,让我们定义一些简单的 Python 类,这些类将表示集群的配置,而这些配置将保留有关集群的元数据,例如 Valkey 或 Redis OSS 版本、实例类型以及是启用还是禁用传输中加密(TLS)。
#Class definitions
class Config:
def __init__(
self,
instance_type: str = "cache.t4g.small",
version: str = "7.0",
multi_az: bool = True,
TLS: bool = True,
name: str = None,
):
self.instance_type = instance_type
self.version = version
self.multi_az = multi_az
self.TLS = TLS
self.name = name or f"tls-test"
def create_base_launch_request(self):
return {
"ReplicationGroupId": self.name,
"TransitEncryptionEnabled": self.TLS,
"MultiAZEnabled": self.multi_az,
"CacheNodeType": self.instance_type,
"Engine": "redis",
"EngineVersion": self.version,
"CacheSubnetGroupName": EC_SUBNET_GROUP ,
"CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED ,
"ReplicationGroupDescription": CLUSTER_DESCRIPTION,
"SecurityGroupIds": [SECURITY_GROUP],
}
class ConfigCME(Config):
def __init__(
self,
instance_type: str = "cache.t4g.small",
version: str = "7.0",
multi_az: bool = True,
TLS: bool = True,
name: str = None,
num_shards: int = 2,
num_replicas_per_shard: int = 1,
):
super().__init__(instance_type, version, multi_az, TLS, name)
self.num_shards = num_shards
self.num_replicas_per_shard = num_replicas_per_shard
def create_launch_request(self) -> dict:
launch_request = self.create_base_launch_request()
launch_request["NumNodeGroups"] = self.num_shards
launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard
return launch_request
定义一个表示集群本身的类
现在,让我们定义一些简单的 Python 类来表示 ElastiCache Valkey 或 Redis OSS 集群本身。该类将有一个 client 字段,其中包含一个 boto3 客户端,用于 ElastiCache 管理操作,例如创建集群和查询 API。 ElastiCache
import botocore.config
import boto3
# Create boto3 client
def init_client(region: str = "us-east-1"):
config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"})
init_request = dict()
init_request["config"] = config
init_request["service_name"] = "elasticache"
init_request["region_name"] = region
return boto3.client(**init_request)
class ElastiCacheClusterBase:
def __init__(self, name: str):
self.name = name
self.elasticache_client = init_client()
def get_first_replication_group(self):
return self.elasticache_client.describe_replication_groups(
ReplicationGroupId=self.name
)["ReplicationGroups"][0]
def get_status(self) -> str:
return self.get_first_replication_group()["Status"]
def get_transit_encryption_enabled(self) -> bool:
return self.get_first_replication_group()["TransitEncryptionEnabled"]
def is_available(self) -> bool:
return self.get_status() == "available"
def is_modifying(self) -> bool:
return self.get_status() == "modifying"
def wait_for_available(self):
while True:
if self.is_available():
break
else:
time.sleep(5)
def wait_for_modifying(self):
while True:
if self.is_modifying():
break
else:
time.sleep(5)
def delete_cluster(self) -> bool:
self.elasticache_client.delete_replication_group(
ReplicationGroupId=self.name, RetainPrimaryCluster=False
)
def modify_transit_encryption_mode(self, new_transit_encryption_mode: str):
# generate api call to migrate the cluster to TLS preffered or to TLS required
self.elasticache_client.modify_replication_group(
ReplicationGroupId=self.name,
TransitEncryptionMode=new_transit_encryption_mode,
TransitEncryptionEnabled=True,
ApplyImmediately=True,
)
self.wait_for_modifying()
class ElastiCacheClusterCME(ElastiCacheClusterBase):
def __init__(self, name: str):
super().__init__(name)
@classmethod
def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME:
config = config or ConfigCME()
print(config)
new_cluster = ElastiCacheClusterCME(config.name)
launch_request = config.create_launch_request()
new_cluster.elasticache_client.create_replication_group(**launch_request)
new_cluster.wait_for_available()
return new_cluster
def get_configuration_endpoint(self) -> str:
return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"]
#Since the code can throw exceptions, we define this class to make the code more readable and
#so we won't forget to delete the cluster
class ElastiCacheCMEManager:
def __init__(self, config: ConfigCME = None):
self.config = config or ConfigCME()
def __enter__(self) -> ElastiCacheClusterCME:
self.cluster = ElastiCacheClusterCME.launch(self.config)
return self.cluster
def __exit__(self, exc_type, exc_val, exc_tb):
self.cluster.delete_cluster()
(可选)创建一个包装器类来演示客户端与 Valkey 或 Redis OSS 集群的连接
现在,让我们为 redis-py-cluster
客户端创建一个包装器类。这个包装器类将支持在集群中预先填充一些密钥,然后执行随机重复的 get
命令。
注意
这是一个可选步骤,但它简化了后面步骤中出现的主函数的代码。
import redis
improt random
from time import perf_counter_ns, time
class DowntimeTestClient:
def __init__(self, client):
self.client = client
# num of keys prefilled
self.prefilled = 0
# percent of get above prefilled
self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled
# total downtime in nano seconds
self.downtime_ns = 0
# num of success and fail operations
self.success_ops = 0
self.fail_ops = 0
self.connection_errors = 0
self.timeout_errors = 0
def replace_client(self, client):
self.client = client
def prefill_data(self, timelimit_sec=60):
end_time = time() + timelimit_sec
while time() < end_time:
self.client.set(self.prefilled, self.prefilled)
self.prefilled += 1
# unsuccesful operations throw exceptions
def _exec(self, func):
try:
start_ns = perf_counter_ns()
func()
self.success_ops += 1
elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6
# upon succesful execution of func
# reset random_key to None so that the next command
# will use a new random key
self.random_key = None
except Exception as e:
elapsed_ns = perf_counter_ns() - start_ns
self.downtime_ns += elapsed_ns
# in case of failure- increment the relevant counters so that we will keep track
# of how many connection issues we had while trying to communicate with
# the cluster.
self.fail_ops += 1
if e.__class__ is redis.exceptions.ConnectionError:
self.connection_errors += 1
if e.__class__ is redis.exceptions.TimeoutError:
self.timeout_errors += 1
def _repeat_exec(self, func, seconds):
end_time = time() + seconds
while time() < end_time:
self._exec(func)
def _new_random_key_if_needed(self, percent_above_prefilled):
if self.random_key is None:
max = int((self.prefilled * (100 + percent_above_prefilled)) / 100)
return random.randint(0, max)
return self.random_key
def _random_get(self):
key = self._new_random_key_if_needed(self.percent_get_above_prefilled)
result = self.client.get(key)
# we know the key was set for sure only in the case key < self.prefilled
if key < self.prefilled:
assert result.decode("UTF-8") == str(key)
def repeat_get(self, seconds=60):
self._repeat_exec(self._random_get, seconds)
def get_downtime_ms(self) -> int:
return self.downtime_ns // 10 ** 6
def do_get_until(self, cond_check):
while not cond_check():
self.repeat_get()
# do one more get cycle once condition is met
self.repeat_get()
创建主函数,用于演示更改传输中加密配置的过程
现在,让我们定义主函数,它将执行以下操作:
-
使用 boto3 ElastiCache 客户端创建集群。
-
初始化将使用不带 TLS 的清晰 TCP 连接来连接到集群的
redis-py-cluster
客户端。 -
redis-py-cluster
客户端在集群中预先填充一些数据。 -
boto3 客户端将触发 TLS 从无 TLS 迁移到首选 TLS。
-
当将集群迁移到 TLS
Preferred
时,redis-py-cluster
TCP 客户端将向集群发送重复的get
操作,直到迁移完成。 -
完成向 TLS
Preferred
的迁移后,我们将断言集群支持传输中加密。之后,我们将创建一个使用 TLS 连接到集群的redis-py-cluster
客户端。 -
我们将使用新的 TLS 客户端和旧的 TCP 客户端发送一些
get
命令。 -
boto3 客户端将触发 TLS 从 TLS
Preferred
迁移到需要 TLS。 -
当集群需要迁移到 TLS 时, redis-py-clusterTLS 客户端将向集群发送重复的
get
操作,直到迁移完成。
import redis
def init_cluster_client(
cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient:
# we must use for the host name the cluster configuration endpoint.
redis_client = redis.RedisCluster(
host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1
)
test_client = DowntimeTestClient(redis_client)
if prefill_data:
test_client.prefill_data()
return test_client
if __name__ == '__main__':
config = ConfigCME(TLS=False, instance_type="cache.m5.large")
with ElastiCacheCMEManager(config) as cluster:
# create a client that will connect to the cluster with clear tcp connection
test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False)
# migrate the cluster to TLS Preferred
cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred")
# do repeated get commands until the cluster finishes the migration to TLS Preferred
test_client_tcp.do_get_until(cluster.is_available)
# verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS
assert cluster.get_transit_encryption_enabled() == True
# create a client that will connect to the cluster with TLS connection.
# we must first make sure that the cluster indeed supports TLS
test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True)
# by doing get commands with the tcp client for 60 more seconds
# we can verify that the existing tcp connection to the cluster still works
test_client_tcp.repeat_get(seconds=60)
# do get commands with the new TLS client for 60 more seconds
test_client_tcp.repeat_get(seconds=60)
# migrate the cluster to TLS required
cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required")
# from this point the tcp clients will be disconnected and we must not use them anymore.
# do get commands with the TLS client until the cluster finishes migartion to TLS required mode.
test_client_tls.do_get_until(cluster.is_available)