

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Mengaktifkan enkripsi dalam transit pada cluster Redis OSS berbasis node menggunakan Python
<a name="in-transit-encryption-enable-python"></a>

Panduan berikut akan menunjukkan cara mengaktifkan enkripsi dalam transit pada cluster Redis OSS 7.0 yang awalnya dibuat dengan enkripsi in-transit dinonaktifkan. Klien TCP dan TLS akan terus berkomunikasi dengan klaster selama proses ini tanpa waktu henti.

Boto3 akan mendapatkan kredensial yang dibutuhkannya (`aws_access_key_id`, `aws_secret_access_key`, dan `aws_session_token`) dari variabel lingkungan. Kredensial tersebut akan ditempelkan terlebih dahulu di terminal bash yang sama di tempat kita menjalankan `python3` untuk memproses kode Python yang ditunjukkan dalam panduan ini. Kode dalam contoh di bawah ini adalah proses dari sebuah EC2 instance yang diluncurkan di VPC yang sama yang akan digunakan untuk membuat ElastiCache Redis OSS Cluster di dalamnya.

**catatan**  
Contoh berikut menggunakan boto3 SDK untuk operasi ElastiCache manajemen (kluster atau pembuatan pengguna) dan redis-py-cluster redis-py/ untuk penanganan data.
Anda harus menggunakan setidaknya boto3 versi (=\$1) 1.26.39 untuk menggunakan migrasi TLS online dengan API perubahan klaster.
ElastiCache mendukung migrasi TLS online hanya untuk cluster dengan Valkey versi 7.2 ke atas atau Redis OSS versi 7.0 atau lebih tinggi. Jadi jika Anda memiliki cluster yang menjalankan versi Redis OSS lebih awal dari 7.0, Anda harus memutakhirkan versi Redis OSS dari cluster Anda. Untuk informasi selengkapnya tentang perbedaan versi, lihat [Perilaku versi mesin utama dan perbedaan kompatibilitas dengan Redis OSS](VersionManagementConsiderations.md).

