Python and ElastiCache - Amazon ElastiCache

In this tutorial, you use the AWS SDK for Python (Boto3) to write simple programs that perform the following ElastiCache (Redis OSS) operations:

  • Create ElastiCache (Redis OSS) clusters (cluster mode enabled and cluster mode disabled)

  • Check whether users or user groups exist, and create them if they do not. (This feature is available with Valkey 7.2 and later, and with Redis OSS 6.0 and later.)

  • Connect to ElastiCache

  • Perform operations such as setting and getting strings, reading from and writing to streams, and publishing to and subscribing to Pub/Sub channels.

As you work through this tutorial, you can refer to the AWS SDK for Python (Boto3) documentation. The following section is specific to ElastiCache: ElastiCache low-level client

Tutorial prerequisites

Tutorial: Creating ElastiCache clusters and users

The following examples use the boto3 SDK for ElastiCache (Redis OSS) management operations (cluster or user creation) and redis-py or redis-py-cluster for data handling.

Create a cluster with cluster mode disabled

Copy the following program and paste it into a file named CreateClusterModeDisabledCluster.py.

import logging

import boto3

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_disabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2,
                                 ReplicationGroupDescription='Sample cache cluster', ReplicationGroupId=None):
    """Creates an ElastiCache cluster with cluster mode disabled.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small is used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Engine version to be used. If not specified, 7.2 is used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
        If not specified, the cluster is created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        NumCacheClusters=NumCacheClusters,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        SnapshotRetentionLimit=30,
    )
    return response


if __name__ == '__main__':
    # Creates a cluster mode disabled cluster with Valkey 7.2, one primary and two replicas.
    # The default node type is used; uncomment CacheNodeType to use cache.m6g.large nodes instead.
    elasticacheResponse = create_cluster_mode_disabled(
        # CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104053'
    )
    logging.info(elasticacheResponse)

To run this program, enter the following command:

python CreateClusterModeDisabledCluster.py

For more information, see Managing clusters in ElastiCache.
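The create_replication_group call returns before the cluster is ready to accept connections. The following is a minimal sketch, not part of the original tutorial, of polling describe_replication_groups until the Status field reads available. The function name wait_until_available and its delay, max_attempts, and sleep parameters are assumptions made for illustration; the client is passed in as an argument so that the logic can be exercised without an AWS account.

```python
import time


def wait_until_available(client, replication_group_id, delay=30, max_attempts=40, sleep=time.sleep):
    """Polls until the replication group reports the 'available' status.

    Returns True when the group is available, False if max_attempts is exhausted.
    `client` is expected to behave like boto3.client('elasticache').
    """
    for attempt in range(max_attempts):
        response = client.describe_replication_groups(ReplicationGroupId=replication_group_id)
        status = response['ReplicationGroups'][0]['Status']
        if status == 'available':
            return True
        # Do not sleep after the final attempt
        if attempt < max_attempts - 1:
            sleep(delay)
    return False
```

With a real client this could be called as wait_until_available(boto3.client('elasticache'), 'valkey202104053'). Boto3 also ships a replication_group_available waiter, obtained with client.get_waiter, which may be preferable in practice.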

Create a cluster mode disabled cluster with TLS and RBAC

To help ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when you create a cluster mode disabled cluster. Unlike Valkey or Redis OSS AUTH, where all authenticated clients have full replication group access if their token is authenticated, RBAC lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see Role-Based Access Control (RBAC).

Copy the following program and paste it into a file named ClusterModeDisabledWithRBAC.py.

import logging

import boto3

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumCacheClusters=2,
                                      ReplicationGroupDescription='Sample cache cluster', ReplicationGroupId=None,
                                      UserGroupIds=None, SecurityGroupIds=None, CacheSubnetGroupName=None):
    """Creates an ElastiCache cluster with cluster mode disabled and RBAC.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small is used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Engine version to be used. If not specified, 7.2 is used.
    :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximum 6 (1 primary and 5 replicas).
        If not specified, the cluster is created with 1 primary and 1 replica.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: List with the IDs of the user groups to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, the default is used.
    :param CacheSubnetGroupName: Subnet group where the cluster is placed. If not defined, the default is used.
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds, list):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params = {'AutomaticFailoverEnabled': True,
              'CacheNodeType': CacheNodeType,
              'Engine': 'valkey',
              'EngineVersion': EngineVersion,
              'NumCacheClusters': NumCacheClusters,
              'ReplicationGroupDescription': ReplicationGroupDescription,
              'ReplicationGroupId': ReplicationGroupId,
              'SnapshotRetentionLimit': 30,
              'TransitEncryptionEnabled': True,
              'UserGroupIds': UserGroupIds}

    # Defaults are used if CacheSubnetGroupName or SecurityGroupIds are not explicit.
    if isinstance(SecurityGroupIds, list):
        params.update({'SecurityGroupIds': SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName': CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response


if __name__ == '__main__':
    # Creates a cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 7.2, one primary and two replicas.
    # Assigns the existing user group "mygroup" for RBAC authentication.
    response = create_cluster_mode_disabled_rbac(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        NumCacheClusters=3,
        ReplicationGroupDescription='Valkey cluster mode disabled with replicas',
        ReplicationGroupId='valkey202104',
        UserGroupIds=['mygroup'],
        SecurityGroupIds=['sg-7cc73803'],
        CacheSubnetGroupName='default'
    )
    logging.info(response)

To run this program, enter the following command:

python ClusterModeDisabledWithRBAC.py

For more information, see Managing clusters in ElastiCache.

Create a cluster with cluster mode enabled

Copy the following program and paste it into a file named ClusterModeEnabled.py.

import logging

import boto3

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1,
                                ReplicasPerNodeGroup=1,
                                ReplicationGroupDescription='Sample cache with cluster mode enabled',
                                ReplicationGroupId=None):
    """Creates an ElastiCache cluster with cluster mode enabled.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small is used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Engine version to be used. If not specified, 7.2 is used.
        Must be compatible with the parameter group used below.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
        If not specified, the cluster is created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified, 1 replica per shard is created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}

    response = client.create_replication_group(
        AutomaticFailoverEnabled=True,
        CacheNodeType=CacheNodeType,
        Engine='valkey',
        EngineVersion=EngineVersion,
        ReplicationGroupDescription=ReplicationGroupDescription,
        ReplicationGroupId=ReplicationGroupId,
        # Shard layout: NumNodeGroups shards, each with 1 primary node (implicit)
        # and ReplicasPerNodeGroup replicas.
        NumNodeGroups=NumNodeGroups,
        ReplicasPerNodeGroup=ReplicasPerNodeGroup,
        CacheParameterGroupName='default.valkey7.2.cluster.on'
    )
    return response


if __name__ == '__main__':
    # Creates a cluster mode enabled cluster with 2 shards (NumNodeGroups),
    # each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup).
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey20210',
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
    )
    logging.info(response)

To run this program, enter the following command:

python ClusterModeEnabled.py

For more information, see Managing clusters in ElastiCache.
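The relationship between shards, replicas, and nodes in a cluster mode enabled cluster comes down to a little arithmetic: each shard (node group) has one implicit primary plus ReplicasPerNodeGroup replicas. The helper below is a hypothetical illustration of that calculation, not an ElastiCache API.

```python
def total_nodes(num_node_groups, replicas_per_node_group):
    """Returns the number of nodes in a cluster mode enabled cluster."""
    # Every shard contributes one primary plus its replicas
    return num_node_groups * (1 + replicas_per_node_group)


# The call in ClusterModeEnabled.py uses NumNodeGroups=2 and ReplicasPerNodeGroup=1
print(total_nodes(2, 1))  # 4 nodes: 2 primaries and 2 replicas
```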

Create a cluster mode enabled cluster with TLS and RBAC

To help ensure security, you can use Transport Layer Security (TLS) and Role-Based Access Control (RBAC) when you create a cluster mode enabled cluster. Unlike Valkey or Redis OSS AUTH, where all authenticated clients have full replication group access if their token is authenticated, RBAC lets you control cluster access through user groups. These user groups are designed as a way to organize access to replication groups. For more information, see Role-Based Access Control (RBAC).

Copy the following program and paste it into a file named ClusterModeEnabledWithRBAC.py.

import logging

import boto3

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def create_cluster_mode_enabled(CacheNodeType='cache.t3.small', EngineVersion='7.2', NumNodeGroups=1,
                                ReplicasPerNodeGroup=1,
                                ReplicationGroupDescription='Sample cache with cluster mode enabled',
                                ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None,
                                CacheSubnetGroupName=None,
                                CacheParameterGroupName='default.valkey7.2.cluster.on'):
    """Creates an ElastiCache cluster with cluster mode enabled and RBAC.

    Returns a dictionary with the API response.

    :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small is used.
        Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types.
    :param EngineVersion: Engine version to be used. If not specified, 7.2 is used.
    :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximum 90.
        If not specified, the cluster is created with 1 shard.
    :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified, 1 replica per shard is created.
    :param ReplicationGroupDescription: Description for the cluster.
    :param ReplicationGroupId: Mandatory name for the cluster.
    :param UserGroupIds: List with the IDs of the user groups to be assigned to the cluster.
    :param SecurityGroupIds: List of security groups to be assigned. If not defined, the default is used.
    :param CacheSubnetGroupName: Subnet group where the cluster is placed. If not defined, the default is used.
    :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled.
    :return: dictionary with the API results
    """
    if not ReplicationGroupId:
        return {'Error': 'ReplicationGroupId parameter is required'}
    elif not isinstance(UserGroupIds, list):
        return {'Error': 'UserGroupIds parameter is required and must be a list'}

    params = {'AutomaticFailoverEnabled': True,
              'CacheNodeType': CacheNodeType,
              'Engine': 'valkey',
              'EngineVersion': EngineVersion,
              'ReplicationGroupDescription': ReplicationGroupDescription,
              'ReplicationGroupId': ReplicationGroupId,
              'SnapshotRetentionLimit': 30,
              'TransitEncryptionEnabled': True,
              'UserGroupIds': UserGroupIds,
              'NumNodeGroups': NumNodeGroups,
              'ReplicasPerNodeGroup': ReplicasPerNodeGroup,
              'CacheParameterGroupName': CacheParameterGroupName}

    # Defaults are used if CacheSubnetGroupName or SecurityGroupIds are not explicit.
    if isinstance(SecurityGroupIds, list):
        params.update({'SecurityGroupIds': SecurityGroupIds})
    if CacheSubnetGroupName:
        params.update({'CacheSubnetGroupName': CacheSubnetGroupName})

    response = client.create_replication_group(**params)
    return response


if __name__ == '__main__':
    # Creates a cluster mode enabled cluster with 2 shards (NumNodeGroups),
    # each with 1 primary (implicit) and 1 replica (ReplicasPerNodeGroup).
    # Assigns the existing user group "mygroup" for RBAC authentication.
    response = create_cluster_mode_enabled(
        CacheNodeType='cache.m6g.large',
        EngineVersion='7.2',
        ReplicationGroupDescription='Valkey cluster mode enabled with replicas',
        ReplicationGroupId='valkey2021',
        NumNodeGroups=2,
        ReplicasPerNodeGroup=1,
        UserGroupIds=['mygroup'],
        SecurityGroupIds=['sg-7cc73803'],
        CacheSubnetGroupName='default'
    )
    logging.info(response)

To run this program, enter the following command:

python ClusterModeEnabledWithRBAC.py

For more information, see Managing clusters in ElastiCache.

Check whether users or user groups exist, and create them if they do not

With RBAC, you create users and assign them specific permissions by using an access string. You assign the users to user groups aligned with a specific role (administrators, human resources) that are then deployed to one or more ElastiCache (Redis OSS) replication groups. By doing this, you can establish security boundaries between clients that use the same Valkey or Redis OSS replication group or groups, and prevent clients from accessing each other's data. For more information, see Role-Based Access Control (RBAC).

Copy the following program and paste it into a file named UserAndUserGroups.py. Update the mechanism used to provide credentials. The credentials in this example are replaceable placeholders. Avoid hard-coding credentials.

This example uses an access string with the permissions for the user. For more information about access strings, see Specifying Permissions Using an Access String.
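An access string such as 'on ~* +@all' has three parts: whether the user is active, which key patterns it may touch, and which commands or command categories it may run. The following sketch composes such strings; build_access_string and its parameters are hypothetical names introduced for illustration and are not part of boto3 or ElastiCache.

```python
def build_access_string(enabled=True, key_patterns=('*',), commands=('+@all',)):
    """Composes an access string from its three parts.

    :param enabled: True emits 'on' (user active), False emits 'off'.
    :param key_patterns: Key patterns the user may access; each is prefixed with '~'.
    :param commands: Command rules such as '+@all', '-@all', or '+@read'.
    """
    parts = ['on' if enabled else 'off']
    parts.extend('~' + pattern for pattern in key_patterns)
    parts.extend(commands)
    return ' '.join(parts)


# The access string used in UserAndUserGroups.py
print(build_access_string())  # on ~* +@all
# A read-only user restricted to keys that start with 'app:'
print(build_access_string(key_patterns=['app:*'], commands=['-@all', '+@read']))  # on ~app:* -@all +@read
```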

import logging
import os

import boto3
from botocore.exceptions import ClientError

logging.basicConfig(level=logging.INFO)
client = boto3.client('elasticache')


def check_user_exists(UserId):
    """Checks if UserId exists.

    Returns True if UserId exists, otherwise False.

    :param UserId: ElastiCache user ID
    :return: True|False
    """
    try:
        response = client.describe_users(UserId=UserId)
        return any(user['UserId'].lower() == UserId.lower() for user in response['Users'])
    except ClientError as e:
        if e.response['Error']['Code'] == 'UserNotFound':
            logging.info(e.response['Error'])
            return False
        raise


def check_group_exists(UserGroupId):
    """Checks if UserGroupId exists.

    Returns True if the group ID exists, otherwise False.

    :param UserGroupId: ElastiCache user group ID
    :return: True|False
    """
    try:
        response = client.describe_user_groups(UserGroupId=UserGroupId)
        return any(group['UserGroupId'].lower() == UserGroupId.lower() for group in response['UserGroups'])
    except ClientError as e:
        if e.response['Error']['Code'] == 'UserGroupNotFound':
            logging.info(e.response['Error'])
            return False
        raise


def create_user(UserId=None, UserName=None, Password=None, AccessString=None):
    """Creates a new user.

    Returns the ARN for the newly created user or the error message.

    :param UserId: ElastiCache user ID. User IDs must be unique.
    :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name
        as long as the associated user ID is unique.
    :param Password: Password for the user. Must have at least 16 characters.
    :param AccessString: Access string with the permissions for the user.
    :return: user ARN
    """
    try:
        response = client.create_user(
            UserId=UserId,
            UserName=UserName,
            Engine='Redis',
            Passwords=[Password],
            AccessString=AccessString,
            NoPasswordRequired=False
        )
        return response['ARN']
    except ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']


def create_group(UserGroupId=None, UserIds=None):
    """Creates a new group.

    A default user is required (mandatory) and should be specified in the UserIds list.
    Returns the ARN for the newly created group or the error message.

    :param UserIds: List with user IDs to be associated with the new group. A default user is required.
    :param UserGroupId: The ID (name) for the group
    :return: Group ARN
    """
    try:
        response = client.create_user_group(
            UserGroupId=UserGroupId,
            Engine='Redis',
            UserIds=UserIds
        )
        return response['ARN']
    except ClientError as e:
        logging.info(e.response['Error'])
        return e.response['Error']


if __name__ == '__main__':
    groupName = 'mygroup2'
    userName = 'myuser2'
    userId = groupName + '-' + userName

    # Assumption: the password is read from an environment variable. ELASTICACHE_PASSWORD
    # is a hypothetical name; replace this with your own mechanism for providing credentials.
    password = os.environ['ELASTICACHE_PASSWORD']

    # Creates a new user if the user ID does not exist.
    for tmpUserId, tmpUserName in [(userId, userName), (groupName + '-default', 'default')]:
        if not check_user_exists(tmpUserId):
            response = create_user(UserId=tmpUserId, UserName=tmpUserName, Password=password,
                                   AccessString='on ~* +@all')
            logging.info(response)

    # Assigns the new user IDs to the user group.
    if not check_group_exists(groupName):
        UserIds = [userId, groupName + '-default']
        response = create_group(UserGroupId=groupName, UserIds=UserIds)
        logging.info(response)

To run this program, enter the following command:

python UserAndUserGroups.py
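The create_user docstring in UserAndUserGroups.py notes that a password must have at least 16 characters. As one possible way to avoid hand-written passwords, the sketch below generates a random alphanumeric password with the standard library. generate_password and MIN_PASSWORD_LENGTH are hypothetical names introduced here; storing the result securely (for example in a secrets manager) is left to the reader.

```python
import secrets
import string

MIN_PASSWORD_LENGTH = 16  # minimum stated in the create_user docstring


def generate_password(length=32):
    """Generates a random alphanumeric password of the requested length."""
    if length < MIN_PASSWORD_LENGTH:
        raise ValueError('Password must have at least {} characters'.format(MIN_PASSWORD_LENGTH))
    alphabet = string.ascii_letters + string.digits
    # secrets.choice draws from a cryptographically strong random source
    return ''.join(secrets.choice(alphabet) for _ in range(length))
```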

Tutorial: Connecting to ElastiCache

The following examples use the Valkey or Redis OSS client to connect to ElastiCache.
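Before connecting, you need the endpoint of the cluster. As a hedged sketch, the helper below picks the endpoint out of one entry of the ReplicationGroups list returned by describe_replication_groups: cluster mode enabled groups report a ConfigurationEndpoint, while cluster mode disabled groups report a PrimaryEndpoint on their single node group. get_endpoint is a hypothetical helper, and the sample dictionary uses a made-up host name.

```python
def get_endpoint(replication_group):
    """Returns (address, port) for one entry of response['ReplicationGroups']."""
    # Cluster mode enabled: a single configuration endpoint covers all shards
    endpoint = replication_group.get('ConfigurationEndpoint')
    if not endpoint:
        # Cluster mode disabled: use the primary endpoint of the only node group
        endpoint = replication_group['NodeGroups'][0]['PrimaryEndpoint']
    return endpoint['Address'], endpoint['Port']


# Made-up sample shaped like a cluster mode disabled replication group
sample = {'NodeGroups': [{'PrimaryEndpoint': {'Address': 'primary.example.cache.amazonaws.com', 'Port': 6379}}]}
print(get_endpoint(sample))  # ('primary.example.cache.amazonaws.com', 6379)
```

The endpoints are typically populated only after the replication group reaches the available status.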

Connect to a cluster mode disabled cluster

Copy the following program and paste it into a file named ConnectClusterModeDisabled.py. Update the mechanism used to provide credentials. The credentials in this example are replaceable placeholders. Avoid hard-coding credentials.

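import logging
import os

from redis import Redis

logging.basicConfig(level=logging.INFO)

# Assumption: credentials are read from environment variables. ELASTICACHE_USERNAME and
# ELASTICACHE_PASSWORD are hypothetical names; replace them with your own mechanism.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True,
              username=os.environ['ELASTICACHE_USERNAME'], password=os.environ['ELASTICACHE_PASSWORD'])

if redis.ping():
    logging.info("Connected to Redis")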

To run this program, enter the following command:

python ConnectClusterModeDisabled.py
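One way to keep credentials out of the source file is to collect the connection settings from environment variables. The sketch below builds the keyword arguments for redis.Redis from a mapping; connection_kwargs_from_env and the ELASTICACHE_* variable names are assumptions made for illustration, not names defined by ElastiCache or redis-py.

```python
import os


def connection_kwargs_from_env(env=os.environ):
    """Builds keyword arguments for redis.Redis from environment variables.

    The variable names are assumptions made for this sketch.
    """
    kwargs = {
        'host': env['ELASTICACHE_HOST'],
        'port': int(env.get('ELASTICACHE_PORT', '6379')),
        'decode_responses': True,
        'ssl': True,
    }
    # Credentials are optional so the same helper works for clusters without RBAC
    if env.get('ELASTICACHE_USERNAME'):
        kwargs['username'] = env['ELASTICACHE_USERNAME']
    if env.get('ELASTICACHE_PASSWORD'):
        kwargs['password'] = env['ELASTICACHE_PASSWORD']
    return kwargs
```

The result could then be passed to the client as Redis(**connection_kwargs_from_env()).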

Connect to a cluster mode enabled cluster

Copy the following program and paste it into a file named ConnectClusterModeEnabled.py.

import logging

from rediscluster import RedisCluster

logging.basicConfig(level=logging.INFO)

# Connects through the configuration endpoint of the cluster mode enabled cluster.
redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com", "port": "6379"}],
                     decode_responses=True, skip_full_coverage_check=True)

if redis.ping():
    logging.info("Connected to Redis")

To run this program, enter the following command:

python ConnectClusterModeEnabled.py

Usage examples

The following examples use the redis-py client to work with data in ElastiCache (Redis OSS).

Set and get strings

Copy the following program and paste it into a file named SetAndGetStrings.py.

import logging
import time

from redis import Redis

logging.basicConfig(level=logging.INFO, format='%(asctime)s: %(message)s')

# Replace the host with your own endpoint. Add ssl=True and credentials if the cluster
# uses TLS and RBAC, as in ConnectClusterModeDisabled.py.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)

keyName = 'mykey'
currTime = time.ctime(time.time())

# Set the key 'mykey' with the current date and time as value.
# The key will expire and be removed from the cache in 60 seconds.
redis.set(keyName, currTime, ex=60)

# Sleep just for better illustration of the TTL (expiration) value
time.sleep(5)

# Retrieve the key value and current TTL
keyValue = redis.get(keyName)
keyTTL = redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))

To run this program, enter the following command:

python SetAndGetStrings.py
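Setting a key with an expiration and reading it back is the basis of the common cache-aside pattern. The following is a minimal sketch of that pattern under stated assumptions: get_or_set is a hypothetical helper, and client is any object that offers redis-py style get(key) and set(key, value, ex=...) methods, such as the redis connection used in SetAndGetStrings.py.

```python
def get_or_set(client, key, loader, ttl_seconds=60):
    """Returns the cached value for key, loading and caching it on a miss.

    :param client: Object with redis-py style get(key) and set(key, value, ex=...) methods.
    :param key: Cache key to look up.
    :param loader: Zero-argument callable that produces the value on a cache miss.
    :param ttl_seconds: Expiration applied when the value is stored.
    """
    value = client.get(key)
    if value is None:
        # Cache miss: compute the value and store it with an expiration
        value = loader()
        client.set(key, value, ex=ttl_seconds)
    return value
```

For example, get_or_set(redis, 'mykey', lambda: time.ctime(time.time())) would return the stored timestamp until the key expires, and only then compute a new one.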

Set and get a hash with multiple items

Copy the following program and paste it into a file named SetAndGetHash.py.

import logging
import time

from redis import Redis

logging.basicConfig(level=logging.INFO, format='%(asctime)s: %(message)s')

# Replace the host with your own endpoint. Add ssl=True and credentials if the cluster
# uses TLS and RBAC, as in ConnectClusterModeDisabled.py.
redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True)

keyName = 'mykey'
keyValues = {'datetime': time.ctime(time.time()), 'epochtime': time.time()}

# Set the hash 'mykey' with the current date and time in human readable format (datetime field)
# and epoch number (epochtime field).
redis.hset(keyName, mapping=keyValues)

# Set the key to expire and be removed from the cache in 60 seconds.
redis.expire(keyName, 60)

# Sleep just for better illustration of the TTL (expiration) value
time.sleep(5)

# Retrieve all the fields and the current TTL
keyValues = redis.hgetall(keyName)
keyTTL = redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))

To run this program, enter the following command:

python SetAndGetHash.py
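When the client is created with decode_responses=True, hgetall returns every field value as a string, so the epochtime field written as a float in SetAndGetHash.py comes back as text. The sketch below converts it back; parse_timestamp_hash is a hypothetical helper and the sample values are made up.

```python
def parse_timestamp_hash(fields):
    """Converts the string fields returned by hgetall back to Python types."""
    return {
        'datetime': fields['datetime'],
        # Stored as a float, returned as a string
        'epochtime': float(fields['epochtime']),
    }


# Made-up sample shaped like the result of redis.hgetall('mykey')
raw = {'datetime': 'Mon Apr  5 16:00:00 2021', 'epochtime': '1617638400.5'}
parsed = parse_timestamp_hash(raw)
print(parsed['epochtime'] + 60)  # 1617638460.5
```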

Publish (write) and subscribe (read) from a Pub/Sub channel

Copy the following program and paste it into a file named PubAndSub.py.

import logging
import time

from redis import Redis

logging.basicConfig(level=logging.INFO)


def handlerFunction(message):
    """Prints the message received from the Pub/Sub channel to the log output.

    Returns None.

    :param message: message to log
    """
    logging.info(message)


redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)

# Creates the subscriber connection on "mychannel"
subscriber = redis.pubsub()
subscriber.subscribe(**{'mychannel': handlerFunction})

# Creates a new thread to watch for messages while the main process continues with its routines
thread = subscriber.run_in_thread(sleep_time=0.01)

# Publishes a first message on "mychannel"
redis.publish('mychannel', 'My message')

# Publishes a message every second until interrupted. The subscriber thread reads and logs them.
while True:
    redis.publish('mychannel', time.ctime(time.time()))
    time.sleep(1)

To run this program, enter the following command:

python PubAndSub.py
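The handler registered in PubAndSub.py receives each message as a dictionary with the keys type, pattern, channel, and data. As an illustrative sketch, the hypothetical helper describe_message below turns such a dictionary into a short log line and ignores anything that is not a published message; the sample dictionary is made up.

```python
def describe_message(message):
    """Formats a redis-py Pub/Sub message dictionary as a log line.

    Returns None for anything that is not a published message.
    """
    if message.get('type') not in ('message', 'pmessage'):
        return None
    return '{}: {}'.format(message['channel'], message['data'])


# Made-up sample shaped like the dictionary passed to the handler
sample = {'type': 'message', 'pattern': None, 'channel': 'mychannel', 'data': 'My message'}
print(describe_message(sample))  # mychannel: My message
```

The worker thread returned by run_in_thread can be stopped with thread.stop() when the program should exit cleanly instead of looping forever.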

Write to and read from a stream

Copy the following program and paste it into a file named ReadWriteStream.py.

import logging
import threading
import time

from redis import Redis, exceptions

logging.basicConfig(level=logging.INFO)

# Replace the host with the primary endpoint of your own cluster.
redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)


def writeMessage(streamName):
    """Starts a loop writing the current time and thread name to 'streamName'.

    :param streamName: Stream (key) name to write messages.
    """
    fieldsDict = {'writerId': threading.current_thread().name, 'myvalue': None}
    while True:
        fieldsDict['myvalue'] = time.ctime(time.time())
        redis.xadd(streamName, fieldsDict)
        time.sleep(1)


def readMessage(groupName=None, streamName=None):
    """Starts a loop reading from 'streamName'.

    Multiple threads read from the same stream consumer group. The consumer group is used to
    coordinate data distribution. Once a thread acknowledges a message, it won't be provided again.
    If a message wasn't acknowledged, it can be served to another thread.

    :param groupName: Stream consumer group from which multiple threads read.
    :param streamName: Stream (key) name from which messages are read.
    """
    readerID = threading.current_thread().name
    while True:
        # Check if the stream has any message
        if redis.xlen(streamName) > 0:
            # '>' requests only new messages that were never delivered to another consumer in the group
            streamData = redis.xreadgroup(groupName, readerID, {streamName: '>'}, count=1)
            if len(streamData) > 0:
                msgId, message = streamData[0][1][0]
                logging.info("{}: Got {} from ID {}".format(readerID, message, msgId))
                # Do some processing here. If the message has been processed successfully,
                # acknowledge it and (optionally) delete the message.
                redis.xack(streamName, groupName, msgId)
                logging.info("Stream message ID {} read and processed successfully by {}".format(msgId, readerID))
                redis.xdel(streamName, msgId)
        time.sleep(0.5)


# Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read.
try:
    redis.xgroup_create('mystream', 'myworkergroup', mkstream=True)
except exceptions.ResponseError as e:
    logging.info("Consumer group already exists. Will continue despite the error: {}".format(e))

# Starts 5 writer threads.
for writer_no in range(5):
    writerThread = threading.Thread(target=writeMessage, name='writer-' + str(writer_no),
                                    args=('mystream',), daemon=True)
    writerThread.start()

# Starts 10 reader threads.
for reader_no in range(10):
    readerThread = threading.Thread(target=readMessage, name='reader-' + str(reader_no),
                                    args=('myworkergroup', 'mystream',), daemon=True)
    readerThread.start()

# Keep the code running for 30 seconds
time.sleep(30)

To run this program, enter the following command:

python ReadWriteStream.py
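The expression streamData[0][1][0] in ReadWriteStream.py relies on the nested shape of the xreadgroup result: a list of [stream name, entries] pairs, where each entry is a (message ID, fields) tuple. The sketch below walks that structure explicitly, which may be easier to follow when reading more than one message at a time. iter_stream_entries is a hypothetical helper and the sample data is made up.

```python
def iter_stream_entries(stream_data):
    """Yields (stream_name, message_id, fields) for every entry in an xreadgroup result."""
    for stream_name, entries in stream_data:
        for message_id, fields in entries:
            yield stream_name, message_id, fields


# Made-up sample shaped like the result of xreadgroup with decode_responses=True
sample = [['mystream', [('1617638400000-0', {'writerId': 'writer-0', 'myvalue': 'Mon Apr  5 16:00:00 2021'})]]]
for stream_name, message_id, fields in iter_stream_entries(sample):
    print(stream_name, message_id, fields['writerId'])  # mystream 1617638400000-0 writer-0
```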