

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Python 在以節點為基礎的 Redis OSS 叢集上啟用傳輸中加密
<a name="in-transit-encryption-enable-python"></a>

下列指南將示範如何在最初在停用傳輸中加密的情況下建立的 Redis OSS 7.0 叢集上啟用傳輸中加密。在此程序期間，TCP 和 TLS 用戶端將繼續與叢集通訊，而不會停機。

Boto3 將從環境變量中取得所需的憑證 (`aws_access_key_id`、`aws_secret_access_key` 和 `aws_session_token`)。這些憑證將事先貼到同一個 bash 終端中，我們將在其中執行 `python3` 以處理本指南中顯示的 Python 程式碼。以下範例中的程式碼是從在相同 VPC 中啟動的 EC2 執行個體處理，該 VPC 將用於在其中建立 ElastiCache Redis OSS 叢集。

**注意**  
以下範例使用 boto3 SDK 進行 ElastiCache 管理作業 (叢集或使用者建立)，並使用 redis-py/redis-py-cluster 來處理資料。
您必須至少使用 boto3 版本 (=\$1) 1.26.39，才能搭配叢集修改 API 使用線上 TLS 遷移。
ElastiCache 僅支援具有 Valkey 7.2 版和更新版本或 Redis OSS 7.0 版或更新版本的叢集進行線上 TLS 遷移。因此，如果您的叢集執行的 Redis OSS 版本早於 7.0，則需要升級叢集的 Redis OSS 版本。如需這些版本差異的詳細資訊，請參閱 [與 Redis OSS 的主要引擎版本行為和相容性差異](VersionManagementConsiderations.md)。

