

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Activación del cifrado en tránsito en un clúster de Redis OSS basado en nodos con Python
<a name="in-transit-encryption-enable-python"></a>

La siguiente guía mostrará cómo habilitar el cifrado en tránsito en un clúster de Redis OSS 7.0 que se haya creado originalmente con el cifrado en tránsito deshabilitado. Los clientes de TCP y TLS seguirán comunicándose con el clúster durante este proceso sin tiempo de inactividad.

Boto3 obtendrá las credenciales que necesita (`aws_access_key_id`, `aws_secret_access_key` y `aws_session_token`) de las variables de entorno. Esas credenciales se pegarán previamente en el mismo terminal bash donde ejecutaremos `python3` para procesar el código Python que se muestra en esta guía. El código del ejemplo siguiente se procesó a partir de una EC2 instancia que se lanzó en la misma VPC que se utilizará para crear el clúster ElastiCache OSS de Redis en ella.

**nota**  
En los siguientes ejemplos, se utiliza el SDK boto3 para las operaciones de ElastiCache administración (creación de clústeres o usuarios) y redis-py-cluster redis-py/ para la gestión de datos.
Debe utilizar al menos la versión 1.26.39 de boto3 (=\$1) para utilizar la migración de TLS en línea con la API de modificación del clúster.
ElastiCache admite la migración de TLS en línea solo para clústeres con Valkey versión 7.2 o superior o Redis OSS versión 7.0 o superior. Por lo tanto, si tiene un clúster que ejecuta una versión de Redis OSS anterior a la 7.0, deberá actualizar la versión de Redis OSS del clúster. Para obtener más información sobre las diferencias de las versiones, consulte [Diferencias de compatibilidad y comportamiento de las versiones principales del motor con Redis OSS](VersionManagementConsiderations.md).

