翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Python と ElastiCache
このチュートリアルでは、 for Python (Boto3) を使用して AWS SDK、次の ElastiCache (Redis OSS) オペレーションを実行するシンプルなプログラムを記述します。
ElastiCache (Redis OSS) クラスターの作成 (クラスターモードが有効、クラスターモードが無効)
ユーザーまたはユーザーグループが存在するかどうかを確認し、存在しない場合は作成します。(この機能は、Valkey 7.2 以降、および Redis 6.0 OSS 以降で使用できます)。
に接続 ElastiCache
文字列の設定と取得、ストリームからの読み取りと書き込み、Pub/Sub チャンネルからの公開と登録などのオペレーションを実行します。
このチュートリアルを進めるときは、 for Python (Boto) ドキュメントを参照してください AWS SDK。以下のセクションは、ElastiCache 低レベルクライアント
チュートリアルの前提条件
を使用するように AWS アクセスキーを設定します AWS SDKs。詳細については、「のセットアップ ElastiCache」を参照してください。
Python 3.0 以降をインストールします。詳細については、https://www.python.org/downloads
を参照してください。手順については、Boto 3 ドキュメントの「クイックスタート 」を参照してください。
チュートリアル: ElastiCache クラスターとユーザーの作成
次の例では、boto3 を SDK ElastiCache (Redis OSS) 管理オペレーション (クラスターまたはユーザー作成) および redis-py/for data handling redis-py-cluster に使用します。
トピック
クラスタモードが無効のクラスターを作成する
次のプログラムをコピーし、CreateClusterModeDisabledCluster.py という名前のファイルに貼り付けます。
import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_disabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumCacheClusters=2,ReplicationGroupDescription='Sample cache cluster',ReplicationGroupId=None): """Creates an ElastiCache Cluster with cluster mode disabled Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximun 6 (1 primary and 5 replicas). If not specified, cluster will be created with 1 primary and 1 replica. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' response = client.create_replication_group( AutomaticFailoverEnabled=True, CacheNodeType=CacheNodeType, Engine='valkey', EngineVersion=EngineVersion, NumCacheClusters=NumCacheClusters, ReplicationGroupDescription=ReplicationGroupDescription, ReplicationGroupId=ReplicationGroupId, SnapshotRetentionLimit=30, ) return response if __name__ == '__main__': # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 7.2, one primary and two replicas elasticacheResponse = create_cluster_mode_disabled( #CacheNodeType='cache.m6g.large', EngineVersion='7.2', NumCacheClusters=3, ReplicationGroupDescription='Valkey cluster mode disabled with replicas', ReplicationGroupId='valkey202104053' ) logging.info(elasticacheResponse)
このプログラムを実行するには、次のコマンドを入力します。
python CreateClusterModeDisabledCluster.py
詳細については、「でのクラスターの管理 ElastiCache」を参照してください。
TLS および を使用してクラスターモードが無効になっているクラスターを作成する RBAC
セキュリティを確保するために、Transport Layer Security (TLS) と Role-Based Access Control (RBAC) を使用して、クラスターモードが無効になっているクラスターを作成できます。トークンが認証された場合OSSAUTH、すべての認証済みクライアントが完全なレプリケーショングループアクセスを持つ Valkey または Redis とは異なり、 RBAC ではユーザーグループを介してクラスターアクセスを制御できます。これらのユーザーグループは、レプリケーショングループへのアクセスを分類する方法として設計されています。詳細については、「ロールベースのアクセスコントロール (RBAC)」を参照してください。
次のプログラムをコピーし、ClusterModeDisabledWithRBAC.py という名前のファイルに貼り付けます。
import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_disabled_rbac(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumCacheClusters=2,ReplicationGroupDescription='Sample cache cluster',ReplicationGroupId=None, UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None): """Creates an ElastiCache Cluster with cluster mode disabled and RBAC Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumCacheClusters: Number of nodes in the cluster. Minimum 1 (just a primary node) and maximun 6 (1 primary and 5 replicas). If not specified, cluster will be created with 1 primary and 1 replica. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Mandatory name for the cluster. :param UserGroupIds: The ID of the user group to be assigned to the cluster. :param SecurityGroupIds: List of security groups to be assigned. If not defined, default will be used :param CacheSubnetGroupName: subnet group where the cluster will be placed. If not defined, default will be used. :return: dictionary with the API results """ if not ReplicationGroupId: return {'Error': 'ReplicationGroupId parameter is required'} elif not isinstance(UserGroupIds,(list)): return {'Error': 'UserGroupIds parameter is required and must be a list'} params={'AutomaticFailoverEnabled': True, 'CacheNodeType': CacheNodeType, 'Engine': 'valkey', 'EngineVersion': EngineVersion, 'NumCacheClusters': NumCacheClusters, 'ReplicationGroupDescription': ReplicationGroupDescription, 'ReplicationGroupId': ReplicationGroupId, 'SnapshotRetentionLimit': 30, 'TransitEncryptionEnabled': True, 'UserGroupIds':UserGroupIds } # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit. if isinstance(SecurityGroupIds,(list)): params.update({'SecurityGroupIds':SecurityGroupIds}) if CacheSubnetGroupName: params.update({'CacheSubnetGroupName':CacheSubnetGroupName}) response = client.create_replication_group(**params) return response if __name__ == '__main__': # Creates an ElastiCache Cluster mode disabled cluster, based on cache.m6g.large nodes, Valkey 7.2, one primary and two replicas. # Assigns the existent user group "mygroup" for RBAC authentication response=create_cluster_mode_disabled_rbac( CacheNodeType='cache.m6g.large', EngineVersion='7.2', NumCacheClusters=3, ReplicationGroupDescription='Valkey cluster mode disabled with replicas', ReplicationGroupId='valkey202104', UserGroupIds=[ 'mygroup' ], SecurityGroupIds=[ 'sg-7cc73803' ], CacheSubnetGroupName='default' ) logging.info(response)
このプログラムを実行するには、次のコマンドを入力します。
python ClusterModeDisabledWithRBAC.py
詳細については、「でのクラスターの管理 ElastiCache」を参照してください。
クラスタモードが有効のクラスターの作成
次のプログラムをコピーし、ClusterModeEnabled.py という名前のファイルに貼り付けます。
import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None): """Creates an ElastiCache Cluster with cluster mode enabled Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximun 90. If not specified, cluster will be created with 1 shard. :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' response = client.create_replication_group( AutomaticFailoverEnabled=True, CacheNodeType=CacheNodeType, Engine='valkey', EngineVersion=EngineVersion, ReplicationGroupDescription=ReplicationGroupDescription, ReplicationGroupId=ReplicationGroupId, # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary node (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=NumNodeGroups, ReplicasPerNodeGroup=ReplicasPerNodeGroup, CacheParameterGroupName='default.valkey7.2.cluster.on' ) return response # Creates a cluster mode enabled response = create_cluster_mode_enabled( CacheNodeType='cache.m6g.large', EngineVersion='6.0', ReplicationGroupDescription='Valkey cluster mode enabled with replicas', ReplicationGroupId='valkey20210', # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=2, ReplicasPerNodeGroup=1, ) logging.info(response)
このプログラムを実行するには、次のコマンドを入力します。
python ClusterModeEnabled.py
詳細については、「でのクラスターの管理 ElastiCache」を参照してください。
TLS および を使用してクラスターモードが有効になっているクラスターを作成する RBAC
セキュリティを確保するために、クラスターモードが有効になっているクラスターを作成するときに Transport Layer Security (TLS) と Role-Based Access Control (RBAC) を使用できます。トークンが認証された場合OSSAUTH、すべての認証済みクライアントが完全なレプリケーショングループアクセスを持つ Valkey または Redis とは異なり、 RBAC ではユーザーグループを介してクラスターアクセスを制御できます。これらのユーザーグループは、レプリケーショングループへのアクセスを分類する方法として設計されています。詳細については、「ロールベースのアクセスコントロール (RBAC)」を参照してください。
次のプログラムをコピーし、ClusterModeEnabledWithRBAC.py という名前のファイルに貼り付けます。
import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def create_cluster_mode_enabled(CacheNodeType='cache.t3.small',EngineVersion='6.0',NumNodeGroups=1,ReplicasPerNodeGroup=1, ReplicationGroupDescription='Sample cache with cluster mode enabled',ReplicationGroupId=None,UserGroupIds=None, SecurityGroupIds=None,CacheSubnetGroupName=None,CacheParameterGroupName='default.valkey7.2.cluster.on'): """Creates an ElastiCache Cluster with cluster mode enabled and RBAC Returns a dictionary with the API response :param CacheNodeType: Node type used on the cluster. If not specified, cache.t3.small will be used Refer to https://docs.aws.amazon.com/AmazonElastiCache/latest/dg/CacheNodes.SupportedTypes.html for supported node types :param EngineVersion: Engine version to be used. If not specified, latest will be used. :param NumNodeGroups: Number of shards in the cluster. Minimum 1 and maximun 90. If not specified, cluster will be created with 1 shard. :param ReplicasPerNodeGroup: Number of replicas per shard. If not specified 1 replica per shard will be created. :param ReplicationGroupDescription: Description for the cluster. :param ReplicationGroupId: Name for the cluster. :param CacheParameterGroupName: Parameter group to be used. Must be compatible with the engine version and cluster mode enabled. :return: dictionary with the API results """ if not ReplicationGroupId: return 'ReplicationGroupId parameter is required' elif not isinstance(UserGroupIds,(list)): return {'Error': 'UserGroupIds parameter is required and must be a list'} params={'AutomaticFailoverEnabled': True, 'CacheNodeType': CacheNodeType, 'Engine': 'valkey', 'EngineVersion': EngineVersion, 'ReplicationGroupDescription': ReplicationGroupDescription, 'ReplicationGroupId': ReplicationGroupId, 'SnapshotRetentionLimit': 30, 'TransitEncryptionEnabled': True, 'UserGroupIds':UserGroupIds, 'NumNodeGroups': NumNodeGroups, 'ReplicasPerNodeGroup': ReplicasPerNodeGroup, 'CacheParameterGroupName': CacheParameterGroupName } # defaults will be used if CacheSubnetGroupName or SecurityGroups are not explicit. if isinstance(SecurityGroupIds,(list)): params.update({'SecurityGroupIds':SecurityGroupIds}) if CacheSubnetGroupName: params.update({'CacheSubnetGroupName':CacheSubnetGroupName}) response = client.create_replication_group(**params) return response if __name__ == '__main__': # Creates a cluster mode enabled cluster response = create_cluster_mode_enabled( CacheNodeType='cache.m6g.large', EngineVersion='7.2', ReplicationGroupDescription='Valkey cluster mode enabled with replicas', ReplicationGroupId='valkey2021', # Creates a cluster mode enabled cluster with 1 shard(NumNodeGroups), 1 primary (implicit) and 2 replicas (replicasPerNodeGroup) NumNodeGroups=2, ReplicasPerNodeGroup=1, UserGroupIds=[ 'mygroup' ], SecurityGroupIds=[ 'sg-7cc73803' ], CacheSubnetGroupName='default' ) logging.info(response)
このプログラムを実行するには、次のコマンドを入力します。
python ClusterModeEnabledWithRBAC.py
詳細については、「でのクラスターの管理 ElastiCache」を参照してください。
ユーザー/ユーザーグループが存在するかどうかを確認し、そうでない場合は作成する
ではRBAC、ユーザーを作成し、アクセス文字列を使用して特定のアクセス許可を割り当てます。特定のロール (管理者、人事) と連携したユーザーグループにユーザーを割り当て、1 つ以上の ElastiCache (Redis OSS) レプリケーショングループにデプロイします。これにより、同じ Valkey または Redis OSSレプリケーショングループを使用してクライアント間のセキュリティ境界を確立し、クライアントが互いのデータにアクセスできないようにすることができます。詳細については、「ロールベースのアクセスコントロール (RBAC)」を参照してください。
次のプログラムをコピーし、UserAndUserGroups.py という名前のファイルに貼り付けます。認証情報を提供するメカニズムを更新します。この例の認証情報は交換可能と表示され、宣言されていない項目が割り当てられています。認証情報をハードコーディングすることは避けてください。
この例では、ユーザーのアクセス許可を持つアクセス文字列を使用します。アクセス文字列の詳細については、「」を参照してくださいアクセス文字列を使用したアクセス許可の指定。
import boto3 import logging logging.basicConfig(level=logging.INFO) client = boto3.client('elasticache') def check_user_exists(UserId): """Checks if UserId exists Returns True if UserId exists, otherwise False :param UserId: ElastiCache User ID :return: True|False """ try: response = client.describe_users( UserId=UserId, ) if response['Users'][0]['UserId'].lower() == UserId.lower(): return True except Exception as e: if e.response['Error']['Code'] == 'UserNotFound': logging.info(e.response['Error']) return False else: raise def check_group_exists(UserGroupId): """Checks if UserGroupID exists Returns True if Group ID exists, otherwise False :param UserGroupId: ElastiCache User ID :return: True|False """ try: response = client.describe_user_groups( UserGroupId=UserGroupId ) if response['UserGroups'][0]['UserGroupId'].lower() == UserGroupId.lower(): return True except Exception as e: if e.response['Error']['Code'] == 'UserGroupNotFound': logging.info(e.response['Error']) return False else: raise def create_user(UserId=None,UserName=None,Password=None,AccessString=None): """Creates a new user Returns the ARN for the newly created user or the error message :param UserId: ElastiCache user ID. User IDs must be unique :param UserName: ElastiCache user name. ElastiCache allows multiple users with the same name as long as the associated user ID is unique. :param Password: Password for user. Must have at least 16 chars. :param AccessString: Access string with the permissions for the user. :return: user ARN """ try: response = client.create_user( UserId=UserId, UserName=UserName, Engine='Redis', Passwords=[Password], AccessString=AccessString, NoPasswordRequired=False ) return response['ARN'] except Exception as e: logging.info(e.response['Error']) return e.response['Error'] def create_group(UserGroupId=None, UserIds=None): """Creates a new group. A default user is required (mandatory) and should be specified in the UserIds list Return: Group ARN :param UserIds: List with user IDs to be associated with the new group. A default user is required :param UserGroupId: The ID (name) for the group :return: Group ARN """ try: response = client.create_user_group( UserGroupId=UserGroupId, Engine='Redis', UserIds=UserIds ) return response['ARN'] except Exception as e: logging.info(e.response['Error']) if __name__ == '__main__': groupName='mygroup2' userName = 'myuser2' userId=groupName+'-'+userName # Creates a new user if the user ID does not exist. for tmpUserId,tmpUserName in [ (userId,userName), (groupName+'-default','default')]: if not check_user_exists(tmpUserId): response=create_user(UserId=tmpUserId, UserName=
EXAMPLE
,Password=EXAMPLE
,AccessString='on ~* +@all') logging.info(response) # assigns the new user ID to the user group if not check_group_exists(groupName): UserIds = [ userId , groupName+'-default'] response=create_group(UserGroupId=groupName,UserIds=UserIds) logging.info(response)
このプログラムを実行するには、次のコマンドを入力します。
python UserAndUserGroups.py
チュートリアル: への接続 ElastiCache
次の例では、Valkey または Redis OSSクライアントを使用して に接続します ElastiCache。
クラスタモードが無効のクラスターへの接続
次のプログラムをコピーし、ConnectClusterModeDisabled.py という名前のファイルに貼り付けます。認証情報を提供するメカニズムを更新します。この例の認証情報は交換可能と表示され、宣言されていない項目が割り当てられています。認証情報をハードコーディングすることは避けてください。
from redis import Redis import logging logging.basicConfig(level=logging.INFO) redis = Redis(host='primary.xxx.yyyyyy.zzz1.cache.amazonaws.com', port=6379, decode_responses=True, ssl=True, username=
example
, password=EXAMPLE
) if redis.ping(): logging.info("Connected to Redis")
このプログラムを実行するには、次のコマンドを入力します。
python ConnectClusterModeDisabled.py
クラスタモードが有効のクラスターへの接続
次のプログラムをコピーし、ConnectClusterModeEnabled.py という名前のファイルに貼り付けます。
from rediscluster import RedisCluster import logging logging.basicConfig(level=logging.INFO) redis = RedisCluster(startup_nodes=[{"host": "xxx.yyy.clustercfg.zzz1.cache.amazonaws.com","port": "6379"}], decode_responses=True,skip_full_coverage_check=True) if redis.ping(): logging.info("Connected to Redis")
このプログラムを実行するには、次のコマンドを入力します。
python ConnectClusterModeEnabled.py
使用例
次の例では、 SDKの boto3 ElastiCache を使用して ElastiCache (Redis ) を操作しますOSS。
文字列の設定と取得
次のプログラムをコピーし、SetAndGetStrings.py という名前のファイルに貼り付けます。
import time import logging logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s') keyName='mykey' currTime=time.ctime(time.time()) # Set the key 'mykey' with the current date and time as value. # The Key will expire and removed from cache in 60 seconds. redis.set(keyName, currTime, ex=60) # Sleep just for better illustration of TTL (expiration) value time.sleep(5) # Retrieve the key value and current TTL keyValue=redis.get(keyName) keyTTL=redis.ttl(keyName) logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))
このプログラムを実行するには、次のコマンドを入力します。
python SetAndGetStrings.py
複数の項目があるハッシュを設定して取得する
次のプログラムをコピーし、SetAndGetHash.py という名前のファイルに貼り付けます。
import logging import time logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s') keyName='mykey' keyValues={'datetime': time.ctime(time.time()), 'epochtime': time.time()} # Set the hash 'mykey' with the current date and time in human readable format (datetime field) and epoch number (epochtime field). redis.hset(keyName, mapping=keyValues) # Set the key to expire and removed from cache in 60 seconds. redis.expire(keyName, 60) # Sleep just for better illustration of TTL (expiration) value time.sleep(5) # Retrieves all the fields and current TTL keyValues=redis.hgetall(keyName) keyTTL=redis.ttl(keyName) logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))
このプログラムを実行するには、次のコマンドを入力します。
python SetAndGetHash.py
Pub/Sub チャンネルから公開 (書き込み) および登録 (読み取り) する
次のプログラムをコピーし、PubAndSub.py という名前のファイルに貼り付けます。
import logging import time def handlerFunction(message): """Prints message got from PubSub channel to the log output Return None :param message: message to log """ logging.info(message) logging.basicConfig(level=logging.INFO) redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True) # Creates the subscriber connection on "mychannel" subscriber = redis.pubsub() subscriber.subscribe(**{'mychannel': handlerFunction}) # Creates a new thread to watch for messages while the main process continues with its routines thread = subscriber.run_in_thread(sleep_time=0.01) # Creates publisher connection on "mychannel" redis.publish('mychannel', 'My message') # Publishes several messages. Subscriber thread will read and print on log. while True: redis.publish('mychannel',time.ctime(time.time())) time.sleep(1)
このプログラムを実行するには、次のコマンドを入力します。
python PubAndSub.py
ストリームからの書き込みおよび読み取り
次のプログラムをコピーし、ReadWriteStream.py という名前のファイルに貼り付けます。
from redis import Redis import redis.exceptions as exceptions import logging import time import threading logging.basicConfig(level=logging.INFO) def writeMessage(streamName): """Starts a loop writting the current time and thread name to 'streamName' :param streamName: Stream (key) name to write messages. """ fieldsDict={'writerId':threading.currentThread().getName(),'myvalue':None} while True: fieldsDict['myvalue'] = time.ctime(time.time()) redis.xadd(streamName,fieldsDict) time.sleep(1) def readMessage(groupName=None,streamName=None): """Starts a loop reading from 'streamName' Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution. Once a thread acknowleges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread. :param groupName: stream group were multiple threads will read. :param streamName: Stream (key) name where messages will be read. """ readerID=threading.currentThread().getName() while True: try: # Check if the stream has any message if redis.xlen(streamName)>0: # Check if if the messages are new (not acknowledged) or not (already processed) streamData=redis.xreadgroup(groupName,readerID,{streamName:'>'},count=1) if len(streamData) > 0: msgId,message = streamData[0][1][0] logging.info("{}: Got {} from ID {}".format(readerID,message,msgId)) #Do some processing here. If the message has been processed sucessfuly, acknowledge it and (optional) delete the message. redis.xack(streamName,groupName,msgId) logging.info("Stream message ID {} read and processed successfuly by {}".format(msgId,readerID)) redis.xdel(streamName,msgId) else: pass except: raise time.sleep(0.5) # Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read. try: redis.xgroup_create('mystream','myworkergroup',mkstream=True) except exceptions.ResponseError as e: logging.info("Consumer group already exists. Will continue despite the error: {}".format(e)) except: raise # Starts 5 writer threads. for writer_no in range(5): writerThread = threading.Thread(target=writeMessage, name='writer-'+str(writer_no), args=('mystream',),daemon=True) writerThread.start() # Starts 10 reader threads for reader_no in range(10): readerThread = threading.Thread(target=readMessage, name='reader-'+str(reader_no), args=('myworkergroup','mystream',),daemon=True) readerThread.daemon = True readerThread.start() # Keep the code running for 30 seconds time.sleep(30)
このプログラムを実行するには、次のコマンドを入力します。
python ReadWriteStream.py