View a markdown version of this page

Python を使用してノードベースの Redis OSS クラスターで送信中の暗号化を有効にする - Amazon ElastiCache

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Python を使用してノードベースの Redis OSS クラスターで送信中の暗号化を有効にする

以下のガイドでは、最初は転送中の暗号化を無効にして作成された Redis OSS 7.0 クラスターで転送中の暗号化を有効にする方法を説明します。TCP クライアントと TLS クライアントは、このプロセス中もダウンタイムなしでクラスターとの通信を継続します。

Boto3 は環境変数から必要な認証情報 (aws_access_key_idaws_secret_access_key、および aws_session_token) を取得します。これらの認証情報は、このガイドに示されている Python コードを処理するために python3 を実行するのと同じ bash ターミナルにあらかじめ貼り付けておきます。以下の例のコードは、同じ VPC で起動された EC2 インスタンスからのプロセスであり、その中に ElastiCache Redis OSS クラスターを作成するために使用されます。

注記
  • 次の例では、ElastiCache 管理オペレーション (クラスターまたはユーザーの作成) で boto3 SDK を使用し、データ処理で redis-py/redis-py-cluster を使用します。

  • クラスター変更 API でオンライン TLS 移行を使用するには、boto3 バージョン (=~) 1.26.39 以上を使用する必要があります。

  • ElastiCache は、Valkey バージョン 7.2 以降、Redis OSS バージョン 7.0 以降のクラスターに対してのみオンライン TLS 移行をサポートします。そのため、7.0 より前のバージョンの Redis OSS を実行しているクラスターがある場合は、クラスターの Redis OSS バージョンをアップグレードする必要があります。バージョンの違いの詳細については、「Redis OSS でのメジャーエンジンバージョンの動作と互換性の違い」を参照してください。

ElastiCache Valkey または Redis OSS クラスターを起動する文字列定数を定義する

まず、、、 などの ElastiCache クラスターの作成に必要な AWS エンティティの名前を保持する単純な Cache Subnet groupPython security-group文字列定数を定義しましょうdefault parameter group。これらの AWS エンティティはすべて、使用するリージョンの AWS アカウントで事前に作成する必要があります。

#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

クラスター構成用のクラスを定義する

ここで、クラスターの設定を表すシンプルな Python クラスをいくつか定義しましょう。このクラスには、Valkey または Redis OSS のバージョン、インスタンスタイプ、転送中の暗号化 (TLS) が有効か無効かなど、クラスターに関するメタデータが格納されます。

#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request

クラスター自体を表すクラスを定義する

次に、ElastiCache Valkey または Redis OSS クラスター自体を表すシンプルな Python クラスをいくつか定義しましょう。このクラスには、クラスターの作成や ElastiCache API のクエリなどの ElastiCache 管理オペレーションに使用する boto3 クライアントを格納するクライアントフィールドがあります。

import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()

(オプション) Valkey または Redis OSS クラスターへのクライアント接続をデモするためのラッパークラスを作成する

次に、redis-py-cluster クライアント用のラッパークラスを作成しましょう。このラッパークラスは、クラスターにいくつかのキーをあらかじめ入力してから、ランダムに繰り返し get コマンドを実行することをサポートします。

注記

これはオプションのステップですが、後のステップに含まれる main 関数のコードが簡略化されます。

import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()

転送中の暗号化設定を変更するプロセスをデモする main 関数を作成する

それでは、次の処理を行う main 関数を定義しましょう。

  1. boto3 ElastiCache クライアントを使用してクラスターを作成します。

  2. TLS を使用しないクリア TCP 接続でクラスターに接続する redis-py-cluster クライアントを初期化します。

  3. redis-py-cluster クライアントはクラスターにいくつかのデータを事前入力します。

  4. boto3 クライアントは、TLS なしから TLS 優先への TLS 移行をトリガーします。

  5. クラスターが TLS Preferred に移行されている間、redis-py-cluster TCP クライアントは、移行が完了するまでクラスターに繰り返し get オペレーションを送信します。

  6. TLS Preferred への移行が完了したら、クラスターが転送中の暗号化をサポートしていることを確認します。その後、TLS を使用してクラスターに接続する redis-py-cluster クライアントを作成します。

  7. 新しい TLS クライアントと古い TCP クライアントを使用して、いくつかの get コマンドを送信します。

  8. boto3 クライアントは、TLS Preferred から TLS 必須への TLS 移行をトリガーします。

  9. クラスターが TLS 必須に移行されている間、redis-py-cluster TCP クライアントは、移行が完了するまでクラスターに繰り返し get オペレーションを送信します。

import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)