Enabling in-transit encryption on a self-designed Redis OSS cluster using Python - Amazon ElastiCache


Enabling in-transit encryption on a self-designed Redis OSS cluster using Python

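The following guide demonstrates how to enable in-transit encryption on a Redis OSS 7.0 cluster that was originally created with in-transit encryption disabled. Throughout the process, TCP and TLS clients continue to communicate with the cluster without downtime.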

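Boto3 reads the credentials it needs (aws_access_key_id, aws_secret_access_key, and aws_session_token) from environment variables. Paste these credentials into a bash terminal in advance, then run python3 in that same terminal to execute the Python code shown in this guide. The code in the following examples was run from an EC2 instance launched in the same VPC that is used to create the ElastiCache Redis OSS cluster.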
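As a quick sanity check before running the rest of the guide, the sketch below reports which credential variables are missing from the current shell. The helper name `missing_credentials` is hypothetical and not part of boto3. It assumes temporary credentials, so it treats the session token as required; with long-term credentials the token can be absent.

```python
import os

# Upper-case environment variable names that boto3 looks up for credentials.
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN")


def missing_credentials(env=None):
    """Return the credential variable names that are unset or empty.

    Hypothetical helper: `env` defaults to the process environment and can be
    replaced with a plain dict for testing.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        print("Missing credential variables:", ", ".join(missing))
    else:
        print("All credential variables are set.")
```

Running this in the same terminal where you pasted the credentials confirms that python3 will see them.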

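Note
  • The following examples use the boto3 SDK for ElastiCache management operations (cluster or user creation) and redis-py/redis-py-cluster for data handling.

  • You must use boto3 version 1.26.39 or later to use online TLS migration with the cluster modification API.

  • ElastiCache supports online TLS migration only for clusters running Valkey 7.2 and later or Redis OSS 7.0 and later. If your cluster runs a Redis OSS version earlier than 7.0, you need to upgrade the cluster's Redis OSS version first. For more information on version differences, see Major engine version behavior and compatibility differences with Redis OSS.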
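The version requirements in the note above can be checked programmatically. The following is a minimal sketch, assuming plain dotted numeric version strings. The helper names `version_tuple` and `meets_minimum` are hypothetical, and a version string with non-numeric parts would raise `ValueError`.

```python
MIN_BOTO3 = "1.26.39"   # minimum boto3 version stated in the note
MIN_REDIS_OSS = "7.0"   # minimum Redis OSS engine version stated in the note


def version_tuple(version):
    """Convert a dotted version string such as '1.26.39' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def meets_minimum(installed, minimum):
    """Return True if `installed` is at least `minimum` (numeric comparison)."""
    return version_tuple(installed) >= version_tuple(minimum)


# Example usage, assuming boto3 is installed:
#   import boto3
#   if not meets_minimum(boto3.__version__, MIN_BOTO3):
#       raise RuntimeError("boto3 is too old for online TLS migration")
```

Comparing tuples of integers avoids the pitfall of string comparison, where "1.26.9" would sort after "1.26.39".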

Define the string constants used to launch the ElastiCache (Valkey or Redis OSS compatible) cluster

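First, let's define some simple Python string constants that hold the names of the AWS entities required to create the ElastiCache cluster, such as the security-group, Cache Subnet group, and default parameter group. All of these AWS entities must be created in advance in your AWS account, in the Region you want to use.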

# Constants definitions
SECURITY_GROUP = "sg-0492aa0a29c558427"
CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide"
EC_SUBNET_GROUP = "client-testing"
DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

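Define classes for the cluster configuration

Now, let's define some simple Python classes that represent a cluster configuration. They hold metadata about the cluster, such as the Valkey or Redis OSS version, the instance type, and whether in-transit encryption (TLS) is enabled or disabled.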

# Class definitions
class Config:
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
    ):
        self.instance_type = instance_type
        self.version = version
        self.multi_az = multi_az
        self.TLS = TLS
        self.name = name or "tls-test"

    def create_base_launch_request(self):
        return {
            "ReplicationGroupId": self.name,
            "TransitEncryptionEnabled": self.TLS,
            "MultiAZEnabled": self.multi_az,
            "CacheNodeType": self.instance_type,
            "Engine": "redis",
            "EngineVersion": self.version,
            "CacheSubnetGroupName": EC_SUBNET_GROUP,
            "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED,
            "ReplicationGroupDescription": CLUSTER_DESCRIPTION,
            "SecurityGroupIds": [SECURITY_GROUP],
        }


class ConfigCME(Config):
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
        num_shards: int = 2,
        num_replicas_per_shard: int = 1,
    ):
        super().__init__(instance_type, version, multi_az, TLS, name)
        self.num_shards = num_shards
        self.num_replicas_per_shard = num_replicas_per_shard

    def create_launch_request(self) -> dict:
        launch_request = self.create_base_launch_request()
        launch_request["NumNodeGroups"] = self.num_shards
        launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard
        return launch_request

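Define a class that represents the cluster itself

Now, let's define some simple Python classes that represent the ElastiCache (Valkey or Redis OSS compatible) cluster itself. The class has a client field that holds the boto3 client used for ElastiCache management operations, such as creating the cluster and querying the ElastiCache API.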

import time

import boto3
import botocore.config


# Create boto3 client
def init_client(region: str = "us-east-1"):
    config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"})
    init_request = dict()
    init_request["config"] = config
    init_request["service_name"] = "elasticache"
    init_request["region_name"] = region
    return boto3.client(**init_request)


class ElastiCacheClusterBase:
    def __init__(self, name: str):
        self.name = name
        self.elasticache_client = init_client()

    def get_first_replication_group(self):
        return self.elasticache_client.describe_replication_groups(
            ReplicationGroupId=self.name
        )["ReplicationGroups"][0]

    def get_status(self) -> str:
        return self.get_first_replication_group()["Status"]

    def get_transit_encryption_enabled(self) -> bool:
        return self.get_first_replication_group()["TransitEncryptionEnabled"]

    def is_available(self) -> bool:
        return self.get_status() == "available"

    def is_modifying(self) -> bool:
        return self.get_status() == "modifying"

    def wait_for_available(self):
        # poll every 5 seconds until the cluster reports "available"
        while not self.is_available():
            time.sleep(5)

    def wait_for_modifying(self):
        # poll every 5 seconds until the cluster reports "modifying"
        while not self.is_modifying():
            time.sleep(5)

    def delete_cluster(self) -> None:
        self.elasticache_client.delete_replication_group(
            ReplicationGroupId=self.name, RetainPrimaryCluster=False
        )

    def modify_transit_encryption_mode(self, new_transit_encryption_mode: str):
        # generate api call to migrate the cluster to TLS preferred or to TLS required
        self.elasticache_client.modify_replication_group(
            ReplicationGroupId=self.name,
            TransitEncryptionMode=new_transit_encryption_mode,
            TransitEncryptionEnabled=True,
            ApplyImmediately=True,
        )
        self.wait_for_modifying()


class ElastiCacheClusterCME(ElastiCacheClusterBase):
    def __init__(self, name: str):
        super().__init__(name)

    @classmethod
    def launch(cls, config: ConfigCME = None) -> "ElastiCacheClusterCME":
        # the return annotation is a string because the class name is not bound yet
        config = config or ConfigCME()
        print(config)
        new_cluster = ElastiCacheClusterCME(config.name)
        launch_request = config.create_launch_request()
        new_cluster.elasticache_client.create_replication_group(**launch_request)
        new_cluster.wait_for_available()
        return new_cluster

    def get_configuration_endpoint(self) -> str:
        return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"]


# Since the code can throw exceptions, we define this class to make the code more readable
# and so we won't forget to delete the cluster
class ElastiCacheCMEManager:
    def __init__(self, config: ConfigCME = None):
        self.config = config or ConfigCME()

    def __enter__(self) -> ElastiCacheClusterCME:
        self.cluster = ElastiCacheClusterCME.launch(self.config)
        return self.cluster

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cluster.delete_cluster()
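The wait methods in the class above poll indefinitely. If you prefer a bounded wait, one option is a generic polling helper with a timeout. The sketch below is an assumption rather than part of the original guide: the name `wait_until`, the 30-minute default timeout, and the injectable `sleep` and `clock` parameters are all hypothetical choices.

```python
import time


def wait_until(predicate, timeout_s=1800.0, interval_s=5.0,
               sleep=time.sleep, clock=time.monotonic):
    """Poll predicate() until it returns True or timeout_s elapses.

    Returns True if the predicate succeeded and False on timeout.
    `sleep` and `clock` are injectable so the helper can be tested
    without real delays.
    """
    deadline = clock() + timeout_s
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Example usage with the cluster class from this guide (not executed here):
#   if not wait_until(cluster.is_available, timeout_s=3600):
#       raise TimeoutError("cluster did not become available in time")
```

A monotonic clock is used for the deadline so that system clock adjustments do not shorten or extend the wait.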

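(Optional) Create a wrapper class to demonstrate client connections to the Valkey or Redis OSS cluster

Now, let's create a wrapper class for the redis-py-cluster client. The wrapper supports prefilling the cluster with some keys and then issuing random, repeated get commands.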

Note

This is an optional step, but it simplifies the code of the main function shown in a later step.

import random
from time import perf_counter_ns, time

import redis


class DowntimeTestClient:
    def __init__(self, client):
        self.client = client
        # num of keys prefilled
        self.prefilled = 0
        # percent of get above prefilled (a nil result is expected when a get hits above prefilled)
        self.percent_get_above_prefilled = 10
        # total downtime in nanoseconds
        self.downtime_ns = 0
        # num of success and fail operations
        self.success_ops = 0
        self.fail_ops = 0
        self.connection_errors = 0
        self.timeout_errors = 0
        # key to retry after a failed get; None means the next get picks a new key
        self.random_key = None

    def replace_client(self, client):
        self.client = client

    def prefill_data(self, timelimit_sec=60):
        end_time = time() + timelimit_sec
        while time() < end_time:
            self.client.set(self.prefilled, self.prefilled)
            self.prefilled += 1

    # unsuccessful operations throw exceptions
    def _exec(self, func):
        start_ns = perf_counter_ns()
        try:
            func()
            self.success_ops += 1
            # upon successful execution of func, reset random_key to None
            # so that the next command will use a new random key
            self.random_key = None
        except Exception as e:
            elapsed_ns = perf_counter_ns() - start_ns
            self.downtime_ns += elapsed_ns
            # in case of failure, increment the relevant counters so that we keep track
            # of how many connection issues we had while trying to communicate with
            # the cluster.
            self.fail_ops += 1
            if isinstance(e, redis.exceptions.ConnectionError):
                self.connection_errors += 1
            if isinstance(e, redis.exceptions.TimeoutError):
                self.timeout_errors += 1

    def _repeat_exec(self, func, seconds):
        end_time = time() + seconds
        while time() < end_time:
            self._exec(func)

    def _new_random_key_if_needed(self, percent_above_prefilled):
        if self.random_key is None:
            max_key = int((self.prefilled * (100 + percent_above_prefilled)) / 100)
            # remember the key so that a failed get is retried with the same key
            self.random_key = random.randint(0, max_key)
        return self.random_key

    def _random_get(self):
        key = self._new_random_key_if_needed(self.percent_get_above_prefilled)
        result = self.client.get(key)
        # we know the key was set for sure only in the case key < self.prefilled
        if key < self.prefilled:
            assert result.decode("UTF-8") == str(key)

    def repeat_get(self, seconds=60):
        self._repeat_exec(self._random_get, seconds)

    def get_downtime_ms(self) -> int:
        return self.downtime_ns // 10 ** 6

    def do_get_until(self, cond_check):
        while not cond_check():
            self.repeat_get()
        # do one more get cycle once condition is met
        self.repeat_get()
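To make the key-selection arithmetic in the wrapper concrete, the sketch below isolates it into two standalone functions. The names `key_upper_bound` and `pick_key` are hypothetical and exist only for illustration; the formula mirrors the one used in `_new_random_key_if_needed`.

```python
import random


def key_upper_bound(prefilled, percent_above_prefilled):
    """Largest key that a random get may request.

    With 1000 prefilled keys and 10 percent above prefilled, gets are drawn
    from 0..1100, so roughly 1 in 11 gets targets a key that was never set.
    """
    return int(prefilled * (100 + percent_above_prefilled) / 100)


def pick_key(prefilled, percent_above_prefilled, rng=random):
    """Pick a key uniformly from 0..key_upper_bound (inclusive)."""
    return rng.randint(0, key_upper_bound(prefilled, percent_above_prefilled))


# Example usage with a seeded generator for reproducibility:
#   rng = random.Random(42)
#   key = pick_key(1000, 10, rng)
```

Keys below `prefilled` were written during prefill, so only those are checked against an expected value; keys at or above it are expected to return nil.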

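Create a main function that demonstrates the process of changing the in-transit encryption configuration

Now, let's define the main function, which does the following: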

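  1. Creates the cluster using the boto3 ElastiCache client.

  2. Initializes the redis-py-cluster client that connects to the cluster over a clear TCP connection without TLS.

  3. The redis-py-cluster client prefills the cluster with some data.

  4. The boto3 client triggers the migration from no TLS to TLS Preferred.

  5. While the cluster is migrating to TLS Preferred, the redis-py-cluster TCP client sends repeated get operations to the cluster until the migration finishes.

  6. After the migration to TLS Preferred finishes, we assert that the cluster supports in-transit encryption. Afterward, we create a redis-py-cluster client that connects to the cluster with TLS.

  7. We send some get commands using both the new TLS client and the old TCP client.

  8. The boto3 client triggers the migration from TLS Preferred to TLS Required.

  9. While the cluster is migrating to TLS Required, the redis-py-cluster TLS client sends repeated get operations to the cluster until the migration finishes.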
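The steps above move the cluster through two mode changes in a fixed order: first to preferred, then to required. The following sketch models that ordering as a small lookup so a script can work out which `TransitEncryptionMode` values still need to be applied. The label "none" for a cluster without TLS and the helper names `next_mode` and `migration_plan` are hypothetical; only "preferred" and "required" are values passed to the API in this guide.

```python
# "none" is an illustrative label for a cluster with TLS disabled;
# "preferred" and "required" are the TransitEncryptionMode values used in this guide.
MIGRATION_ORDER = ("none", "preferred", "required")


def next_mode(current):
    """Return the mode that follows `current`, or None if already at "required"."""
    index = MIGRATION_ORDER.index(current)  # raises ValueError for unknown modes
    if index == len(MIGRATION_ORDER) - 1:
        return None
    return MIGRATION_ORDER[index + 1]


def migration_plan(current):
    """List the remaining mode changes, in order, starting after `current`."""
    plan = []
    mode = next_mode(current)
    while mode is not None:
        plan.append(mode)
        mode = next_mode(mode)
    return plan


print(migration_plan("none"))
```

Each entry in the plan would correspond to one call to `modify_transit_encryption_mode`, followed by waiting for the cluster to become available again.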

import redis


def init_cluster_client(
    cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True
) -> DowntimeTestClient:
    # we must use the cluster configuration endpoint as the host name.
    redis_client = redis.RedisCluster(
        host=cluster.get_configuration_endpoint(),
        ssl=TLS,
        socket_timeout=0.25,
        socket_connect_timeout=0.1,
    )
    test_client = DowntimeTestClient(redis_client)
    if prefill_data:
        test_client.prefill_data()
    return test_client


if __name__ == '__main__':
    config = ConfigCME(TLS=False, instance_type="cache.m5.large")

    with ElastiCacheCMEManager(config) as cluster:
        # create a client that will connect to the cluster with clear tcp connection
        test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False)

        # migrate the cluster to TLS Preferred
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred")

        # do repeated get commands until the cluster finishes the migration to TLS Preferred
        test_client_tcp.do_get_until(cluster.is_available)

        # verify that in transit encryption is enabled so that clients will be able
        # to connect to the cluster with TLS
        assert cluster.get_transit_encryption_enabled()

        # create a client that will connect to the cluster with TLS connection.
        # we must first make sure that the cluster indeed supports TLS
        test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True)

        # by doing get commands with the tcp client for 60 more seconds
        # we can verify that the existing tcp connection to the cluster still works
        test_client_tcp.repeat_get(seconds=60)

        # do get commands with the new TLS client for 60 more seconds
        test_client_tls.repeat_get(seconds=60)

        # migrate the cluster to TLS required
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required")

        # from this point the tcp clients will be disconnected and we must not use them anymore.
        # do get commands with the TLS client until the cluster finishes migration to TLS required mode.
        test_client_tls.do_get_until(cluster.is_available)
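The main function above collects counters on each test client but does not print them. One way to inspect the outcome is a small summary helper like the sketch below. The function name `summarize` and the shape of the returned dictionary are assumptions for illustration; the inputs correspond to the `success_ops`, `fail_ops`, and `downtime_ns` fields of `DowntimeTestClient`.

```python
def summarize(success_ops, fail_ops, downtime_ns):
    """Condense the test client's counters into a small report dictionary."""
    total = success_ops + fail_ops
    # guard against division by zero when no operations were attempted
    failure_rate = (fail_ops / total * 100) if total else 0.0
    return {
        "total_ops": total,
        "failure_rate_percent": round(failure_rate, 2),
        "downtime_ms": downtime_ns // 10 ** 6,
    }


# Example usage at the end of the main function (not executed here):
#   print(summarize(test_client_tls.success_ops,
#                   test_client_tls.fail_ops,
#                   test_client_tls.downtime_ns))
```

Comparing the summaries of the TCP and TLS clients gives a rough picture of how many operations failed during each migration phase.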