本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Python 在自己设计的 Redis OSS 集群上启用传输中加密
以下指南将演示如何在 Redis OSS 7.0 集群上启用传输中加密,该集群最初是在禁用传输中加密的情况下创建的。TCP在此过程中,TLS客户端将继续与群集通信,而不会停机。
Boto3 将从环境变量中获取它所需的凭证(aws_access_key_id
、aws_secret_access_key
和 aws_session_token
)。这些凭证将提前粘贴到一个 bash 终端中,我们将在该同一个终端中运行 python3
来处理本指南中显示的 Python 代码。以下示例中的代码来自一个EC2实例,该实例是在同一实例中启动的VPC,该实例将用于在其中创建 ElastiCache Redis OSS 集群。
注意
-
以下示例使用 boto3 SDK 进行 ElastiCache 管理操作(创建集群或用户),使用 redis-p redis-py-cluster y/ 进行数据处理。
-
您必须至少使用 boto3 版本 (=~) 1.26.39 才能将在线TLS迁移与集群修改一起使用。API
-
ElastiCache 仅支持 Valkey 版本 7.2 及以上或 Redis 版本 7.0 或更高OSS版本的集群进行在线TLS迁移。因此,如果您的集群运行的 Redis OSS 版本低于 7.0,则需要升级集群的 Redis OSS 版本。有关版本差异的更多信息,请参阅主要版本行为和与 Redis 的兼容性差异 OSS。
主题
定义将启动 ElastiCache Valkey 或 Redis 集群的字符串常量 OSS
首先,让我们定义一些简单的 Python 字符串常量,用于保存创建 ElastiCache 集群所需的 AWS 实体的名称security-group
,例如Cache Subnet group
、和 a default parameter group
。所有这些 AWS 实体都必须事先在您愿意使用的区域的 AWS 账户中创建。
#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"
为集群配置定义类
现在,让我们定义一些简单的 Python 类来表示集群的配置,这些类将保存有关集群的元数据,例如 Valkey 或 Redis OSS 版本、实例类型以及是启用还是禁用传输中加密 (TLS)。
#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request
定义一个表示集群本身的类
现在,让我们定义一些简单的 Python 类来表示 ElastiCache Valkey 或 Redis OSS 集群本身。该类将有一个 client 字段,其中包含一个 boto3 客户端,用于 ElastiCache 管理操作,例如创建集群和查询。 ElastiCache API
import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()
(可选)创建包装类来演示客户端与 Valkey 或 Redis 集群的连接 OSS
现在,让我们为 redis-py-cluster
客户端创建一个包装器类。这个包装器类将支持在集群中预先填充一些密钥,然后执行随机重复的 get
命令。
注意
这是一个可选步骤,但它简化了后面步骤中出现的主函数的代码。
import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()
创建主函数,用于演示更改传输中加密配置的过程
现在,让我们定义主函数,它将执行以下操作:
-
使用 boto3 ElastiCache 客户端创建集群。
-
初始化将通过清晰TCP连接连接到集群的
redis-py-cluster
客户端TLS。 -
redis-py-cluster
客户端在集群中预先填充一些数据。 -
boto3 客户端将触发从 “否” 到 “首选” TLS 的TLSTLS迁移。
-
在将集群迁移到时 TLS
Preferred
,redis-py-cluster
TCP客户端将向集群重复发送get
操作,直到迁移完成。 -
迁移到TLS
Preferred
完成后,我们将断言集群支持传输中加密。之后,我们将创建一个用于连接到集群的redis-py-cluster
客户端TLS。 -
我们将使用新TLS客户端和旧TCP客户端发送一些
get
命令。 -
boto3 客户端将触发从TLS所需的迁移TLS
Preferred
。TLS -
在将集群迁移到TLS必需状态时, redis-py-clusterTLS客户端将向集群发送重复的
get
操作,直到迁移完成。
import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)