Python을 사용하여 자체 설계된 Redis OSS 클러스터에서 전송 중 암호화 활성화 - Amazon ElastiCache

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Python을 사용하여 자체 설계된 Redis OSS 클러스터에서 전송 중 암호화 활성화

다음 가이드에서는 원래 전송 중 암호화가 비활성화된 상태에서 생성된 Redis OSS 7.0 클러스터에서 전송 중 암호화를 활성화하는 방법을 보여줍니다. TCP 및 TLS 클라이언트는 이 프로세스 중에 가동 중지 없이 클러스터와 계속 통신합니다.

Boto3는 환경 변수에서 필요한 보안 인증 정보(aws_access_key_id, aws_secret_access_keyaws_session_token)를 가져옵니다. 이러한 보안 인증 정보는 이 가이드에 표시된 Python 코드를 처리하기 위해 python3를 실행할 동일한 bash 터미널에 미리 붙여넣어집니다. 아래 예제의 코드는 동일한 에서 시작된 EC2 인스턴스의 프로세스로, 해당 인스턴스에서 ElastiCache Redis OSS 클러스터를 생성하는 데 VPC 사용됩니다.

참고
  • 다음 예제에서는 ElastiCache 관리 작업(클러스터 또는 사용자 생성)에 boto3를 사용하고 데이터 처리를 SDK 위해 redis-py/redis-py-cluster 를 사용합니다.

  • 클러스터 수정과 함께 온라인 TLS 마이그레이션을 사용하려면 boto3 버전(=~) 1.26.39 이상을 사용해야 합니다API.

  • ElastiCache 는 Valkey 버전 7.2 이상 또는 Redis OSS 버전 7.0 이상의 클러스터에 대해서만 온라인 TLS 마이그레이션을 지원합니다. 따라서 7.0 이전 OSS 버전의 Redis를 실행하는 클러스터가 있는 경우 클러스터의 Redis OSS 버전을 업그레이드해야 합니다. 버전 차이에 대한 자세한 내용은 Redis와의 주요 버전 동작 및 호환성 차이 OSS 섹션을 참조하세요.

ElastiCache Valkey 또는 Redis OSS 클러스터를 시작할 문자열 상수 정의

먼저 , 및 와 같이 ElastiCache 클러스터를 생성하는 데 필요한 AWS 엔터티의 이름을 포함하는 몇 가지 간단한 Python 문자열 상수security-groupCache Subnet group를 정의해 보겠습니다default parameter group. 이러한 모든 AWS 엔터티는 사용하려는 리전의 AWS 계정에 미리 생성해야 합니다.

#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

클러스터 구성을 위한 클래스를 정의합니다.

이제 클러스터의 구성을 나타내는 몇 가지 간단한 Python 클래스를 정의해 보겠습니다. 이 클래스는 Valkey 또는 Redis OSS 버전, 인스턴스 유형, 전송 중 암호화(TLS)의 활성화 또는 비활성화 여부와 같이 클러스터에 대한 메타데이터를 저장합니다.

#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request

클러스터 자체를 나타내는 클래스를 정의합니다.

이제 ElastiCache Valkey 또는 Redis OSS 클러스터 자체를 나타내는 몇 가지 간단한 Python 클래스를 정의해 보겠습니다. 이 클래스에는 클러스터 생성 및 쿼리 ElastiCache와 같은 ElastiCache 관리 작업을 위해 boto3 클라이언트를 포함하는 클라이언트 필드가 있습니다API.

import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()

(선택 사항) Valkey 또는 Redis OSS 클러스터에 대한 클라이언트 연결을 시연할 래퍼 클래스 생성

이제 redis-py-cluster 클라이언트를 위한 래퍼 클래스를 생성해 보겠습니다. 이 래퍼 클래스는 클러스터를 일부 키로 미리 채운 다음 무작위로 반복되는 get 명령을 수행하는 것을 지원합니다.

참고

이 단계는 선택 사항이지만 이후 단계에 나오는 기본 함수의 코드를 단순화합니다.

import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()

전송 중 데이터 암호화 구성 변경 프로세스를 시연하는 기본 함수 생성

이제 다음을 수행하는 기본 함수를 정의해 보겠습니다.

  1. boto3 ElastiCache client를 사용하여 클러스터를 생성합니다.

  2. 를 사용하지 않고 명확한 연결로 클러스터에 TCP 연결할 redis-py-cluster 클라이언트를 초기화합니다TLS.

  3. redis-py-cluster 클라이언트는 클러스터를 일부 데이터로 미리 채웁니다.

  4. boto3 클라이언트는 no에서TLS TLS preferred로 TLS 마이그레이션을 트리거합니다.

  5. 클러스터가 TLS 로 마이그레이션되는 동안 Preferred redis-py-cluster TCP 클라이언트는 마이그레이션이 완료될 때까지 클러스터에 반복 get 작업을 보냅니다.

  6. 로 마이그레이션TLSPreferred이 완료되면 클러스터가 전송 중 암호화를 지원한다고 어설션합니다. 그런 다음 를 사용하여 클러스터에 연결할 redis-py-cluster 클라이언트를 생성합니다TLS.

  7. 새 TLS 클라이언트와 이전 TCP 클라이언트를 사용하여 몇 가지 get 명령을 보냅니다.

  8. boto3 클라이언트는 에서 TLS 필수로의 TLS 마이그레이션TLSPreferred을 트리거합니다.

  9. 클러스터가 TLS 필수 redis-py-cluster로 마이그레이션되는 동안 TLS 클라이언트는 마이그레이션이 완료될 때까지 클러스터에 반복 get 작업을 보냅니다.

import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)