Python 및 ElastiCache - Amazon ElastiCache

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Python 및 ElastiCache

이 자습서에서는 for Python(Boto3)을 AWS SDK 사용하여 다음 ElastiCache (Redis OSS) 작업을 수행하는 간단한 프로그램을 작성합니다.

  • 클러스터 생성 ElastiCache (Redis OSS)(클러스터 모드 활성화 및 클러스터 모드 비활성화)

  • 사용자 또는 사용자 그룹이 존재하는지 확인하고 그렇지 않으면 생성합니다. (이 기능은 Valkey 7.2 이상 및 Redis OSS 6.0 이상에서 사용할 수 있습니다.)

  • 에 연결 ElastiCache

  • 문자열 설정 및 가져오기, 스트림에서 읽기 및 쓰기, 게시/구독 채널의 게시 및 구독과 같은 작업을 수행합니다.

이 자습서를 진행하면서 Python용(Boto) 설명서를 참조 AWS SDK할 수 있습니다. 다음 섹션은 에 따라 다릅니다 ElastiCache. ElastiCache 하위 수준 클라이언트

자습서 사전 요구 사항

  • 를 사용하도록 AWS 액세스 키를 설정합니다 AWS SDKs. 자세한 내용은 설정 ElastiCache 단원을 참조하십시오.

  • Python 3.0 이상을 설치합니다. 자세한 내용은 https://www.python.org/downloads 참조하세요. 지침은 Boto 3 설명서의 Quickstart 섹션을 참조하세요.

자습서: ElastiCache 클러스터 및 사용자 생성

다음 예제에서는 ElastiCache (Redis OSS) 관리 작업(클러스터 또는 사용자 생성)에 boto3를 사용하고 데이터 처리를 SDK 위해 redis-py/redis-py-cluster 를 사용합니다.

클러스터 모드가 비활성화된 클러스터 생성