**Topics**
+ [定義將啟動 ElastiCache Valkey 或 Redis OSS 叢集的字串常數](#enable-python-define-constants)
+ [定義叢集組態類別](#enable-python-define-classes)
+ [定義一個將代表叢集本身的類別](#enable-python-define-classes-cluster)
+ [（選用） 建立包裝函式類別，以示範用戶端與 Valkey 或 Redis OSS 叢集的連線](#enable-python-create-wrapper)
+ [建立主函數，以示範變更傳輸中加密組態的程序](#enable-python-main-function)

## 定義將啟動 ElastiCache Valkey 或 Redis OSS 叢集的字串常數
<a name="enable-python-define-constants"></a>

首先，讓我們定義一些簡單的 Python 字串常數，其中包含建立 ElastiCache 叢集所需的AWS實體名稱`Cache Subnet group`，例如 `security-group`、 和 `default parameter group`。所有這些AWS實體都必須在您願意使用的區域中AWS的帳戶中事先建立。

```
#Constants definitions 
SECURITY_GROUP = "sg-0492aa0a29c558427"
CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide"
EC_SUBNET_GROUP = "client-testing"
DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"
```

## 定義叢集組態類別
<a name="enable-python-define-classes"></a>

現在，讓我們定義一些簡單的 Python 類別，其將代表叢集的組態，該類別將保留有關叢集的中繼資料，例如 Valkey 或 Redis OSS 版本、執行個體類型，以及是否啟用或停用傳輸中加密 (TLS)。

```
#Class definitions

class Config:
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
    ):
        self.instance_type = instance_type
        self.version = version
        self.multi_az = multi_az
        self.TLS = TLS
        self.name = name or f"tls-test"

    def create_base_launch_request(self):
        return {
            "ReplicationGroupId": self.name,
            "TransitEncryptionEnabled": self.TLS,
            "MultiAZEnabled": self.multi_az,
            "CacheNodeType": self.instance_type,
            "Engine": "redis",
            "EngineVersion": self.version,
            "CacheSubnetGroupName": EC_SUBNET_GROUP ,
            "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED ,
            "ReplicationGroupDescription": CLUSTER_DESCRIPTION,
            "SecurityGroupIds": [SECURITY_GROUP],
        }
        
class ConfigCME(Config):
    def __init__(
        self,
        instance_type: str = "cache.t4g.small",
        version: str = "7.0",
        multi_az: bool = True,
        TLS: bool = True,
        name: str = None,
        num_shards: int = 2,
        num_replicas_per_shard: int = 1,
    ):
        super().__init__(instance_type, version, multi_az, TLS, name)
        self.num_shards = num_shards
        self.num_replicas_per_shard = num_replicas_per_shard

    def create_launch_request(self) -> dict:
        launch_request = self.create_base_launch_request()
        launch_request["NumNodeGroups"] = self.num_shards
        launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard
        return launch_request
```

## 定義一個將代表叢集本身的類別
<a name="enable-python-define-classes-cluster"></a>

現在，讓我們定義一些簡單的 Python 類別，這些類別將代表 ElastiCache Valkey 或 Redis OSS 叢集本身。這個類別將有一個用戶端欄位，以保留一個 boto3 用戶端的 ElastiCache 管理操作，如建立叢集和查詢 ElastiCache API。

```
import botocore.config
import boto3

# Create boto3 client
def init_client(region: str = "us-east-1"):
    config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"})
    init_request = dict()
    init_request["config"] = config
    init_request["service_name"] = "elasticache"
    init_request["region_name"] = region
    return boto3.client(**init_request) 
 
 
class ElastiCacheClusterBase:
    def __init__(self, name: str):
        self.name = name
        self.elasticache_client = init_client()

    def get_first_replication_group(self):
        return self.elasticache_client.describe_replication_groups(
        ReplicationGroupId=self.name
        )["ReplicationGroups"][0]
 
    def get_status(self) -> str:
        return self.get_first_replication_group()["Status"]
 
    def get_transit_encryption_enabled(self) -> bool:
        return self.get_first_replication_group()["TransitEncryptionEnabled"]
 
    def is_available(self) -> bool:
        return self.get_status() == "available"
        
    def is_modifying(self) -> bool:
        return self.get_status() == "modifying"
        
    def wait_for_available(self):
        while True:
            if self.is_available():
                break
            else:
                time.sleep(5)

    def wait_for_modifying(self):
        while True:
            if self.is_modifying():
                break
            else:
                time.sleep(5)
                
    def delete_cluster(self) -> bool:
        self.elasticache_client.delete_replication_group(
            ReplicationGroupId=self.name, RetainPrimaryCluster=False
        )
        
    def modify_transit_encryption_mode(self, new_transit_encryption_mode: str):
        # generate api call to migrate the cluster to TLS preffered or to TLS required
            self.elasticache_client.modify_replication_group(
                ReplicationGroupId=self.name,
                TransitEncryptionMode=new_transit_encryption_mode,
                TransitEncryptionEnabled=True,
                ApplyImmediately=True,
            )  
        self.wait_for_modifying()
              
 class ElastiCacheClusterCME(ElastiCacheClusterBase):
    def __init__(self, name: str):
        super().__init__(name)

    @classmethod
    def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME:
        config = config or ConfigCME()
        print(config)
        new_cluster = ElastiCacheClusterCME(config.name)
        launch_request = config.create_launch_request()
        new_cluster.elasticache_client.create_replication_group(**launch_request)
        new_cluster.wait_for_available()
        return new_cluster

    def get_configuration_endpoint(self) -> str:
        return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"]
     
#Since the code can throw exceptions, we define this class to make the code more readable and 
#so we won't forget to delete the cluster    
class ElastiCacheCMEManager:
    def __init__(self, config: ConfigCME = None):
        self.config = config or ConfigCME()

    def __enter__(self) -> ElastiCacheClusterCME:
        self.cluster = ElastiCacheClusterCME.launch(self.config)
        return self.cluster 
          
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cluster.delete_cluster()
```

## （選用） 建立包裝函式類別，以示範用戶端與 Valkey 或 Redis OSS 叢集的連線
<a name="enable-python-create-wrapper"></a>

現在，讓我們為 `redis-py-cluster` 用戶端建立一個包裝類別。這個包裝類別將支援使用一些鍵預先填充叢集，然後執行隨機重複的 `get` 命令。

**注意**  
這是一個選擇性步驟，但它簡化了在後面步驟中出現的主函數。

```
import redis
improt random
from time import perf_counter_ns, time


class DowntimeTestClient:
    def __init__(self, client):
        self.client = client

        # num of keys prefilled
        self.prefilled = 0
        # percent of get above prefilled
        self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled
        # total downtime in nano seconds 
        self.downtime_ns = 0
        # num of success and fail operations
        self.success_ops = 0
        self.fail_ops = 0
        self.connection_errors = 0
        self.timeout_errors = 0
        

    def replace_client(self, client):
        self.client = client

    def prefill_data(self, timelimit_sec=60):
        end_time = time() + timelimit_sec
        while time() < end_time:
            self.client.set(self.prefilled, self.prefilled)
            self.prefilled += 1

    # unsuccesful operations throw exceptions
    def _exec(self, func):
        try:
            start_ns = perf_counter_ns()
            func()
            self.success_ops += 1
            elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6
            # upon succesful execution of func
            # reset random_key to None so that the next command
            # will use a new random key
            self.random_key = None

        except Exception as e:
            elapsed_ns = perf_counter_ns() - start_ns
            self.downtime_ns += elapsed_ns
            # in case of failure- increment the relevant counters so that we will keep track 
            # of how many connection issues we had while trying to communicate with
            # the cluster.
            self.fail_ops += 1
            if e.__class__ is redis.exceptions.ConnectionError:
                self.connection_errors += 1
            if e.__class__ is redis.exceptions.TimeoutError:
                self.timeout_errors += 1

    def _repeat_exec(self, func, seconds):
        end_time = time() + seconds
        while time() < end_time:
            self._exec(func)

    def _new_random_key_if_needed(self, percent_above_prefilled):
        if self.random_key is None:
            max = int((self.prefilled * (100 + percent_above_prefilled)) / 100)
            return random.randint(0, max)
        return self.random_key

    def _random_get(self):
        key = self._new_random_key_if_needed(self.percent_get_above_prefilled)
        result = self.client.get(key)
        # we know the key was set for sure only in the case key < self.prefilled
        if key < self.prefilled:
            assert result.decode("UTF-8") == str(key)


    def repeat_get(self, seconds=60):
        self._repeat_exec(self._random_get, seconds)

    def get_downtime_ms(self) -> int:
        return self.downtime_ns // 10 ** 6


    def do_get_until(self, cond_check):
        while not cond_check():
            self.repeat_get()
        # do one more get cycle once condition is met
        self.repeat_get()
```

## 建立主函數，以示範變更傳輸中加密組態的程序
<a name="enable-python-main-function"></a>

現在，讓我們定義主函數，它將執行以下操作：

1. 使用 boto3 ElastiCache 用戶端建立叢集。

1. 初始化將使用不具備 TLS 的清楚 TCP 連線連接到叢集的 `redis-py-cluster` 用戶端。

1. `redis-py-cluster` 用戶端會用一些資料預先填充叢集。

1. boto3 用戶端將會觸發 TLS 從無 TLS 遷移至 TLS 偏好。

1. 當叢集正在遷移到 TLS `Preferred` 時，`redis-py-cluster` TCP 用戶端將向叢集發送重複的 `get` 操作，直到遷移完成。

1. 完成遷移至 TLS `Preferred` 之後，我們會宣告該叢集支援傳輸中加密。之後，我們將建立一個將使用 TLS 連接到叢集的 `redis-py-cluster` 用戶端。

1. 我們將使用新的 TLS 用戶端和舊的 TCP 用戶端發送一些 `get` 命令。

1. boto3 用戶端將觸發從 TLS `Preferred` 到 TLS 所需的 TLS 遷移。

1. 當叢集正在遷移到所需的 TLS 時，redis-py-cluster TLS 用戶端將向叢集發送重複的 `get` 操作，直到遷移完成。

```
import redis

def init_cluster_client(
    cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient:
    # we must use for the host name the cluster configuration endpoint. 
    redis_client = redis.RedisCluster(
        host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1
    )
    test_client = DowntimeTestClient(redis_client)
    if prefill_data:
        test_client.prefill_data()
    return test_client

if __name__ == '__main__':
    config = ConfigCME(TLS=False, instance_type="cache.m5.large")

    with ElastiCacheCMEManager(config) as cluster:
        # create a client that will connect to the cluster with clear tcp connection
        test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False)
        
       # migrate the cluster to TLS Preferred
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred")
        
        # do repeated get commands until the cluster finishes the migration to TLS Preferred
        test_client_tcp.do_get_until(cluster.is_available)
        
       # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS
        assert cluster.get_transit_encryption_enabled() == True
        
       # create a client that will connect to the cluster with TLS connection. 
        # we must first make sure that the cluster indeed supports TLS
        test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True)
        
        # by doing get commands with the tcp client for 60 more seconds
       # we can verify that the existing tcp connection to the cluster still works 
        test_client_tcp.repeat_get(seconds=60)
        
        # do get commands with the new TLS client for 60 more seconds
        test_client_tcp.repeat_get(seconds=60)
        
       # migrate the cluster to TLS required
        cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required")
        
       # from this point the tcp clients will be disconnected and we must not use them anymore.
       # do get commands with the TLS client until the cluster finishes migartion to TLS required mode.
        test_client_tls.do_get_until(cluster.is_available)
```