Python을 사용하여 자체 설계된 Redis OSS 클러스터에서 전송 중 암호화 활성화 - 아마존 ElastiCache (레디 스OSS)

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Python을 사용하여 자체 설계된 Redis OSS 클러스터에서 전송 중 암호화 활성화

다음 가이드에서는 원래 전송 중 암호화가 비활성화된 상태로 생성된 Redis OSS 7.0 클러스터에서 전송 중 암호화를 활성화하는 방법을 보여줍니다. TCP 및 TLS 클라이언트는 이 프로세스 동안 가동 중지 없이 클러스터와 계속 통신합니다.

Boto3는 환경 변수에서 필요한 보안 인증 정보(aws_access_key_id, aws_secret_access_keyaws_session_token)를 가져옵니다. 이러한 보안 인증 정보는 이 가이드에 표시된 Python 코드를 처리하기 위해 python3를 실행할 동일한 bash 터미널에 미리 붙여넣어집니다. 아래 예제의 코드는 Redis OSS 클러스터를 생성하는 ElastiCache 데 사용될 동일한 VPC에서 시작된 EC2 인스턴스의 프로세스입니다.

참고
  • 다음 예에서는 boto3 SDK를 ElastiCache 관리 작업 (클러스터 또는 사용자 생성) 에 사용하고 redis-py/를 데이터 처리에는 redis-py/를 사용합니다. redis-py-cluster

  • 클러스터 수정 API와 함께 온라인 TLS 마이그레이션을 사용하려면 최소한 boto3 버전 (=~) 1.26.39를 사용해야 합니다.

  • ElastiCache 버전 7.0 이상의 Redis OSS 클러스터에 대해서만 온라인 TLS 마이그레이션을 지원합니다. 따라서 7.0 이전의 Redis OSS 버전을 실행하는 클러스터가 있는 경우 클러스터의 Redis OSS 버전을 업그레이드해야 합니다. 버전 차이에 대한 자세한 내용은 메이저 버전 동작 및 호환성 차이 섹션을 참조하세요.

Redis OSS 클러스터를 시작하는 문자열 상수를 정의합니다. ElastiCache

먼저,,, default parameter group a와 같이 ElastiCache security-group 클러스터를 만드는 데 필요한 AWS 엔티티의 이름을 담을 간단한 Python 문자열 상수를 정의해 보겠습니다. Cache Subnet group 이러한 모든 AWS 엔티티는 사용하려는 지역의 AWS 계정에서 미리 생성해야 합니다.

#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

클러스터 구성을 위한 클래스를 정의합니다.

이제 클러스터 구성을 나타내는 몇 가지 간단한 Python 클래스를 정의해 보겠습니다. 이 클래스에는 Redis OSS 버전, 인스턴스 유형, 전송 중 암호화 (TLS) 사용 여부 등 클러스터에 대한 메타데이터가 들어 있습니다.

#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request

클러스터 자체를 나타내는 클래스를 정의합니다.

이제 ElastiCache Redis OSS 클러스터 자체를 나타내는 몇 가지 간단한 Python 클래스를 정의해 보겠습니다. 이 클래스에는 클러스터 생성 및 API 쿼리와 같은 ElastiCache 관리 작업을 위한 boto3 클라이언트가 들어 있는 클라이언트 필드가 있습니다. ElastiCache

import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()

(선택 사항) Redis OSS 클러스터에 대한 클라이언트 연결을 시연하기 위한 래퍼 클래스 생성

이제 redis-py-cluster 클라이언트를 위한 래퍼 클래스를 생성해 보겠습니다. 이 래퍼 클래스는 클러스터를 일부 키로 미리 채운 다음 무작위로 반복되는 get 명령을 수행하는 것을 지원합니다.

참고

이 단계는 선택 사항이지만 이후 단계에 나오는 기본 함수의 코드를 단순화합니다.

import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()

전송 중 데이터 암호화 구성 변경 프로세스를 시연하는 기본 함수 생성

이제 다음을 수행하는 기본 함수를 정의해 보겠습니다.

  1. boto3 클라이언트를 사용하여 클러스터를 생성합니다. ElastiCache

  2. TLS 없이 명확한 TCP 연결로 클러스터에 연결할 redis-py-cluster 클라이언트를 초기화합니다.

  3. redis-py-cluster 클라이언트는 클러스터를 일부 데이터로 미리 채웁니다.

  4. boto3 클라이언트는 비 TLS에서 TLS 선호로의 TLS 마이그레이션을 트리거합니다.

  5. 클러스터를 Preferred TLS로 마이그레이션하는 동안 redis-py-cluster TCP 클라이언트는 마이그레이션이 완료될 때까지 클러스터에 반복 get 작업을 전송합니다.

  6. TLS Preferred로의 마이그레이션이 완료되면 클러스터가 전송 중 데이터 암호화를 지원하는지 확인합니다. 그런 다음 TLS를 사용하여 클러스터에 연결할 redis-py-cluster 클라이언트를 생성합니다.

  7. 새 TLS 클라이언트와 이전 TCP 클라이언트를 사용하여 몇 가지 get 명령을 보내겠습니다.

  8. boto3 클라이언트는 TLS Preferred에서 TLS 필수로의 TLS 마이그레이션을 트리거합니다.

  9. 클러스터를 TLS로 마이그레이션하는 동안 TLS 클라이언트는 마이그레이션이 redis-py-cluster 완료될 때까지 클러스터에 반복 get 작업을 전송합니다.

import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)