使用 Python 在自行設計的 Redis OSS 叢集上啟用傳輸中加密 - Amazon ElastiCache (雷迪OSS斯)

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Python 在自行設計的 Redis OSS 叢集上啟用傳輸中加密

以下指南將示範如何在最初停用傳輸中加密的情況下建立的 Redis OSS 7.0 叢集上啟用傳輸中加密。在此程序期間,TCP 和 TLS 用戶端將繼續與叢集通訊,而不會停機。

Boto3 將從環境變量中取得所需的憑證 (aws_access_key_idaws_secret_access_keyaws_session_token)。這些憑證將事先貼到同一個 bash 終端中,我們將在其中執行 python3 以處理本指南中顯示的 Python 程式碼。以下範例中的程式碼是來自 EC2 執行個體的程序,該執行個體在同一個 VPC 中啟動,該執行個體將用於在其中建立 ElastiCache Redis OSS 叢集。

注意
  • 下列範例使用 boto3 SDK 進行 ElastiCache 管理作業 (叢集或使用者建立),並使用 redis-redis-py-cluster py/ 來處理資料。

  • 您必須至少使用 boto3 版本 (=~) 1.26.39,才能搭配叢集修改 API 使用線上 TLS 遷移。

  • ElastiCache 僅支援 7.0 版或更新版本之 Redis OSS 叢集的線上 TLS 移轉。因此,如果您有執行 Redis OSS 版本早於 7.0 的叢集,您需要升級叢集的 Redis OSS 版本。如需這些版本差異的詳細資訊,請參閱 主要版本行為和相容性差異

定義將啟動 ElastiCache Redis OSS 叢集的字串常數

首先,讓我們定義一些簡單的 Python 字符串常量,這些常量將保存創建 ElastiCache 集群所需的 AWS 實體的名稱 security-groupCache Subnet group,例如,和default parameter group。所有這些 AWS 實體都必須事先在您願意使用的區域中的 AWS 帳戶中創建。

#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

定義叢集組態類別

現在,讓我們定義一些簡單的 Python 類別,這些類別代表叢集的組態,這些類別會保留叢集的中繼資料,例如 Redis OSS 版本、執行個體類型,以及是否啟用或停用傳輸中加密 (TLS)。

#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request

定義一個將代表叢集本身的類別

現在,讓我們定義一些簡單的 Python 類,這將代表 ElastiCache Redis 的 OSS 集群本身。這個類將有一個客戶端字段,這將持有一個 boto3 客戶端進行 ElastiCache 管理操作,如創建集群和查詢 API。 ElastiCache

import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()

(選擇性) 建立包裝函式類別以示範用戶端與 Redis OSS 叢集的連線

現在,讓我們為 redis-py-cluster 用戶端建立一個包裝類別。這個包裝類別將支援使用一些鍵預先填充叢集,然後執行隨機重複的 get 命令。

注意

這是一個選擇性步驟,但它簡化了在後面步驟中出現的主函數。

import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()

建立主函數,以示範變更傳輸中加密組態的程序

現在,讓我們定義主函數,它將執行以下操作:

  1. 使用 boto3 ElastiCache 客戶端創建集群。

  2. 初始化將使用不具備 TLS 的清楚 TCP 連線連接到叢集的 redis-py-cluster 用戶端。

  3. redis-py-cluster 用戶端會用一些資料預先填充叢集。

  4. boto3 用戶端將會觸發 TLS 從無 TLS 遷移至 TLS 偏好。

  5. 當叢集正在遷移到 TLS Preferred 時,redis-py-cluster TCP 用戶端將向叢集發送重複的 get 操作,直到遷移完成。

  6. 完成遷移至 TLS Preferred 之後,我們會宣告該叢集支援傳輸中加密。之後,我們將建立一個將使用 TLS 連接到叢集的 redis-py-cluster 用戶端。

  7. 我們將使用新的 TLS 用戶端和舊的 TCP 用戶端發送一些 get 命令。

  8. boto3 用戶端將觸發從 TLS Preferred 到 TLS 所需的 TLS 遷移。

  9. 當叢集需要移轉至 TLS 時, redis-py-clusterTLS 用戶端會傳送重複的get作業至叢集,直到移轉完成為止。

import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)