Habilitando a criptografia em trânsito em um cluster Redis OSS autoprojetado usando Python - Amazon ElastiCache

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Habilitando a criptografia em trânsito em um cluster Redis OSS autoprojetado usando Python

O guia a seguir demonstrará como habilitar a criptografia em trânsito em um cluster Redis OSS 7.0 que foi originalmente criado com a criptografia em trânsito desativada. TCPe TLS os clientes continuarão se comunicando com o cluster durante esse processo sem tempo de inatividade.

O Boto3 obterá as credenciais necessárias (aws_access_key_idaws_secret_access_key e aws_session_token) das variáveis de ambiente. Essas credenciais serão coladas com antecedência no mesmo terminal bash em que executaremos o python3 para processar o código Python mostrado nesse guia. O código no exemplo abaixo foi processado a partir de uma EC2 instância que foi executada na mesma VPC que será usada para criar o ElastiCache Redis OSS Cluster nela.

nota
  • Os exemplos a seguir usam o boto3 SDK para operações de ElastiCache gerenciamento (criação de cluster ou usuário) e o redis-py-cluster redis-py/ para tratamento de dados.

  • Você deve usar pelo menos a versão boto3 (=~) 1.26.39 para usar a TLS migração on-line com a modificação do cluster. API

  • ElastiCache oferece suporte à TLS migração on-line somente para clusters com Valkey versão 7.2 e superior ou Redis OSS versão 7.0 ou superior. Portanto, se você tiver um cluster executando uma OSS versão do Redis anterior à 7.0, precisará atualizar a OSS versão Redis do seu cluster. Para obter mais informações sobre as diferenças entre as versões, consulte Principais diferenças de comportamento e compatibilidade da versão com o Redis OSS.

Defina as constantes de string que iniciarão o ElastiCache Valkey ou o Redis Cluster OSS

Primeiro, vamos definir algumas constantes de string simples do Python que conterão os nomes das AWS entidades necessárias para criar o ElastiCache clustersecurity-group, comoCache Subnet group, e a. default parameter group Todas essas AWS entidades devem ser criadas com antecedência em sua AWS conta na região que você deseja usar.

#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

Defina as classes para a configuração do cluster

Agora, vamos definir algumas classes simples de Python que representarão uma configuração de um cluster, que conterá metadados sobre o cluster, como a OSS versão Valkey ou Redis, o tipo de instância e se a criptografia em trânsito () TLS está ativada ou desativada.

#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request

Defina uma classe que representará o próprio cluster

Agora, vamos definir algumas classes simples de Python que representarão o ElastiCache Valkey ou o próprio cluster RedisOSS. Essa classe terá um campo de cliente que conterá um cliente boto3 para operações ElastiCache de gerenciamento, como criar o cluster e consultar o. ElastiCache API

import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()

(Opcional) Crie uma classe wrapper para demonstrar a conexão do cliente com o cluster Valkey ou Redis OSS

Agora, vamos criar uma classe de wrapper para o cliente redis-py-cluster. Essa classe de wrapper será compatível com o pré-preenchimento do cluster com algumas chaves e, em seguida, com a execução de comandos get aleatórios repetidos.

nota

Essa é uma etapa opcional, mas simplifica o código da função principal que vem em uma etapa posterior.

import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()

Crie a função principal que demonstra o processo de alteração da configuração de criptografia em trânsito

Agora, vamos definir a função principal, que fará o seguinte:

  1. Crie o cluster usando o cliente boto3 ElastiCache .

  2. Inicialize o redis-py-cluster cliente que se conectará ao cluster com uma TCP conexão clara semTLS.

  3. O cliente redis-py-cluster preenche o cluster com alguns dados.

  4. O cliente boto3 acionará a TLS migração de não TLS para TLS preferencial.

  5. Enquanto o cluster estiver sendo migrado para TLSPreferred, o redis-py-cluster TCP cliente enviará get operações repetidas para o cluster até que a migração seja concluída.

  6. Depois que a migração para TLS Preferred for concluída, afirmaremos que o cluster oferece suporte à criptografia em trânsito. Depois, criaremos um redis-py-cluster cliente que se conectará ao cluster comTLS.

  7. Enviaremos alguns get comandos usando o novo TLS cliente e o TCP cliente antigo.

  8. O cliente boto3 acionará a TLS migração do TLS Preferred para o necessárioTLS.

  9. Enquanto o cluster está sendo migrado para o modo TLS necessário, o redis-py-cluster TLS cliente enviará get operações repetidas para o cluster até que a migração seja concluída.

import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)