Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Aktivierung der Verschlüsselung während der Übertragung auf einem selbst entworfenen OSS Redis-Cluster mit Python
Die folgende Anleitung zeigt, wie die Verschlüsselung bei der Übertragung auf einem Redis OSS 7.0-Cluster aktiviert wird, der ursprünglich mit deaktivierter Verschlüsselung während der Übertragung erstellt wurde. TCPund die TLS Clients werden während dieses Vorgangs ohne Ausfallzeiten weiterhin mit dem Cluster kommunizieren.
Boto3 ruft die benötigten Anmeldeinformationen (aws_access_key_id
, aws_secret_access_key
, und aws_session_token
) aus den Umgebungsvariablen ab. Diese Anmeldeinformationen werden vorab in dasselbe Bash-Terminal eingefügt, in dem wir python3
ausführen, um den in diesem Handbuch gezeigten Python-Code zu verarbeiten. Der Code im folgenden Beispiel wurde von einer EC2 Instanz verarbeitet, die in derselben Instanz gestartet wurdeVPC, mit der der ElastiCache OSS Redis-Cluster darin erstellt wird.
Anmerkung
-
In den folgenden Beispielen wird boto3 SDK für ElastiCache Verwaltungsvorgänge (Cluster- oder Benutzererstellung) und redis-py-cluster redis-py/ für die Datenverarbeitung verwendet.
-
Sie müssen mindestens die Boto3-Version (=~) 1.26.39 verwenden, um die Online-Migration mit der Cluster-Modifikation verwenden zu können. TLS API
-
ElastiCache unterstützt die TLS Online-Migration nur für Cluster mit Valkey Version 7.2 und höher oder Redis Version 7.0 oder höher. OSS Wenn Sie also einen Cluster haben, auf dem eine OSS Redis-Version vor 7.0 ausgeführt wird, müssen Sie die OSS Redis-Version Ihres Clusters aktualisieren. Weitere Informationen zu Versionsunterschieden finden Sie unter Wesentliche Unterschiede im Versionsverhalten und in der Kompatibilität mit Redis OSS.
Themen
- Definieren Sie die Zeichenkettenkonstanten, mit denen der ElastiCache Valkey- oder Redis-Cluster gestartet wird OSS
- Definieren der Klassen für die Clusterkonfiguration
- Definieren einer Klasse, die den eigentlichen Cluster darstellt
- (Optional) Erstellen Sie eine Wrapper-Klasse, um die Client-Verbindung zum Valkey- oder Redis-Cluster zu demonstrieren OSS
- Erstellen der Hauptfunktion, die den Änderungsprozess der Konfiguration der Verschlüsselung während der Übertragung demonstriert
Definieren Sie die Zeichenkettenkonstanten, mit denen der ElastiCache Valkey- oder Redis-Cluster gestartet wird OSS
Definieren wir zunächst einige einfache Python-String-Konstanten, die die Namen der AWS Entitäten enthalten, die zur Erstellung des ElastiCache Clusters erforderlich sindsecurity-group
, wieCache Subnet group
, und a. default parameter group
All diese AWS Entitäten müssen im Voraus in Ihrem AWS Konto in der Region erstellt werden, die Sie verwenden möchten.
#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"
Definieren der Klassen für die Clusterkonfiguration
Lassen Sie uns nun einige einfache Python-Klassen definieren, die eine Konfiguration eines Clusters darstellen und Metadaten über den Cluster wie die Valkey- oder OSS Redis-Version, den Instanztyp und ob die Verschlüsselung bei der Übertragung (TLS) aktiviert oder deaktiviert ist, enthalten.
#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request
Definieren einer Klasse, die den eigentlichen Cluster darstellt
Lassen Sie uns nun einige einfache Python-Klassen definieren, die den ElastiCache Valkey- oder OSS Redis-Cluster selbst darstellen. Diese Klasse wird über ein Client-Feld verfügen, das einen Boto3-Client für ElastiCache Verwaltungsvorgänge wie das Erstellen des Clusters und das Abfragen des Clusters enthält. ElastiCache API
import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()
(Optional) Erstellen Sie eine Wrapper-Klasse, um die Client-Verbindung zum Valkey- oder Redis-Cluster zu demonstrieren OSS
Jetzt erstellen wir eine Wrapper-Klasse für den redis-py-cluster
-Client. Diese Wrapper-Klasse unterstützt das Vorbefüllen des Clusters mit einigen Schlüsseln und das anschließende Ausführen zufälliger wiederholter get
-Befehle.
Anmerkung
Dies ist ein optionaler Schritt, der jedoch den Code der Hauptfunktion vereinfacht, die in einem späteren Schritt erstellt wird.
import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()
Erstellen der Hauptfunktion, die den Änderungsprozess der Konfiguration der Verschlüsselung während der Übertragung demonstriert
Definieren wir nun die Hauptfunktion wie folgt:
-
Erstellen Sie den Cluster mit dem Boto3-Client. ElastiCache
-
Initialisieren Sie den
redis-py-cluster
Client, der sich mit dem Cluster verbindet, mit einer klaren TCP Verbindung ohne. TLS -
Der
redis-py-cluster
-Client füllt den Cluster vorab mit einigen Daten. -
Der boto3-Client löst die TLS Migration von „Nein“ zu „Bevorzugt“ aus. TLS TLS
-
Während der Migration auf den Cluster sendet der
redis-py-cluster
TCP Client wiederholteget
Operationen an den Cluster TLSPreferred
, bis die Migration abgeschlossen ist. -
Nach Abschluss der Migration auf TLS
Preferred
stellen wir sicher, dass der Cluster die Verschlüsselung bei der Übertragung unterstützt. Anschließend erstellen wir einenredis-py-cluster
Client, mit TLS dem eine Verbindung zum Cluster hergestellt wird. -
Wir werden einige
get
Befehle mit dem neuen TLS Client und dem alten TCP Client senden. -
Der boto3-Client löst die TLS Migration von „erforderlich“ TLS
Preferred
nach TLS „erforderlich“ aus. -
Während der Migration des Clusters auf TLS erforderlich sendet der redis-py-cluster TLS Client wiederholte
get
Operationen an den Cluster, bis die Migration abgeschlossen ist.
import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)