다음 프로그램을 복사하여 CreateClusterModeDisabledCluster.py라는 파일에 붙여 넣습니다.

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_disabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumCacheClusters=2,ReplicationGroupDescription='Sample cache cluster',ReplicationGroupId=None): """Creates an ElastiCache Cluster with cluster mode disabled Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximun 6 (1 primary and 5 replicas). If not specified, cluster will be created with 1 primary and 1 replica. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' response = client.create_replication_group( AutomaticFailoverEnabled=True, CacheNodeType=CacheNodeType, Engine='valkey', EngineVersion=EngineVersion, NumCacheClusters=NumCacheClusters, ReplicationGroupDescription=ReplicationGroupDescription, ReplicationGroupId=ReplicationGroupId, SnapshotRetentionLimit=30, ) return response if __name__ == '__main__': # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 7.2, one primary and two replicas elasticacheResponse = create_cluster_mode_disabled( #CacheNodeType='cache.m6g.large', EngineVersion='7.2', NumCacheClusters=3, ReplicationGroupDescription='Valkey cluster mode disabled with replicas', ReplicationGroupId='valkey202104053' ) logging.info(elasticacheResponse)

프로그램을 실행하려면 다음 명령을 입력합니다.

python CreateClusterModeDisabledCluster.py

자세한 내용은 에서 클러스터 관리 ElastiCache 단원을 참조하십시오.

TLS 및 를 사용하여 클러스터 모드 비활성화 클러스터 생성 RBAC

보안을 보장하기 위해 클러스터 모드 비활성화 클러스터를 생성할 때 전송 계층 보안(TLS) 및 역할 기반 액세스 제어(RBAC)를 사용할 수 있습니다. 토큰이 인증된 경우 모든 인증된 클라이언트OSSAUTH가 전체 복제 그룹 액세스 권한을 갖는 Valkey 또는 Redis와 달리 RBAC를 사용하면 사용자 그룹을 통해 클러스터 액세스를 제어할 수 있습니다. 이러한 사용자 그룹은 복제 그룹에 대한 액세스를 구성하는 방법으로 설계되었습니다. 자세한 내용은 역할 기반 액세스 제어(RBAC) 단원을 참조하십시오.

다음 프로그램을 복사하여 ClusterModeDisabledWithRBAC.py라는 파일에 붙여 넣습니다.

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumCacheClusters=2,ReplicationGroupDescription='Sample cache cluster',ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None): """Creates an ElastiCache Cluster with cluster mode disabled and RBAC Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximun 6 (1 primary and 5 replicas). If not specified, cluster will be created with 1 primary and 1 replica. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Mandatory name for the cluster. :param UserGroupIds: The ID of the user group to be assigned to the cluster. :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used. :return: dictionary with the API results """ if not ReplicationGroupId: return {'Error': 'ReplicationGroupId parameter is required'} elif not isinstance(UserGroupIds,(list)): return {'Error': 'UserGroupIds parameter is required and must be a list'} params={'AutomaticFailoverEnabled': True, 'CacheNodeType': CacheNodeType, 'Engine': 'valkey', 'EngineVersion': EngineVersion, 'NumCacheClusters': NumCacheClusters, 'ReplicationGroupDescription': ReplicationGroupDescription, 'ReplicationGroupId': ReplicationGroupId, 'SnapshotRetentionLimit': 30, 'TransitEncryptionEnabled': True, 'UserGroupIds':UserGroupIds } # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit. if isinstance(SecurityGroupIds,(list)): params.update({'SecurityGroupIds':SecurityGroupIds}) if CacheSubnetGroupName: params.update({'CacheSubnetGroupName':CacheSubnetGroupName}) response = client.create_replication_group(**params) return response if __name__ == '__main__': # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 7.2, one primary and two replicas. # Assigns the existent user group "mygroup" for RBAC authentication response=create_cluster_mode_disabled_rbac( CacheNodeType='cache.m6g.large', EngineVersion='7.2', NumCacheClusters=3, ReplicationGroupDescription='Valkey cluster mode disabled with replicas', ReplicationGroupId='valkey202104', UserGroupIds=[ 'mygroup' ], SecurityGroupIds=[ 'sg-7cc73803' ], CacheSubnetGroupName='default' ) logging.info(response)

프로그램을 실행하려면 다음 명령을 입력합니다.

python ClusterModeDisabledWithRBAC.py

자세한 내용은 에서 클러스터 관리 ElastiCache 단원을 참조하십시오.

클러스터 모드가 활성화된 클러스터 생성

다음 프로그램을 복사하여 ClusterModeEnabled.py라는 파일에 붙여 넣습니다.

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None): """Creates an ElastiCache Cluster with cluster mode enabled Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximun 90. If not specified, cluster will be created with 1 shard. :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' response = client.create_replication_group( AutomaticFailoverEnabled=True, CacheNodeType=CacheNodeType, Engine='valkey', EngineVersion=EngineVersion, ReplicationGroupDescription=ReplicationGroupDescription, ReplicationGroupId=ReplicationGroupId, # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary node (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=NumNodeGroups, ReplicasPerNodeGroup=ReplicasPerNodeGroup, CacheParameterGroupName='default.valkey7.2.cluster.on' ) return response # Creates a cluster mode enabled response = create_cluster_mode_enabled( CacheNodeType='cache.m6g.large', EngineVersion='6.0', ReplicationGroupDescription='Valkey cluster mode enabled with replicas', ReplicationGroupId='valkey20210', # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=2, ReplicasPerNodeGroup=1, ) logging.info(response)

프로그램을 실행하려면 다음 명령을 입력합니다.

python ClusterModeEnabled.py

자세한 내용은 에서 클러스터 관리 ElastiCache 단원을 참조하십시오.

TLS 및 를 사용하여 클러스터 모드 활성화 클러스터 생성 RBAC

보안을 보장하기 위해 클러스터 모드 활성화 클러스터를 생성할 때 전송 계층 보안(TLS) 및 역할 기반 액세스 제어(RBAC)를 사용할 수 있습니다. 토큰이 인증된 경우 모든 인증된 클라이언트OSSAUTH가 전체 복제 그룹 액세스 권한을 갖는 Valkey 또는 Redis와 달리 RBAC를 사용하면 사용자 그룹을 통해 클러스터 액세스를 제어할 수 있습니다. 이러한 사용자 그룹은 복제 그룹에 대한 액세스를 구성하는 방법으로 설계되었습니다. 자세한 내용은 역할 기반 액세스 제어(RBAC) 단원을 참조하십시오.

