

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Activation du chiffrement en transit sur un cluster Redis OSS basé sur des nœuds à l'aide de Python
<a name="in-transit-encryption-enable-python"></a>

Le guide suivant explique comment activer le chiffrement en transit sur un cluster Redis OSS 7.0 créé à l'origine avec le chiffrement en transit désactivé. Les clients TCP et TLS continuent de communiquer avec le cluster pendant ce processus sans interruption de service.

Boto3 obtient les informations d'identification nécessaires (`aws_access_key_id`, `aws_secret_access_key` et `aws_session_token`) à partir des variables d'environnement. Ces informations d'identification sont collées à l'avance dans le même terminal bash où nous exécutons `python3` pour traiter le code Python présenté dans ce guide. Le code de l'exemple ci-dessous a été traité à partir d'une EC2 instance lancée dans le même VPC qui sera utilisé pour y créer le cluster ElastiCache Redis OSS.

**Note**  
Les exemples suivants utilisent le SDK boto3 pour les opérations de ElastiCache gestion (création de clusters ou d'utilisateurs) et redis-py-cluster redis-py/ pour le traitement des données.
Vous devez utiliser au moins la version Boto3 (=\$1) 1.26.39 pour utiliser la migration TLS en ligne avec l'API de modification du cluster.
ElastiCache prend en charge la migration TLS en ligne uniquement pour les clusters dotés de Valkey version 7.2 ou supérieure ou de Redis OSS version 7.0 ou supérieure. Ainsi, si vous avez un cluster exécutant une version Redis OSS antérieure à 7.0, vous devrez mettre à niveau la version Redis OSS de votre cluster. Pour plus d'informations sur ces différences de version, consultez [Principales différences de comportement entre les versions du moteur et de compatibilité avec Redis OSS](VersionManagementConsiderations.md).

**Topics**
+ [Définissez les constantes de chaîne qui lanceront le cluster ElastiCache Valkey ou Redis OSS](#enable-python-define-constants)
+ [Définir les classes pour la configuration du cluster](#enable-python-define-classes)
+ [Définir une classe qui représente le cluster lui-même](#enable-python-define-classes-cluster)
+ [(Facultatif) Créez une classe wrapper pour démontrer la connexion du client au cluster Valkey ou Redis OSS](#enable-python-create-wrapper)
+ [Créer la fonction principale qui montre le processus de modification de la configuration du chiffrement en transit](#enable-python-main-function)

## Définissez les constantes de chaîne qui lanceront le cluster ElastiCache Valkey ou Redis OSS
<a name="enable-python-define-constants"></a>

Définissons d'abord quelques constantes de chaîne Python simples qui contiendront les noms des AWS entités requises pour créer le ElastiCache cluster`security-group`, telles que`Cache Subnet group`, et `default parameter group` a. Toutes ces AWS entités doivent être créées à l'avance sur votre AWS compte dans la région que vous souhaitez utiliser.

```
#Constants definitions 
SECURITY_GROUP = "sg-0492aa0a29c558427"
CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide"
EC_SUBNET_GROUP = "client-testing"
DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"
```

## Définir les classes pour la configuration du cluster
<a name="enable-python-define-classes"></a>

Définissons maintenant quelques classes Python simples qui représenteront la configuration d'un cluster, qui contiendra des métadonnées sur le cluster, telles que la version de Valkey ou Redis OSS, le type d'instance et si le chiffrement en transit (TLS) est activé ou désactivé.

```
#Class definitions

class Config:
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
    ):
        self.instance_type = instance_type
        self.version = version
        self.multi_az = multi_az
        self.TLS = TLS
        self.name = name or f"tls-test"

    def create_base_launch_request(self):
        return {
            "ReplicationGroupId": self.name,
            "TransitEncryptionEnabled": self.TLS,
            "MultiAZEnabled": self.multi_az,
            "CacheNodeType": self.instance_type,
            "Engine": "redis",
            "EngineVersion": self.version,
            "CacheSubnetGroupName": EC_SUBNET_GROUP ,
            "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED ,
            "ReplicationGroupDescription": CLUSTER_DESCRIPTION,
            "SecurityGroupIds": [SECURITY_GROUP],
        }
        
class ConfigCME(Config):
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
        num_shards: int = 2,
        num_replicas_per_shard: int = 1,
    ):
        super().__init__(instance_type, version, multi_az, TLS, name)
        self.num_shards = num_shards
        self.num_replicas_per_shard = num_replicas_per_shard

    def create_launch_request(self) -> dict:
        launch_request = self.create_base_launch_request()
        launch_request["NumNodeGroups"] = self.num_shards
        launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard
        return launch_request
```

## Définir une classe qui représente le cluster lui-même
<a name="enable-python-define-classes-cluster"></a>

Définissons maintenant quelques classes Python simples qui représenteront le cluster ElastiCache Valkey ou Redis OSS lui-même. Cette classe aura un champ client qui contiendra un client boto3 pour les opérations de ElastiCache gestion telles que la création du cluster et l'interrogation de l'API. ElastiCache

```
import botocore.config
import boto3

# Create boto3 client
def init_client(region: str = "us-east-1"):
    config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"})
    init_request = dict()
    init_request["config"] = config
    init_request["service_name"] = "elasticache"
    init_request["region_name"] = region
    return boto3.client(**init_request) 
 
 
class ElastiCacheClusterBase:
    def __init__(self, name: str):
        self.name = name
        self.elasticache_client = init_client()

    def get_first_replication_group(self):
        return self.elasticache_client.describe_replication_groups(
        ReplicationGroupId=self.name
        )["ReplicationGroups"][0]
 
    def get_status(self) -> str:
        return self.get_first_replication_group()["Status"]
 
    def get_transit_encryption_enabled(self) -> bool:
        return self.get_first_replication_group()["TransitEncryptionEnabled"]
 
    def is_available(self) -> bool:
        return self.get_status() == "available"
        
    def is_modifying(self) -> bool:
        return self.get_status() == "modifying"
        
    def wait_for_available(self):
        while True:
            if self.is_available():
                break
            else:
                time.sleep(5)

    def wait_for_modifying(self):
        while True:
            if self.is_modifying():
                break
            else:
                time.sleep(5)
                
    def delete_cluster(self) -> bool:
        self.elasticache_client.delete_replication_group(
            ReplicationGroupId=self.name, RetainPrimaryCluster=False
        )
        
    def modify_transit_encryption_mode(self, new_transit_encryption_mode: str):
        # generate api call to migrate the cluster to TLS preffered or to TLS required
            self.elasticache_client.modify_replication_group(
                ReplicationGroupId=self.name,
                TransitEncryptionMode=new_transit_encryption_mode,
                TransitEncryptionEnabled=True,
                ApplyImmediately=True,
            )  
        self.wait_for_modifying()
              
 class ElastiCacheClusterCME(ElastiCacheClusterBase):
    def __init__(self, name: str):
        super().__init__(name)

    @classmethod
    def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME:
        config = config or ConfigCME()
        print(config)
        new_cluster = ElastiCacheClusterCME(config.name)
        launch_request = config.create_launch_request()
        new_cluster.elasticache_client.create_replication_group(**launch_request)
        new_cluster.wait_for_available()
        return new_cluster

    def get_configuration_endpoint(self) -> str:
        return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"]
     
#Since the code can throw exceptions, we define this class to make the code more readable and 
#so we won't forget to delete the cluster    
class ElastiCacheCMEManager:
    def __init__(self, config: ConfigCME = None):
        self.config = config or ConfigCME()

    def __enter__(self) -> ElastiCacheClusterCME:
        self.cluster = ElastiCacheClusterCME.launch(self.config)
        return self.cluster 
          
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cluster.delete_cluster()
```

## (Facultatif) Créez une classe wrapper pour démontrer la connexion du client au cluster Valkey ou Redis OSS
<a name="enable-python-create-wrapper"></a>

Créons maintenant une classe encapsuleur pour le client `redis-py-cluster`. Cette classe encapsuleur permet de pré-remplir le cluster avec certaines clés, puis d'exécuter des commandes `get` répétées de manière aléatoire.

**Note**  
Il s'agit d'une étape facultative, mais elle simplifie le code de la fonction principale qui sera ajoutée à une étape ultérieure.

```
import redis
improt random
from time import perf_counter_ns, time


class DowntimeTestClient:
    def __init__(self, client):
        self.client = client

        # num of keys prefilled
        self.prefilled = 0
        # percent of get above prefilled
        self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled
        # total downtime in nano seconds 
        self.downtime_ns = 0
        # num of success and fail operations
        self.success_ops = 0
        self.fail_ops = 0
        self.connection_errors = 0
        self.timeout_errors = 0
        

    def replace_client(self, client):
        self.client = client

    def prefill_data(self, timelimit_sec=60):
        end_time = time() + timelimit_sec
        while time() < end_time:
            self.client.set(self.prefilled, self.prefilled)
            self.prefilled += 1

    # unsuccesful operations throw exceptions
    def _exec(self, func):
        try:
            start_ns = perf_counter_ns()
            func()
            self.success_ops += 1
            elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6
            # upon succesful execution of func
            # reset random_key to None so that the next command
            # will use a new random key
            self.random_key = None

        except Exception as e:
            elapsed_ns = perf_counter_ns() - start_ns
            self.downtime_ns += elapsed_ns
            # in case of failure- increment the relevant counters so that we will keep track 
            # of how many connection issues we had while trying to communicate with
            # the cluster.
            self.fail_ops += 1
            if e.__class__ is redis.exceptions.ConnectionError:
                self.connection_errors += 1
            if e.__class__ is redis.exceptions.TimeoutError:
                self.timeout_errors += 1

    def _repeat_exec(self, func, seconds):
        end_time = time() + seconds
        while time() < end_time:
            self._exec(func)

    def _new_random_key_if_needed(self, percent_above_prefilled):
        if self.random_key is None:
            max = int((self.prefilled * (100 + percent_above_prefilled)) / 100)
            return random.randint(0, max)
        return self.random_key

    def _random_get(self):
        key = self._new_random_key_if_needed(self.percent_get_above_prefilled)
        result = self.client.get(key)
        # we know the key was set for sure only in the case key < self.prefilled
        if key < self.prefilled:
            assert result.decode("UTF-8") == str(key)


    def repeat_get(self, seconds=60):
        self._repeat_exec(self._random_get, seconds)

    def get_downtime_ms(self) -> int:
        return self.downtime_ns // 10 ** 6


    def do_get_until(self, cond_check):
        while not cond_check():
            self.repeat_get()
        # do one more get cycle once condition is met
        self.repeat_get()
```

## Créer la fonction principale qui montre le processus de modification de la configuration du chiffrement en transit
<a name="enable-python-main-function"></a>

Maintenant, définissons la fonction principale, qui effectue les opérations suivantes :

1. Créez le cluster à l'aide du client boto3 ElastiCache .

1. Initialisez le client `redis-py-cluster` qui se connecte au cluster avec une connexion TCP claire sans TLS.

1. Le client `redis-py-cluster` préremplit le cluster avec certaines données. 

1. Le client Boto3 déclenche la migration TLS d'un protocole non compatible avec TLS vers un protocole TLS préféré.

1. Pendant la migration du cluster vers TLS `Preferred`, le client TCP `redis-py-cluster` envoie des opérations `get` répétées au cluster jusqu'à ce que la migration soit terminée.

1. Une fois la migration vers TLS `Preferred` terminée, nous affirmons que le cluster prend en charge le chiffrement en transit. Ensuite, nous créons un client `redis-py-cluster` qui se connecte au cluster via TLS.

1. Nous envoyons certaines commandes `get` en utilisant le nouveau client TLS et l'ancien client TCP.

1. Le client Boto3 déclenche la migration TLS d'un protocole TLS `Preferred` vers un protocole TLS obligatoire.

1. Lorsque la migration du cluster vers le protocole TLS est requise, le client redis-py-cluster TLS enverra des `get` opérations répétées au cluster jusqu'à ce que la migration soit terminée.

```
import redis

def init_cluster_client(
    cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient:
    # we must use for the host name the cluster configuration endpoint. 
    redis_client = redis.RedisCluster(
        host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1
    )
    test_client = DowntimeTestClient(redis_client)
    if prefill_data:
        test_client.prefill_data()
    return test_client

if __name__ == '__main__':
    config = ConfigCME(TLS=False, instance_type="cache.m5.large")

    with ElastiCacheCMEManager(config) as cluster:
        # create a client that will connect to the cluster with clear tcp connection
        test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False)
        
       # migrate the cluster to TLS Preferred
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred")
        
        # do repeated get commands until the cluster finishes the migration to TLS Preferred
        test_client_tcp.do_get_until(cluster.is_available)
        
       # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS
        assert cluster.get_transit_encryption_enabled() == True
        
       # create a client that will connect to the cluster with TLS connection. 
        # we must first make sure that the cluster indeed supports TLS
        test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True)
        
        # by doing get commands with the tcp client for 60 more seconds
       # we can verify that the existing tcp connection to the cluster still works 
        test_client_tcp.repeat_get(seconds=60)
        
        # do get commands with the new TLS client for 60 more seconds
        test_client_tcp.repeat_get(seconds=60)
        
       # migrate the cluster to TLS required
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required")
        
       # from this point the tcp clients will be disconnected and we must not use them anymore.
       # do get commands with the TLS client until the cluster finishes migartion to TLS required mode.
        test_client_tls.do_get_until(cluster.is_available)
```