**Topics**
+ [Defina las constantes de cadena que lanzarán el clúster OSS de ElastiCache Valkey o Redis](#enable-python-define-constants)
+ [Definir las clases para la configuración del clúster](#enable-python-define-classes)
+ [Definir una clase que representará el propio clúster](#enable-python-define-classes-cluster)
+ [(Opcional) Creación de una clase contenedora para demostrar la conexión del cliente al clúster de Valkey o Redis OSS](#enable-python-create-wrapper)
+ [Cree la función principal que muestre el proceso de cambio de la configuración de cifrado en tránsito](#enable-python-main-function)

## Defina las constantes de cadena que lanzarán el clúster OSS de ElastiCache Valkey o Redis
<a name="enable-python-define-constants"></a>

Primero, definamos algunas constantes de cadena de Python simples que contendrán los nombres de las AWS entidades necesarias para crear el ElastiCache clúster`security-group`, como`Cache Subnet group`, y a`default parameter group`. Todas estas AWS entidades deben crearse con antelación en su AWS cuenta en la región que desee utilizar.

```
#Constants definitions 
SECURITY_GROUP = "sg-0492aa0a29c558427"
CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide"
EC_SUBNET_GROUP = "client-testing"
DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"
```

## Definir las clases para la configuración del clúster
<a name="enable-python-define-classes"></a>

Ahora, vamos a definir algunas clases sencillas de Python que representarán una configuración de un clúster, que contendrá metadatos sobre el clúster, como la versión de Valkey o Redis OSS, el tipo de instancia y si el cifrado en tránsito (TLS) está habilitado o deshabilitado.

```
#Class definitions

class Config:
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
    ):
        self.instance_type = instance_type
        self.version = version
        self.multi_az = multi_az
        self.TLS = TLS
        self.name = name or f"tls-test"

    def create_base_launch_request(self):
        return {
            "ReplicationGroupId": self.name,
            "TransitEncryptionEnabled": self.TLS,
            "MultiAZEnabled": self.multi_az,
            "CacheNodeType": self.instance_type,
            "Engine": "redis",
            "EngineVersion": self.version,
            "CacheSubnetGroupName": EC_SUBNET_GROUP ,
            "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED ,
            "ReplicationGroupDescription": CLUSTER_DESCRIPTION,
            "SecurityGroupIds": [SECURITY_GROUP],
        }
        
class ConfigCME(Config):
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
        num_shards: int = 2,
        num_replicas_per_shard: int = 1,
    ):
        super().__init__(instance_type, version, multi_az, TLS, name)
        self.num_shards = num_shards
        self.num_replicas_per_shard = num_replicas_per_shard

    def create_launch_request(self) -> dict:
        launch_request = self.create_base_launch_request()
        launch_request["NumNodeGroups"] = self.num_shards
        launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard
        return launch_request
```

## Definir una clase que representará el propio clúster
<a name="enable-python-define-classes-cluster"></a>

Ahora, definamos algunas clases sencillas de Python que representarán el propio clúster OSS de ElastiCache Valkey o Redis. Esta clase tendrá un campo de cliente que contendrá un cliente boto3 para las operaciones de ElastiCache administración, como la creación del clúster y la consulta de la API. ElastiCache

```
import botocore.config
import boto3

# Create boto3 client
def init_client(region: str = "us-east-1"):
    config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"})
    init_request = dict()
    init_request["config"] = config
    init_request["service_name"] = "elasticache"
    init_request["region_name"] = region
    return boto3.client(**init_request) 
 
 
class ElastiCacheClusterBase:
    def __init__(self, name: str):
        self.name = name
        self.elasticache_client = init_client()

    def get_first_replication_group(self):
        return self.elasticache_client.describe_replication_groups(
        ReplicationGroupId=self.name
        )["ReplicationGroups"][0]
 
    def get_status(self) -> str:
        return self.get_first_replication_group()["Status"]
 
    def get_transit_encryption_enabled(self) -> bool:
        return self.get_first_replication_group()["TransitEncryptionEnabled"]
 
    def is_available(self) -> bool:
        return self.get_status() == "available"
        
    def is_modifying(self) -> bool:
        return self.get_status() == "modifying"
        
    def wait_for_available(self):
        while True:
            if self.is_available():
                break
            else:
                time.sleep(5)

    def wait_for_modifying(self):
        while True:
            if self.is_modifying():
                break
            else:
                time.sleep(5)
                
    def delete_cluster(self) -> bool:
        self.elasticache_client.delete_replication_group(
            ReplicationGroupId=self.name, RetainPrimaryCluster=False
        )
        
    def modify_transit_encryption_mode(self, new_transit_encryption_mode: str):
        # generate api call to migrate the cluster to TLS preffered or to TLS required
            self.elasticache_client.modify_replication_group(
                ReplicationGroupId=self.name,
                TransitEncryptionMode=new_transit_encryption_mode,
                TransitEncryptionEnabled=True,
                ApplyImmediately=True,
            )  
        self.wait_for_modifying()
              
 class ElastiCacheClusterCME(ElastiCacheClusterBase):
    def __init__(self, name: str):
        super().__init__(name)

    @classmethod
    def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME:
        config = config or ConfigCME()
        print(config)
        new_cluster = ElastiCacheClusterCME(config.name)
        launch_request = config.create_launch_request()
        new_cluster.elasticache_client.create_replication_group(**launch_request)
        new_cluster.wait_for_available()
        return new_cluster

    def get_configuration_endpoint(self) -> str:
        return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"]
     
#Since the code can throw exceptions, we define this class to make the code more readable and 
#so we won't forget to delete the cluster    
class ElastiCacheCMEManager:
    def __init__(self, config: ConfigCME = None):
        self.config = config or ConfigCME()

    def __enter__(self) -> ElastiCacheClusterCME:
        self.cluster = ElastiCacheClusterCME.launch(self.config)
        return self.cluster 
          
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cluster.delete_cluster()
```

## (Opcional) Creación de una clase contenedora para demostrar la conexión del cliente al clúster de Valkey o Redis OSS
<a name="enable-python-create-wrapper"></a>

Ahora, vamos a crear una clase contenedora para el cliente de `redis-py-cluster`. Esta clase contenedora permitirá rellenar previamente el clúster con algunas claves y luego ejecutar comandos `get` repetidos aleatorios.

**nota**  
Este es un paso opcional, pero simplifica el código de la función principal que viene en un paso posterior.

```
import redis
improt random
from time import perf_counter_ns, time


class DowntimeTestClient:
    def __init__(self, client):
        self.client = client

        # num of keys prefilled
        self.prefilled = 0
        # percent of get above prefilled
        self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled
        # total downtime in nano seconds 
        self.downtime_ns = 0
        # num of success and fail operations
        self.success_ops = 0
        self.fail_ops = 0
        self.connection_errors = 0
        self.timeout_errors = 0
        

    def replace_client(self, client):
        self.client = client

    def prefill_data(self, timelimit_sec=60):
        end_time = time() + timelimit_sec
        while time() < end_time:
            self.client.set(self.prefilled, self.prefilled)
            self.prefilled += 1

    # unsuccesful operations throw exceptions
    def _exec(self, func):
        try:
            start_ns = perf_counter_ns()
            func()
            self.success_ops += 1
            elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6
            # upon succesful execution of func
            # reset random_key to None so that the next command
            # will use a new random key
            self.random_key = None

        except Exception as e:
            elapsed_ns = perf_counter_ns() - start_ns
            self.downtime_ns += elapsed_ns
            # in case of failure- increment the relevant counters so that we will keep track 
            # of how many connection issues we had while trying to communicate with
            # the cluster.
            self.fail_ops += 1
            if e.__class__ is redis.exceptions.ConnectionError:
                self.connection_errors += 1
            if e.__class__ is redis.exceptions.TimeoutError:
                self.timeout_errors += 1

    def _repeat_exec(self, func, seconds):
        end_time = time() + seconds
        while time() < end_time:
            self._exec(func)

    def _new_random_key_if_needed(self, percent_above_prefilled):
        if self.random_key is None:
            max = int((self.prefilled * (100 + percent_above_prefilled)) / 100)
            return random.randint(0, max)
        return self.random_key

    def _random_get(self):
        key = self._new_random_key_if_needed(self.percent_get_above_prefilled)
        result = self.client.get(key)
        # we know the key was set for sure only in the case key < self.prefilled
        if key < self.prefilled:
            assert result.decode("UTF-8") == str(key)


    def repeat_get(self, seconds=60):
        self._repeat_exec(self._random_get, seconds)

    def get_downtime_ms(self) -> int:
        return self.downtime_ns // 10 ** 6


    def do_get_until(self, cond_check):
        while not cond_check():
            self.repeat_get()
        # do one more get cycle once condition is met
        self.repeat_get()
```

## Cree la función principal que muestre el proceso de cambio de la configuración de cifrado en tránsito
<a name="enable-python-main-function"></a>

Ahora, definamos la función principal, que hará lo siguiente:

1. Cree el clúster con el cliente boto3. ElastiCache 

1. Inicialice el cliente de `redis-py-cluster` que se conectará al clúster con una conexión TCP clara sin TLS.

1. El cliente de `redis-py-cluster` rellena previamente el clúster con algunos datos. 

1. El cliente de boto3 activará la migración de TLS de sin TLS a TLS preferido.

1. Mientras se migra el clúster a TLS `Preferred`, el cliente de TCP de `redis-py-cluster` enviará operaciones `get` repetidas al clúster hasta que finalice la migración.

1. Una vez finalizada la migración a TLS `Preferred`, confirmaremos que el clúster admite el cifrado en tránsito. Después, crearemos un cliente de `redis-py-cluster` que se conectará al clúster con TLS.

1. Enviaremos algunos comandos `get` utilizando el nuevo cliente de TLS y el antiguo cliente TCP.

1. El cliente de boto3 activará la migración de TLS de TLS `Preferred` a TLS requerido.

1. Mientras se migra el clúster a TLS (es necesario), el cliente redis-py-cluster TLS enviará `get` operaciones repetidas al clúster hasta que finalice la migración.

```
import redis

def init_cluster_client(
    cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient:
    # we must use for the host name the cluster configuration endpoint. 
    redis_client = redis.RedisCluster(
        host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1
    )
    test_client = DowntimeTestClient(redis_client)
    if prefill_data:
        test_client.prefill_data()
    return test_client

if __name__ == '__main__':
    config = ConfigCME(TLS=False, instance_type="cache.m5.large")

    with ElastiCacheCMEManager(config) as cluster:
        # create a client that will connect to the cluster with clear tcp connection
        test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False)
        
       # migrate the cluster to TLS Preferred
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred")
        
        # do repeated get commands until the cluster finishes the migration to TLS Preferred
        test_client_tcp.do_get_until(cluster.is_available)
        
       # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS
        assert cluster.get_transit_encryption_enabled() == True
        
       # create a client that will connect to the cluster with TLS connection. 
        # we must first make sure that the cluster indeed supports TLS
        test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True)
        
        # by doing get commands with the tcp client for 60 more seconds
       # we can verify that the existing tcp connection to the cluster still works 
        test_client_tcp.repeat_get(seconds=60)
        
        # do get commands with the new TLS client for 60 more seconds
        test_client_tcp.repeat_get(seconds=60)
        
       # migrate the cluster to TLS required
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required")
        
       # from this point the tcp clients will be disconnected and we must not use them anymore.
       # do get commands with the TLS client until the cluster finishes migartion to TLS required mode.
        test_client_tls.do_get_until(cluster.is_available)
```