


# Python and ElastiCache
<a name="ElastiCache-Getting-Started-Tutorials-Python"></a>

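In this tutorial, you use the AWS SDK for Python (Boto3) to write simple programs that perform the following ElastiCache operations:
+ Create ElastiCache for Redis OSS clusters (cluster mode enabled and cluster mode disabled)
+ Check whether users or user groups exist, and create them if they do not. (This feature is available for Valkey 7.2 and later, and for Redis OSS 6.0 to 7.1.)
+ Connect to ElastiCache
+ Perform operations such as setting and getting strings, reading from and writing to streams, and publishing to and subscribing from pub/sub channels.

As you work through this tutorial, you can refer to the AWS SDK for Python (Boto) documentation. The following section is specific to ElastiCache: [ElastiCache low-level client](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/elasticache.html)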

## Tutorial prerequisites
<a name="ElastiCache-Getting-Started-Tutorials-Prerquisites"></a>
+ Set up an AWS access key to use the AWS SDKs. For more information, see [Setting up ElastiCache](set-up.md).
+ Install Python 3.0 or later. For more information, see [https://www.python.org/downloads](https://www.python.org/downloads). For instructions, see [Quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html) in the Boto 3 documentation.

**Topics**
+ [Tutorial prerequisites](#ElastiCache-Getting-Started-Tutorials-Prerquisites)
+ [Tutorial: Creating ElastiCache clusters and users](#ElastiCache-Getting-Started-Tutorials-Create-Cluster-and-Users)
+ [Tutorial: Connecting to ElastiCache](#ElastiCache-Getting-Started-Tutorials-Connecting)
+ [Usage examples](#ElastiCache-Getting-Started-Tutorials-Usage)

## Tutorial: Creating ElastiCache clusters and users
<a name="ElastiCache-Getting-Started-Tutorials-Create-Cluster-and-Users"></a>

The following examples use the boto3 SDK for ElastiCache for Redis OSS management operations (cluster or user creation) and redis-py/redis-py-cluster for data handling.

**Topics**
+ [Create a cluster mode disabled cluster](#ElastiCache-Getting-Started-Tutorials-Create-Cluster)
+ [Create a cluster mode disabled cluster with TLS and RBAC](#ElastiCache-Getting-Started-Tutorials-RBAC)
+ [Create a cluster mode enabled cluster](#ElastiCache-Getting-Started-Tutorials-Cluster-Enabled)
+ [Create a cluster mode enabled cluster with TLS and RBAC](#ElastiCache-Getting-Started-Tutorials-Cluster-RBAC)
+ [Check if users/user groups exist, otherwise create them](#ElastiCache-Getting-Started-Tutorials-Users)

### Create a cluster mode disabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Create-Cluster"></a>

Copy the following program and paste it into a file named *CreateClusterModeDisabledCluster.py*.

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_disabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2,
                                 ReplicationGroupDescription='Sample cluster', ReplicationGroupId=None):
    """Creates an ElastiCache Cluster with cluster mode disabled

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
    If not specified, cluster will be created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :return: dictionary with the API results, or a dictionary with an 'Error' key if a required parameter is missing

    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        NumCacheClusters=NumCacheClusters,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        SnapshotRetentionLimit=30,
    )
    return response


if __name__ == '__main__':
    
    # Creates an ElastiCache cluster mode disabled cluster with Valkey 8.0, one primary and two replicas.
    # Uncomment CacheNodeType to use cache.m6g.large nodes instead of the default cache.t3.small.
    elasticacheResponse = create_cluster_mode_disabled(
        #CacheNodeType='cache.m6g.large',
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104053'
        )
    
    logging.info(elasticacheResponse)
```

To run the program, enter the following command:

 `python CreateClusterModeDisabledCluster.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).
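
`create_replication_group` returns while the cluster is still being created, so a script that goes on to connect usually has to wait for the `available` status first. The following is a minimal sketch of such a wait loop, not part of the original program. It assumes the response shape of `describe_replication_groups` (a `ReplicationGroups` list whose entries carry a `Status` field); the function name `wait_until_available`, its parameters, and the polling interval are illustrative choices.

```python
import time


def wait_until_available(client, replication_group_id,
                         poll_seconds=30, max_attempts=40, sleep=time.sleep):
    """Poll describe_replication_groups until the group reports 'available'.

    Returns True when the group becomes available, or False if
    max_attempts polls pass first. Raises RuntimeError if creation fails.
    The client argument is any object with a describe_replication_groups
    method, for example boto3.client('elasticache').
    """
    for _ in range(max_attempts):
        response = client.describe_replication_groups(
            ReplicationGroupId=replication_group_id
        )
        status = response['ReplicationGroups'][0]['Status']
        if status == 'available':
            return True
        if status == 'create-failed':
            raise RuntimeError(
                'Replication group {} failed to create'.format(replication_group_id)
            )
        # Still creating (or in another transitional state): wait and retry
        sleep(poll_seconds)
    return False
```

With a real client this could be called as `wait_until_available(boto3.client('elasticache'), 'valkey202104053')`. Boto3 also provides a `replication_group_available` waiter (`client.get_waiter('replication_group_available')`) that serves the same purpose.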

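### Create a cluster mode disabled cluster with TLS and RBAC
<a name="ElastiCache-Getting-Started-Tutorials-RBAC"></a>

To ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when creating a cluster mode disabled cluster. Unlike Valkey or Redis OSS AUTH, where every client whose token is authenticated has full access to the replication group, RBAC lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see [Role-Based Access Control (RBAC)](Clusters.RBAC.md).

Copy the following program and paste it into a file named *ClusterModeDisabledWithRBAC.py*.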

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2,
                                      ReplicationGroupDescription='Sample cluster', ReplicationGroupId=None,
                                      UserGroupIds=None, SecurityGroupIds=None, CacheSubnetGroupName=None):
    """Creates an ElastiCache Cluster with cluster mode disabled and RBAC

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
    If not specified, cluster will be created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: The ID of the user group to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used
    :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used.
    :return: dictionary with the API results

    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds,(list)):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params={'AutomaticFailoverEnabled': True, 
            'CacheNodeType': CacheNodeType, 
            'Engine': 'valkey', 
            'EngineVersion': EngineVersion, 
            'NumCacheClusters': NumCacheClusters, 
            'ReplicationGroupDescription': ReplicationGroupDescription, 
            'ReplicationGroupId': ReplicationGroupId, 
            'SnapshotRetentionLimit': 30, 
            'TransitEncryptionEnabled': True, 
            'UserGroupIds':UserGroupIds
        }

    # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit.
    if isinstance(SecurityGroupIds,(list)):
        params.update({'SecurityGroupIds':SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName':CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response

if __name__ == '__main__':

    # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 8.0, one primary and two replicas.
    # Assigns the existing user group "mygroup" for RBAC authentication
   
    response=create_cluster_mode_disabled_rbac(
        CacheNodeType='cache.m6g.large',
        EngineVersion='8.0',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104',
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'
    )

    logging.info(response)
```

To run the program, enter the following command:

 `python ClusterModeDisabledWithRBAC.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).

### Create a cluster mode enabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Cluster-Enabled"></a>

Copy the following program and paste it into a file named *ClusterModeEnabled.py*.

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1,
                                ReplicasPerNodeGroup=1,
                                ReplicationGroupDescription='Sample cache with cluster mode enabled',
                                ReplicationGroupId=None):
    """Creates an ElastiCache Cluster with cluster mode enabled

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    It must match the parameter group passed to the API below (default.valkey7.2.cluster.on).
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
    If not specified, cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :return: dictionary with the API results, or a dictionary with an 'Error' key if a required parameter is missing

    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    
    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        # NumNodeGroups sets the number of shards; each shard gets 1 primary (implicit) plus ReplicasPerNodeGroup replicas
        NumNodeGroups=NumNodeGroups,
        ReplicasPerNodeGroup=ReplicasPerNodeGroup,
        CacheParameterGroupName='default.valkey7.2.cluster.on'
    )

    return response


if __name__ == '__main__':

    # Creates a cluster mode enabled cluster with 2 shards (NumNodeGroups),
    # each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup).
    # EngineVersion 7.2 matches the default.valkey7.2.cluster.on parameter group used above.
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey20210',
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
    )

    logging.info(response)
```

To run the program, enter the following command:

 `python ClusterModeEnabled.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).
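
Because every shard has one implicit primary in addition to its replicas, the number of nodes that a cluster mode enabled cluster provisions follows directly from `NumNodeGroups` and `ReplicasPerNodeGroup`. The short sketch below only illustrates that arithmetic; the helper name `total_nodes` is hypothetical and is not part of the boto3 API.

```python
def total_nodes(num_node_groups, replicas_per_node_group):
    """Return the node count: each shard has 1 primary plus its replicas."""
    return num_node_groups * (1 + replicas_per_node_group)


# Values used in ClusterModeEnabled.py: 2 shards with 1 replica each
print(total_nodes(2, 1))  # prints 4
```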

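### Create a cluster mode enabled cluster with TLS and RBAC
<a name="ElastiCache-Getting-Started-Tutorials-Cluster-RBAC"></a>

To ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when creating a cluster mode enabled cluster. Unlike Valkey or Redis OSS AUTH, where every client whose token is authenticated has full access to the replication group, RBAC lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see [Role-Based Access Control (RBAC)](Clusters.RBAC.md).

Copy the following program and paste it into a file named *ClusterModeEnabledWithRBAC.py*.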

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1,
                                ReplicasPerNodeGroup=1,
                                ReplicationGroupDescription='Sample cache with cluster mode enabled',
                                ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None,
                                CacheSubnetGroupName=None,
                                CacheParameterGroupName='default.valkey7.2.cluster.on'):
    """Creates an ElastiCache Cluster with cluster mode enabled and RBAC

    Returns a dictionary with the API response

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used
    Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types
    :param EngineVersion: Valkey engine version to be used. If not specified, 7.2 will be used.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
    If not specified, cluster will be created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: List with the ID of the user group to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used
    :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used.
    :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled.
    :return: dictionary with the API results, or a dictionary with an 'Error' key if a required parameter is missing

    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds,(list)):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params={'AutomaticFailoverEnabled': True, 
            'CacheNodeType': CacheNodeType, 
            'Engine': 'valkey', 
            'EngineVersion': EngineVersion, 
            'ReplicationGroupDescription': ReplicationGroupDescription, 
            'ReplicationGroupId': ReplicationGroupId, 
            'SnapshotRetentionLimit': 30, 
            'TransitEncryptionEnabled': True, 
            'UserGroupIds':UserGroupIds,
            'NumNodeGroups': NumNodeGroups,
            'ReplicasPerNodeGroup': ReplicasPerNodeGroup,
            'CacheParameterGroupName': CacheParameterGroupName
        }

    # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit.
    if isinstance(SecurityGroupIds,(list)):
        params.update({'SecurityGroupIds':SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName':CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response

if __name__ == '__main__':
    # Creates a cluster mode enabled cluster
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey2021',
        # 2 shards (NumNodeGroups), each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup)
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
        UserGroupIds=[
            'mygroup'
        ],
        SecurityGroupIds=[
            'sg-7cc73803'
        ],
        CacheSubnetGroupName='default'

    )
    
    logging.info(response)
```

To run the program, enter the following command:

 `python ClusterModeEnabledWithRBAC.py`

For more information, see [Managing clusters in ElastiCache](Clusters.md).

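### Check if users/user groups exist, otherwise create them
<a name="ElastiCache-Getting-Started-Tutorials-Users"></a>

With RBAC, you create users and assign them specific permissions by using an access string. You assign the users to user groups aligned with a specific role (administrators, human resources), and the groups are then deployed to one or more ElastiCache for Redis OSS replication groups. By doing this, you can establish security boundaries between clients that use the same Valkey or Redis OSS replication group and prevent clients from accessing each other's data. For more information, see [Role-Based Access Control (RBAC)](Clusters.RBAC.md).

Copy the following program and paste it into a file named *UserAndUserGroups.py*. Update the mechanism for providing credentials. The credentials in this example are placeholders (`EXAMPLE`) assigned to undeclared names, so replace them before you run the program. Avoid hard-coding credentials.

This example uses an access string with the permissions for the user. For more information on access strings, see [Specifying permissions using an access string](Clusters.RBAC.md#Access-string).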

```
import boto3
import logging

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')

def check_user_exists(UserId):
    """Checks if UserId exists

    Returns True if UserId exists, otherwise False
    :param UserId: ElastiCache User ID
    :return: True|False
    """
    try:
        response = client.describe_users(
            UserId=UserId,
        )
        return response['Users'][0]['UserId'].lower() == UserId.lower()
    except client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'UserNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise

def check_group_exists(UserGroupId):
    """Checks if UserGroupID exists

    Returns True if Group ID exists, otherwise False
    :param UserGroupId: ElastiCache user group ID
    :return: True|False
    """

    try:
        response = client.describe_user_groups(
            UserGroupId=UserGroupId
        )
        return response['UserGroups'][0]['UserGroupId'].lower() == UserGroupId.lower()
    except client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'UserGroupNotFound':
            logging.info(e.response['Error'])
            return False
        else:
            raise

def create_user(UserId=None,UserName=None,Password=None,AccessString=None):
    """Creates a new user

    Returns the ARN for the newly created user or the error message
    :param UserId: ElastiCache user ID. User IDs must be unique
    :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name as long as the associated user ID is unique.
    :param Password: Password for user. Must have at least 16 chars.
    :param AccessString: Access string with the permissions for the user. 
    :return: user ARN
    """
    try:
        response = client.create_user(
            UserId=UserId,
            UserName=UserName,
            Engine='Redis',
            Passwords=[Password],
            AccessString=AccessString,
            NoPasswordRequired=False
        )
        return response['ARN']
    except client.exceptions.ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']

def create_group(UserGroupId=None, UserIds=None):
    """Creates a new group.
    A default user is required (mandatory) and should be specified in the UserIds list

    Return: Group ARN
    :param UserIds: List with user IDs to be associated with the new group. A default user is required
    :param UserGroupId: The ID (name) for the group
    :return: Group ARN
    """
    try:
        response = client.create_user_group(
            UserGroupId=UserGroupId,
            Engine='Redis',
            UserIds=UserIds
        )
        return response['ARN']
    except client.exceptions.ClientError as e:
        logging.info(e.response['Error'])


if __name__ == '__main__':
    
    groupName='mygroup2'
    userName = 'myuser2'
    userId=groupName+'-'+userName

    # Creates a new user if the user ID does not exist.
    # EXAMPLE is a placeholder: supply the password from a secure source instead of hard-coding it.
    for tmpUserId,tmpUserName in [ (userId,userName), (groupName+'-default','default')]:
        if not check_user_exists(tmpUserId):
            response=create_user(UserId=tmpUserId, UserName=tmpUserName,Password=EXAMPLE,AccessString='on ~* +@all')
            logging.info(response)
    # Creates the user group with both user IDs if the group does not exist
    if not check_group_exists(groupName):
        UserIds = [ userId , groupName+'-default']
        response=create_group(UserGroupId=groupName,UserIds=UserIds)
        logging.info(response)
```

To run the program, enter the following command:

 `python UserAndUserGroups.py`
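
The access string `'on ~* +@all'` used in the program is made of three parts: the user state (`on`), a key pattern prefixed with `~`, and command permissions. As an illustration only, the sketch below assembles such a string and checks the 16-character minimum password length mentioned in the `create_user` docstring. The helper names `build_access_string` and `check_password_length` are hypothetical and are not part of boto3.

```python
def build_access_string(enabled=True, key_patterns=('*',), permissions=('+@all',)):
    """Assemble an access string such as 'on ~* +@all'.

    enabled: whether the user is active ('on') or inactive ('off')
    key_patterns: key patterns the user may access, without the '~' prefix
    permissions: command rules such as '+@all', '-@all', or '+@read'
    """
    parts = ['on' if enabled else 'off']
    parts.extend('~' + pattern for pattern in key_patterns)
    parts.extend(permissions)
    return ' '.join(parts)


def check_password_length(password, min_length=16):
    """Raise ValueError if the password is shorter than min_length characters."""
    if len(password) < min_length:
        raise ValueError('Password must have at least {} characters'.format(min_length))
    return password


# Same string as in UserAndUserGroups.py
print(build_access_string())
# A read-only user restricted to keys that start with 'app::'
print(build_access_string(key_patterns=('app::*',), permissions=('-@all', '+@read')))
```

Building the string from parts keeps the key patterns and command rules visible as separate arguments, which can make a restrictive policy easier to review than a hand-typed string.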

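## Tutorial: Connecting to ElastiCache
<a name="ElastiCache-Getting-Started-Tutorials-Connecting"></a>

The following examples use the Valkey or Redis OSS client to connect to ElastiCache.

**Topics**
+ [Connect to a cluster mode disabled cluster](#ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-disabled)
+ [Connect to a cluster mode enabled cluster](#ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-enabled)

### Connect to a cluster mode disabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-disabled"></a>

Copy the following program and paste it into a file named *ConnectClusterModeDisabled.py*. Update the mechanism for providing credentials. The credentials in this example are placeholders (`example` and `EXAMPLE`) assigned to undeclared names, so replace them before you run the program. Avoid hard-coding credentials.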

```
from redis import Redis
import logging

logging.basicConfig(level=logging.INFO)
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True, username=example, password=EXAMPLE)

if redis.ping():
    logging.info("Connected to Redis")
```

To run the program, enter the following command:

 `python ConnectClusterModeDisabled.py`
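
One way to follow the advice about not hard-coding credentials is to read them from environment variables. The sketch below is only one possible approach, and the variable names (`ELASTICACHE_HOST`, `ELASTICACHE_PORT`, `ELASTICACHE_USERNAME`, `ELASTICACHE_PASSWORD`) and the helper `connection_kwargs_from_env` are assumptions made for illustration. It builds the same keyword arguments that the program above passes to `Redis(...)`.

```python
import os


def connection_kwargs_from_env(env=None):
    """Build keyword arguments for redis.Redis from environment variables.

    env defaults to os.environ; a plain dict can be passed for testing.
    Raises KeyError if the host, user name, or password is not set.
    """
    env = os.environ if env is None else env
    return {
        'host': env['ELASTICACHE_HOST'],
        'port': int(env.get('ELASTICACHE_PORT', '6379')),
        'username': env['ELASTICACHE_USERNAME'],
        'password': env['ELASTICACHE_PASSWORD'],
        'ssl': True,
        'decode_responses': True,
    }


# Usage with redis-py (requires the variables above to be set):
#   from redis import Redis
#   redis = Redis(**connection_kwargs_from_env())
```

A secrets manager is another option; the point is only that the password never appears in the source file.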

### Connect to a cluster mode enabled cluster
<a name="ElastiCache-Getting-Started-Tutorials-Connecting-cluster-mode-enabled"></a>

Copy the following program and paste it into a file named *ConnectClusterModeEnabled.py*.

```
from rediscluster import RedisCluster
import logging

logging.basicConfig(level=logging.INFO)
redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com","port": "6379"}], decode_responses=True,skip_full_coverage_check=True)

if redis.ping():
    logging.info("Connected to Redis")
```

To run the program, enter the following command:

 `python ConnectClusterModeEnabled.py`

## Usage examples
<a name="ElastiCache-Getting-Started-Tutorials-Usage"></a>

The following examples use the redis-py client to work with data in ElastiCache for Redis OSS.

**Topics**
+ [Set and get strings](#ElastiCache-Getting-Started-Tutorials-set-strings)
+ [Set and get a hash with multiple items](#ElastiCache-Getting-Started-Tutorials-set-hash)
+ [Publish (write) and subscribe (read) from a pub/sub channel](#ElastiCache-Getting-Started-Tutorials-pub-sub)
+ [Write and read from a stream](#ElastiCache-Getting-Started-Tutorials-read-write-stream)

### Set and get strings
<a name="ElastiCache-Getting-Started-Tutorials-set-strings"></a>

Copy the following program and paste it into a file named *SetAndGetStrings.py*.

```
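from redis import Redis
import time
import logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s')

# Connect first. Replace the host with your own endpoint, and add ssl/username/password
# as in ConnectClusterModeDisabled.py if the cluster uses TLS and RBAC.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)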

keyName='mykey'
currTime=time.ctime(time.time())

# Set the key 'mykey' with the current date and time as value. 
# The key will expire and be removed from the cache in 60 seconds.
redis.set(keyName, currTime, ex=60)

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieve the key value and current TTL
keyValue=redis.get(keyName)
keyTTL=redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))
```

To run the program, enter the following command:

 `python SetAndGetStrings.py`

### Set and get a hash with multiple items
<a name="ElastiCache-Getting-Started-Tutorials-set-hash"></a>

Copy the following program and paste it into a file named *SetAndGetHash.py*.

```
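from redis import Redis
import logging
import time

logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s')

# Connect first. Replace the host with your own endpoint, and add ssl/username/password
# as in ConnectClusterModeDisabled.py if the cluster uses TLS and RBAC.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)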

keyName='mykey'
keyValues={'datetime': time.ctime(time.time()), 'epochtime': time.time()}

# Set the hash 'mykey' with the current date and time in human readable format (datetime field) and epoch number (epochtime field). 
redis.hset(keyName, mapping=keyValues)

# Set the key to expire and be removed from the cache in 60 seconds.
redis.expire(keyName, 60)

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieves all the fields and current TTL
keyValues=redis.hgetall(keyName)
keyTTL=redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))
```

To run the program, enter the following command:

 `python SetAndGetHash.py`
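
Hash field values are stored as strings, so the `epochtime` float written by the program comes back from `hgetall` as text when `decode_responses=True` is set. The sketch below shows one way to convert it back; the helper name `decode_time_fields` is hypothetical, and the sample dictionary stands in for a real `hgetall` result.

```python
def decode_time_fields(fields):
    """Return a copy of a hash read with hgetall, with 'epochtime' as a float.

    All other fields, including the human-readable 'datetime', stay as strings.
    """
    decoded = dict(fields)
    decoded['epochtime'] = float(fields['epochtime'])
    return decoded


# Sample data shaped like the result of redis.hgetall('mykey')
sample = {'datetime': 'Mon Apr  5 15:00:00 2021', 'epochtime': '1617634800.25'}
print(decode_time_fields(sample)['epochtime'])  # prints 1617634800.25
```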

### Publish (write) and subscribe (read) from a pub/sub channel
<a name="ElastiCache-Getting-Started-Tutorials-pub-sub"></a>

Copy the following program and paste it into a file named *PubAndSub.py*.

```
from redis import Redis
import logging
import time

def handlerFunction(message):
    """Logs a message received from the pub/sub channel

    Return None
    :param message: message to log
    """
    logging.info(message)

logging.basicConfig(level=logging.INFO)
redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)


# Creates the subscriber connection on "mychannel"
subscriber = redis.pubsub()
subscriber.subscribe(**{'mychannel': handlerFunction})

# Creates a new thread to watch for messages while the main process continues with its routines
thread = subscriber.run_in_thread(sleep_time=0.01)

# Publishes a first message to "mychannel"
redis.publish('mychannel', 'My message')

# Publishes the current time every second until interrupted. The subscriber thread reads each message and logs it.
while True:
    redis.publish('mychannel',time.ctime(time.time()))
    time.sleep(1)
```

To run the program, enter the following command:

 `python PubAndSub.py`
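
The handler in the program logs the whole message object. In redis-py, that object is a dictionary with the keys `type`, `pattern`, `channel`, and `data`. The sketch below, offered as an illustration under that assumption, extracts only the channel and payload; the function name `format_message` is hypothetical, and the sample dictionary stands in for a message delivered by the subscriber thread.

```python
def format_message(message):
    """Return 'channel: data' for a published message, or None otherwise.

    Messages whose type is not 'message' (for example subscribe
    confirmations) are ignored.
    """
    if message.get('type') != 'message':
        return None
    return '{}: {}'.format(message['channel'], message['data'])


# Sample shaped like a message received on 'mychannel' with decode_responses=True
sample = {'type': 'message', 'pattern': None, 'channel': 'mychannel', 'data': 'My message'}
print(format_message(sample))  # prints mychannel: My message
```

A handler such as `handlerFunction` could log `format_message(message)` instead of the raw dictionary. When the program should exit cleanly, the worker thread returned by `run_in_thread` can be stopped with `thread.stop()`.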

### Write and read from a stream
<a name="ElastiCache-Getting-Started-Tutorials-read-write-stream"></a>

Copy the following program and paste it into a file named *ReadWriteStream.py*.

```
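from redis import Redis
import redis.exceptions as exceptions
import logging
import time
import threading

logging.basicConfig(level=logging.INFO)

# Connect first. Replace the host with your own endpoint, and add ssl/username/password
# as in ConnectClusterModeDisabled.py if the cluster uses TLS and RBAC.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)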

def writeMessage(streamName):
    """Starts a loop writing the current time and thread name to 'streamName'

    :param streamName: Stream (key) name to write messages.
    """
    fieldsDict={'writerId':threading.current_thread().name,'myvalue':None}
    while True:
        fieldsDict['myvalue'] = time.ctime(time.time())
        redis.xadd(streamName,fieldsDict)
        time.sleep(1)

def readMessage(groupName=None,streamName=None):
    """Starts a loop reading from 'streamName'
    Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution.
    Once a thread acknowledges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread.

    :param groupName: stream group where multiple threads will read.
    :param streamName: Stream (key) name where messages will be read.
    """

    readerID=threading.current_thread().name
    while True:
        # Check if the stream has any message
        if redis.xlen(streamName)>0:
            # '>' asks only for messages that were never delivered to a consumer in this group
            streamData=redis.xreadgroup(groupName,readerID,{streamName:'>'},count=1)
            if len(streamData) > 0:
                msgId,message = streamData[0][1][0]
                logging.info("{}: Got {} from ID {}".format(readerID,message,msgId))
                # Do some processing here. If the message has been processed successfully, acknowledge it and (optional) delete the message.
                redis.xack(streamName,groupName,msgId)
                logging.info("Stream message ID {} read and processed successfully by {}".format(msgId,readerID))
                redis.xdel(streamName,msgId)

        time.sleep(0.5)

# Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read.
try:
    redis.xgroup_create('mystream','myworkergroup',mkstream=True)
except exceptions.ResponseError as e:
    logging.info("Consumer group already exists. Will continue despite the error: {}".format(e))

# Starts 5 writer threads.
for writer_no in range(5):
    writerThread = threading.Thread(target=writeMessage, name='writer-'+str(writer_no), args=('mystream',),daemon=True)
    writerThread.start()

# Starts 10 reader threads
for reader_no in range(10):
    readerThread = threading.Thread(target=readMessage, name='reader-'+str(reader_no), args=('myworkergroup','mystream',),daemon=True)
    readerThread.start()

# Keep the code running for 30 seconds
time.sleep(30)
```

To run the program, enter the following command:

 `python ReadWriteStream.py`
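
The reader in the program unpacks the `xreadgroup` result with `streamData[0][1][0]`, which relies on its nested shape: a list of `[stream name, [(message ID, fields), ...]]` entries. The sketch below walks that structure explicitly and splits a message ID into its millisecond timestamp and sequence number. The helper names `iter_stream_entries` and `split_message_id` are hypothetical, and the sample data stands in for a real response.

```python
def iter_stream_entries(stream_data):
    """Yield (stream_name, message_id, fields) for every entry in a response.

    stream_data has the shape [[stream_name, [(message_id, fields), ...]], ...].
    """
    for stream_name, entries in stream_data:
        for message_id, fields in entries:
            yield stream_name, message_id, fields


def split_message_id(message_id):
    """Split an ID of the form '<milliseconds>-<sequence>' into two integers."""
    millis, sequence = message_id.split('-')
    return int(millis), int(sequence)


# Sample shaped like the result of redis.xreadgroup(...) with decode_responses=True
sample = [['mystream', [('1617634800000-0',
                         {'writerId': 'writer-0', 'myvalue': 'Mon Apr  5 15:00:00 2021'})]]]

for stream_name, message_id, fields in iter_stream_entries(sample):
    print(stream_name, split_message_id(message_id), fields['writerId'])
```

Iterating over the whole response, rather than indexing the first entry, also covers calls that use a `count` greater than 1 or read from several streams at once.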