使用以下方法產生具有心率異常的 Kinesis 串流 AWS SDK - AWS SDK 程式碼範例

AWS 文檔 AWS SDK示例 GitHub 回購中有更多SDK示例

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用以下方法產生具有心率異常的 Kinesis 串流 AWS SDK

下列程式碼範例顯示如何產生具有心率異常的 Kinesis 串流。

Python
SDK對於 Python(肉毒桿菌 3)
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在AWS 設定和執行程式碼範例儲存庫

from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))