다음 프로그램을 복사하여 ClusterModeEnabledWithRBAC.py라는 파일에 붙여 넣습니다.

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None,UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None,CacheParameterGroupName='default.valkey7.2.cluster.on'): """Creates an ElastiCache Cluster with cluster mode enabled and RBAC Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximun 90. If not specified, cluster will be created with 1 shard. :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster. :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled. :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' elif not isinstance(UserGroupIds,(list)): return {'Error': 'UserGroupIds parameter is required and must be a list'} params={'AutomaticFailoverEnabled': True, 'CacheNodeType': CacheNodeType, 'Engine': 'valkey', 'EngineVersion': EngineVersion, 'ReplicationGroupDescription': ReplicationGroupDescription, 'ReplicationGroupId': ReplicationGroupId, 'SnapshotRetentionLimit': 30, 'TransitEncryptionEnabled': True, 'UserGroupIds':UserGroupIds, 'NumNodeGroups': NumNodeGroups, 'ReplicasPerNodeGroup': ReplicasPerNodeGroup, 'CacheParameterGroupName': CacheParameterGroupName } # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit. if isinstance(SecurityGroupIds,(list)): params.update({'SecurityGroupIds':SecurityGroupIds}) if CacheSubnetGroupName: params.update({'CacheSubnetGroupName':CacheSubnetGroupName}) response = client.create_replication_group(**params) return response if __name__ == '__main__': # Creates a cluster mode enabled cluster response = create_cluster_mode_enabled( CacheNodeType='cache.m6g.large', EngineVersion='7.2', ReplicationGroupDescription='Valkey cluster mode enabled with replicas', ReplicationGroupId='valkey2021', # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=2, ReplicasPerNodeGroup=1, UserGroupIds=[ 'mygroup' ], SecurityGroupIds=[ 'sg-7cc73803' ], CacheSubnetGroupName='default' ) logging.info(response)

프로그램을 실행하려면 다음 명령을 입력합니다.

python ClusterModeEnabledWithRBAC.py

자세한 내용은 에서 클러스터 관리 ElastiCache 단원을 참조하십시오.

사용자/사용자 그룹이 있는지 확인하고 없으면 생성

를 사용하면 사용자를 RBAC생성하고 액세스 문자열을 사용하여 특정 권한을 할당할 수 있습니다. 사용자를 특정 역할(관리자, 인적 자원)과 정렬된 사용자 그룹에 할당한 다음 하나 이상의 ElastiCache (Redis OSS) 복제 그룹에 배포합니다. 이렇게 하면 동일한 Valkey 또는 Redis OSS 복제 그룹 또는 그룹을 사용하여 클라이언트 간에 보안 경계를 설정하고 클라이언트가 서로의 데이터에 액세스하지 못하게 할 수 있습니다. 자세한 내용은 역할 기반 액세스 제어(RBAC) 단원을 참조하십시오.

다음 프로그램을 복사하여 UserAndUserGroups.py라는 파일에 붙여 넣습니다. 보안 인증 정보 제공 메커니즘을 업데이트합니다. 이 예시의 보안 인증 정보는 교체 가능한 것으로 표시되며 선언되지 않은 항목이 할당됩니다. 보안 인증 정보를 하드 코딩하지 마시기 바랍니다.

이 예제에서는 사용자에 대한 권한이 있는 액세스 문자열을 사용합니다. 액세스 문자열에 대한 자세한 내용은 섹션을 참조하세요액세스 문자열을 사용하여 권한 지정.

import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def check_user_exists(UserId): """Checks if UserId exists Returns True if UserId exists, otherwise False :param UserId: ElastiCache User ID :return: True|False """ try: response = client.describe_users( UserId=UserId, ) if response['Users'][0]['UserId'].lower() == UserId.lower(): return True except Exception as e: if e.response['Error']['Code'] == 'UserNotFound': logging.info(e.response['Error']) return False else: raise def check_group_exists(UserGroupId): """Checks if UserGroupID exists Returns True if Group ID exists, otherwise False :param UserGroupId: ElastiCache User ID :return: True|False """ try: response = client.describe_user_groups( UserGroupId=UserGroupId ) if response['UserGroups'][0]['UserGroupId'].lower() == UserGroupId.lower(): return True except Exception as e: if e.response['Error']['Code'] == 'UserGroupNotFound': logging.info(e.response['Error']) return False else: raise def create_user(UserId=None,UserName=None,Password=None,AccessString=None): """Creates a new user Returns the ARN for the newly created user or the error message :param UserId: ElastiCache user ID. User IDs must be unique :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name as long as the associated user ID is unique. :param Password: Password for user. Must have at least 16 chars. :param AccessString: Access string with the permissions for the user. :return: user ARN """ try: response = client.create_user( UserId=UserId, UserName=UserName, Engine='Redis', Passwords=[Password], AccessString=AccessString, NoPasswordRequired=False ) return response['ARN'] except Exception as e: logging.info(e.response['Error']) return e.response['Error'] def create_group(UserGroupId=None, UserIds=None): """Creates a new group. A default user is required (mandatory) and should be specified in the UserIds list Return: Group ARN :param UserIds: List with user IDs to be associated with the new group. A default user is required :param UserGroupId: The ID (name) for the group :return: Group ARN """ try: response = client.create_user_group( UserGroupId=UserGroupId, Engine='Redis', UserIds=UserIds ) return response['ARN'] except Exception as e: logging.info(e.response['Error']) if __name__ == '__main__': groupName='mygroup2' userName = 'myuser2' userId=groupName+'-'+userName # Creates a new user if the user ID does not exist. for tmpUserId,tmpUserName in [ (userId,userName), (groupName+'-default','default')]: if not check_user_exists(tmpUserId): response=create_user(UserId=tmpUserId, UserName=EXAMPLE,Password=EXAMPLE,AccessString='on ~* +@all') logging.info(response) # assigns the new user ID to the user group if not check_group_exists(groupName): UserIds = [ userId , groupName+'-default'] response=create_group(UserGroupId=groupName,UserIds=UserIds) logging.info(response)

프로그램을 실행하려면 다음 명령을 입력합니다.

python UserAndUserGroups.py

자습서: 에 연결 ElastiCache

다음 예제에서는 Valkey 또는 Redis OSS 클라이언트를 사용하여 에 연결합니다 ElastiCache.

클러스터 모드가 비활성화된 클러스터에 연결

다음 프로그램을 복사하여 ConnectClusterModeDisabled.py라는 파일에 붙여 넣습니다. 보안 인증 정보 제공 메커니즘을 업데이트합니다. 이 예시의 보안 인증 정보는 교체 가능한 것으로 표시되며 선언되지 않은 항목이 할당됩니다. 보안 인증 정보를 하드 코딩하지 마시기 바랍니다.

from redis import Redis import logging logging.basicConfig(level=logging.INFO) redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True, username=example, password=EXAMPLE) if redis.ping(): logging.info("Connected to Redis")

프로그램을 실행하려면 다음 명령을 입력합니다.

python ConnectClusterModeDisabled.py

클러스터 모드가 활성화된 클러스터에 연결

다음 프로그램을 복사하여 ConnectClusterModeEnabled.py라는 파일에 붙여 넣습니다.

from rediscluster import RedisCluster import logging logging.basicConfig(level=logging.INFO) redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com","port": "6379"}], decode_responses=True,skip_full_coverage_check=True) if redis.ping(): logging.info("Connected to Redis")

프로그램을 실행하려면 다음 명령을 입력합니다.

python ConnectClusterModeEnabled.py

사용 예제:

다음 예제에서는 의 boto3SDK ElastiCache 를 사용하여 ElastiCache (Redis OSS)를 사용합니다.

문자열 설정 및 가져오기

다음 프로그램을 복사하여 SetAndGetStrings.py라는 파일에 붙여 넣습니다.

import time import logging logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s') keyName='mykey' currTime=time.ctime(time.time()) # Set the key 'mykey' with the current date and time as value. # The Key will expire and removed from cache in 60 seconds. redis.set(keyName, currTime, ex=60) # Sleep just for better illustration of TTL (expiration) value time.sleep(5) # Retrieve the key value and current TTL keyValue=redis.get(keyName) keyTTL=redis.ttl(keyName) logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))

프로그램을 실행하려면 다음 명령을 입력합니다.

python SetAndGetStrings.py

여러 항목이 있는 해시 설정 및 가져오기

다음 프로그램을 복사하여 SetAndGetHash.py라는 파일에 붙여 넣습니다.

import logging import time logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s') keyName='mykey' keyValues={'datetime': time.ctime(time.time()), 'epochtime': time.time()} # Set the hash 'mykey' with the current date and time in human readable format (datetime field) and epoch number (epochtime field). redis.hset(keyName, mapping=keyValues) # Set the key to expire and removed from cache in 60 seconds. redis.expire(keyName, 60) # Sleep just for better illustration of TTL (expiration) value time.sleep(5) # Retrieves all the fields and current TTL keyValues=redis.hgetall(keyName) keyTTL=redis.ttl(keyName) logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))

프로그램을 실행하려면 다음 명령을 입력합니다.

python SetAndGetHash.py

게시/구독 채널의 게시(쓰기) 및 구독(읽기)

다음 프로그램을 복사하여 PubAndSub.py라는 파일에 붙여 넣습니다.

import logging import time def handlerFunction(message): """Prints message got from PubSub channel to the log output Return None :param message: message to log """ logging.info(message) logging.basicConfig(level=logging.INFO) redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True) # Creates the subscriber connection on "mychannel" subscriber = redis.pubsub() subscriber.subscribe(**{'mychannel': handlerFunction}) # Creates a new thread to watch for messages while the main process continues with its routines thread = subscriber.run_in_thread(sleep_time=0.01) # Creates publisher connection on "mychannel" redis.publish('mychannel', 'My message') # Publishes several messages. Subscriber thread will read and print on log. while True: redis.publish('mychannel',time.ctime(time.time())) time.sleep(1)

프로그램을 실행하려면 다음 명령을 입력합니다.

python PubAndSub.py

스트림의 쓰기 및 읽기

다음 프로그램을 복사하여 ReadWriteStream.py라는 파일에 붙여 넣습니다.

from redis import Redis import redis.exceptions as exceptions import logging import time import threading logging.basicConfig(level=logging.INFO) def writeMessage(streamName): """Starts a loop writting the current time and thread name to 'streamName' :param streamName: Stream (key) name to write messages. """ fieldsDict={'writerId':threading.currentThread().getName(),'myvalue':None} while True: fieldsDict['myvalue'] = time.ctime(time.time()) redis.xadd(streamName,fieldsDict) time.sleep(1) def readMessage(groupName=None,streamName=None): """Starts a loop reading from 'streamName' Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution. Once a thread acknowleges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread. :param groupName: stream group were multiple threads will read. :param streamName: Stream (key) name where messages will be read. """ readerID=threading.currentThread().getName() while True: try: # Check if the stream has any message if redis.xlen(streamName)>0: # Check if if the messages are new (not acknowledged) or not (already processed) streamData=redis.xreadgroup(groupName,readerID,{streamName:'>'},count=1) if len(streamData) > 0: msgId,message = streamData[0][1][0] logging.info("{}: Got {} from ID {}".format(readerID,message,msgId)) #Do some processing here. If the message has been processed sucessfuly, acknowledge it and (optional) delete the message. redis.xack(streamName,groupName,msgId) logging.info("Stream message ID {} read and processed successfuly by {}".format(msgId,readerID)) redis.xdel(streamName,msgId) else: pass except: raise time.sleep(0.5) # Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read. try: redis.xgroup_create('mystream','myworkergroup',mkstream=True) except exceptions.ResponseError as e: logging.info("Consumer group already exists. Will continue despite the error: {}".format(e)) except: raise # Starts 5 writer threads. for writer_no in range(5): writerThread = threading.Thread(target=writeMessage, name='writer-'+str(writer_no), args=('mystream',),daemon=True) writerThread.start() # Starts 10 reader threads for reader_no in range(10): readerThread = threading.Thread(target=readMessage, name='reader-'+str(reader_no), args=('myworkergroup','mystream',),daemon=True) readerThread.daemon = True readerThread.start() # Keep the code running for 30 seconds time.sleep(30)

프로그램을 실행하려면 다음 명령을 입력합니다.

python ReadWriteStream.py