**Topics**
+ [Tentukan konstanta string yang akan meluncurkan ElastiCache Valkey atau Redis OSS Cluster](#enable-python-define-constants)
+ [Tentukan kelas untuk konfigurasi klaster](#enable-python-define-classes)
+ [Mendefinisikan kelas yang akan merepresentasikan klaster itu sendiri](#enable-python-define-classes-cluster)
+ [(Opsional) Buat kelas pembungkus untuk demo koneksi klien ke Valkey atau Redis OSS cluster](#enable-python-create-wrapper)
+ [Membuat fungsi utama yang mendemonstrasikan proses pengubahan konfigurasi enkripsi bergerak](#enable-python-main-function)

## Tentukan konstanta string yang akan meluncurkan ElastiCache Valkey atau Redis OSS Cluster
<a name="enable-python-define-constants"></a>

Pertama, mari kita mendefinisikan beberapa konstanta string Python sederhana yang akan menampung nama-nama AWS entitas yang diperlukan untuk membuat ElastiCache cluster seperti`security-group`,`Cache Subnet group`, dan a. `default parameter group` Semua AWS entitas ini harus dibuat terlebih dahulu di AWS akun Anda di Wilayah yang ingin Anda gunakan.

```
#Constants definitions 
SECURITY_GROUP = "sg-0492aa0a29c558427"
CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide"
EC_SUBNET_GROUP = "client-testing"
DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"
```

## Tentukan kelas untuk konfigurasi klaster
<a name="enable-python-define-classes"></a>

Sekarang, mari kita definisikan beberapa kelas Python sederhana yang akan mewakili konfigurasi cluster, yang akan menyimpan metadata tentang cluster seperti versi Valkey atau Redis OSS, jenis instance, dan apakah enkripsi in-transit (TLS) diaktifkan atau dinonaktifkan.

```
#Class definitions

class Config:
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
    ):
        self.instance_type = instance_type
        self.version = version
        self.multi_az = multi_az
        self.TLS = TLS
        self.name = name or f"tls-test"

    def create_base_launch_request(self):
        return {
            "ReplicationGroupId": self.name,
            "TransitEncryptionEnabled": self.TLS,
            "MultiAZEnabled": self.multi_az,
            "CacheNodeType": self.instance_type,
            "Engine": "redis",
            "EngineVersion": self.version,
            "CacheSubnetGroupName": EC_SUBNET_GROUP ,
            "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED ,
            "ReplicationGroupDescription": CLUSTER_DESCRIPTION,
            "SecurityGroupIds": [SECURITY_GROUP],
        }
        
class ConfigCME(Config):
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
        num_shards: int = 2,
        num_replicas_per_shard: int = 1,
    ):
        super().__init__(instance_type, version, multi_az, TLS, name)
        self.num_shards = num_shards
        self.num_replicas_per_shard = num_replicas_per_shard

    def create_launch_request(self) -> dict:
        launch_request = self.create_base_launch_request()
        launch_request["NumNodeGroups"] = self.num_shards
        launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard
        return launch_request
```

## Mendefinisikan kelas yang akan merepresentasikan klaster itu sendiri
<a name="enable-python-define-classes-cluster"></a>

Sekarang, mari kita mendefinisikan beberapa kelas Python sederhana yang akan mewakili ElastiCache Valkey atau Redis OSS Cluster itu sendiri. Kelas ini akan memiliki bidang klien yang akan menampung klien boto3 untuk operasi ElastiCache manajemen seperti membuat cluster dan query API. ElastiCache

```
import botocore.config
import boto3

# Create boto3 client
def init_client(region: str = "us-east-1"):
    config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"})
    init_request = dict()
    init_request["config"] = config
    init_request["service_name"] = "elasticache"
    init_request["region_name"] = region
    return boto3.client(**init_request) 
 
 
class ElastiCacheClusterBase:
    def __init__(self, name: str):
        self.name = name
        self.elasticache_client = init_client()

    def get_first_replication_group(self):
        return self.elasticache_client.describe_replication_groups(
        ReplicationGroupId=self.name
        )["ReplicationGroups"][0]
 
    def get_status(self) -> str:
        return self.get_first_replication_group()["Status"]
 
    def get_transit_encryption_enabled(self) -> bool:
        return self.get_first_replication_group()["TransitEncryptionEnabled"]
 
    def is_available(self) -> bool:
        return self.get_status() == "available"
        
    def is_modifying(self) -> bool:
        return self.get_status() == "modifying"
        
    def wait_for_available(self):
        while True:
            if self.is_available():
                break
            else:
                time.sleep(5)

    def wait_for_modifying(self):
        while True:
            if self.is_modifying():
                break
            else:
                time.sleep(5)
                
    def delete_cluster(self) -> bool:
        self.elasticache_client.delete_replication_group(
            ReplicationGroupId=self.name, RetainPrimaryCluster=False
        )
        
    def modify_transit_encryption_mode(self, new_transit_encryption_mode: str):
        # generate api call to migrate the cluster to TLS preffered or to TLS required
            self.elasticache_client.modify_replication_group(
                ReplicationGroupId=self.name,
                TransitEncryptionMode=new_transit_encryption_mode,
                TransitEncryptionEnabled=True,
                ApplyImmediately=True,
            )  
        self.wait_for_modifying()
              
 class ElastiCacheClusterCME(ElastiCacheClusterBase):
    def __init__(self, name: str):
        super().__init__(name)

    @classmethod
    def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME:
        config = config or ConfigCME()
        print(config)
        new_cluster = ElastiCacheClusterCME(config.name)
        launch_request = config.create_launch_request()
        new_cluster.elasticache_client.create_replication_group(**launch_request)
        new_cluster.wait_for_available()
        return new_cluster

    def get_configuration_endpoint(self) -> str:
        return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"]
     
#Since the code can throw exceptions, we define this class to make the code more readable and 
#so we won't forget to delete the cluster    
class ElastiCacheCMEManager:
    def __init__(self, config: ConfigCME = None):
        self.config = config or ConfigCME()

    def __enter__(self) -> ElastiCacheClusterCME:
        self.cluster = ElastiCacheClusterCME.launch(self.config)
        return self.cluster 
          
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cluster.delete_cluster()
```

## (Opsional) Buat kelas pembungkus untuk demo koneksi klien ke Valkey atau Redis OSS cluster
<a name="enable-python-create-wrapper"></a>

Sekarang, mari kita buat kelas wrapper untuk klien `redis-py-cluster`. Kelas wrapper ini akan mendukung pra-pengisian klaster dengan beberapa kunci lalu melakukan perintah `get` acak berulang.

**catatan**  
Ini adalah langkah opsional tetapi menyederhanakan kode fungsi utama yang muncul pada langkah selanjutnya.

```
import redis
improt random
from time import perf_counter_ns, time


class DowntimeTestClient:
    def __init__(self, client):
        self.client = client

        # num of keys prefilled
        self.prefilled = 0
        # percent of get above prefilled
        self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled
        # total downtime in nano seconds 
        self.downtime_ns = 0
        # num of success and fail operations
        self.success_ops = 0
        self.fail_ops = 0
        self.connection_errors = 0
        self.timeout_errors = 0
        

    def replace_client(self, client):
        self.client = client

    def prefill_data(self, timelimit_sec=60):
        end_time = time() + timelimit_sec
        while time() < end_time:
            self.client.set(self.prefilled, self.prefilled)
            self.prefilled += 1

    # unsuccesful operations throw exceptions
    def _exec(self, func):
        try:
            start_ns = perf_counter_ns()
            func()
            self.success_ops += 1
            elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6
            # upon succesful execution of func
            # reset random_key to None so that the next command
            # will use a new random key
            self.random_key = None

        except Exception as e:
            elapsed_ns = perf_counter_ns() - start_ns
            self.downtime_ns += elapsed_ns
            # in case of failure- increment the relevant counters so that we will keep track 
            # of how many connection issues we had while trying to communicate with
            # the cluster.
            self.fail_ops += 1
            if e.__class__ is redis.exceptions.ConnectionError:
                self.connection_errors += 1
            if e.__class__ is redis.exceptions.TimeoutError:
                self.timeout_errors += 1

    def _repeat_exec(self, func, seconds):
        end_time = time() + seconds
        while time() < end_time:
            self._exec(func)

    def _new_random_key_if_needed(self, percent_above_prefilled):
        if self.random_key is None:
            max = int((self.prefilled * (100 + percent_above_prefilled)) / 100)
            return random.randint(0, max)
        return self.random_key

    def _random_get(self):
        key = self._new_random_key_if_needed(self.percent_get_above_prefilled)
        result = self.client.get(key)
        # we know the key was set for sure only in the case key < self.prefilled
        if key < self.prefilled:
            assert result.decode("UTF-8") == str(key)


    def repeat_get(self, seconds=60):
        self._repeat_exec(self._random_get, seconds)

    def get_downtime_ms(self) -> int:
        return self.downtime_ns // 10 ** 6


    def do_get_until(self, cond_check):
        while not cond_check():
            self.repeat_get()
        # do one more get cycle once condition is met
        self.repeat_get()
```

## Membuat fungsi utama yang mendemonstrasikan proses pengubahan konfigurasi enkripsi bergerak
<a name="enable-python-main-function"></a>

Sekarang, mari kita definisikan fungsi utama, yang akan melakukan hal berikut:

1. Buat cluster menggunakan klien boto3 ElastiCache .

1. Inisialisasi klien `redis-py-cluster` yang akan terhubung ke klaster dengan koneksi TCP yang jelas tanpa TLS.

1. klien `redis-py-cluster` mengisi klaster dengan beberapa data. 

1. Klien boto3 akan memicu migrasi TLS dari tanpa TLS ke TLS diutamakan.

1. Sementara klaster sedang dimigrasikan ke TLS `Preferred`, klien `redis-py-cluster` TCP akan mengirim operasi `get` berulang ke klaster sampai migrasi selesai.

1. Setelah migrasi ke mode TLS `Preferred` selesai, kita akan memastikan bahwa klaster mendukung enkripsi bergerak. Setelah itu, kita akan membuat klien `redis-py-cluster` yang akan terhubung ke klaster dengan TLS.

1. Kita akan mengirim beberapa perintah `get` menggunakan klien TLS baru dan klien TCP lama.

1. Klien boto3 akan memicu migrasi TLS dari TLS `Preferred` ke TLS diperlukan.

1. Sementara cluster sedang dimigrasikan ke TLS diperlukan, klien redis-py-cluster TLS akan mengirim `get` operasi berulang ke cluster sampai migrasi selesai.

```
import redis

def init_cluster_client(
    cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient:
    # we must use for the host name the cluster configuration endpoint. 
    redis_client = redis.RedisCluster(
        host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1
    )
    test_client = DowntimeTestClient(redis_client)
    if prefill_data:
        test_client.prefill_data()
    return test_client

if __name__ == '__main__':
    config = ConfigCME(TLS=False, instance_type="cache.m5.large")

    with ElastiCacheCMEManager(config) as cluster:
        # create a client that will connect to the cluster with clear tcp connection
        test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False)
        
       # migrate the cluster to TLS Preferred
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred")
        
        # do repeated get commands until the cluster finishes the migration to TLS Preferred
        test_client_tcp.do_get_until(cluster.is_available)
        
       # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS
        assert cluster.get_transit_encryption_enabled() == True
        
       # create a client that will connect to the cluster with TLS connection. 
        # we must first make sure that the cluster indeed supports TLS
        test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True)
        
        # by doing get commands with the tcp client for 60 more seconds
       # we can verify that the existing tcp connection to the cluster still works 
        test_client_tcp.repeat_get(seconds=60)
        
        # do get commands with the new TLS client for 60 more seconds
        test_client_tcp.repeat_get(seconds=60)
        
       # migrate the cluster to TLS required
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required")
        
       # from this point the tcp clients will be disconnected and we must not use them anymore.
       # do get commands with the TLS client until the cluster finishes migartion to TLS required mode.
        test_client_tls.do_get_until(cluster.is_available)
```