

文件 AWS 開發套件範例 GitHub 儲存庫中有更多可用的 [AWS SDK 範例](https://github.com/awsdocs/aws-doc-sdk-examples)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Managed Service for Apache Flink 的資料產生器
<a name="kinesis-analytics-v2_code_examples_data_generator"></a>

下列程式碼範例示範如何使用 Managed Service for Apache Flink AWS SDKs

**Topics**
+ [透過推薦網站產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Referrer_section.md)
+ [產生血壓異常的串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_AnomalyEx_section.md)
+ [使用欄中的資料產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_ColumnLog_section.md)
+ [產生心率異常的串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Anomaly_section.md)
+ [使用熱點產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Hotspots_section.md)
+ [使用日誌項目產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_RegexLog_section.md)
+ [使用交錯處理資料產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Stagger_section.md)
+ [使用股票代號資料產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_StockTicker_section.md)
+ [使用兩種資料類型產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_TwoRecordTypes_section.md)
+ [使用 Web 日誌資料產生串流](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_WebLog_section.md)

# 使用 AWS SDK 與推薦者產生 Kinesis 串流
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Referrer_section"></a>

下列程式碼範例示範如何透過推薦網站產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {"REFERRER": "http://www.amazon.com"}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 SDK 產生具有血壓異常的 Kinesis 串流 AWS
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_AnomalyEx_section"></a>

下列程式碼範例示範如何產生血壓異常的 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
from enum import Enum
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


class PressureType(Enum):
    low = "LOW"
    normal = "NORMAL"
    high = "HIGH"


def get_blood_pressure(pressure_type):
    pressure = {"BloodPressureLevel": pressure_type.value}
    if pressure_type == PressureType.low:
        pressure["Systolic"] = random.randint(50, 80)
        pressure["Diastolic"] = random.randint(30, 50)
    elif pressure_type == PressureType.normal:
        pressure["Systolic"] = random.randint(90, 120)
        pressure["Diastolic"] = random.randint(60, 80)
    elif pressure_type == PressureType.high:
        pressure["Systolic"] = random.randint(130, 200)
        pressure["Diastolic"] = random.randint(90, 150)
    else:
        raise TypeError
    return pressure


def generate(stream_name, kinesis_client):
    while True:
        rnd = random.random()
        pressure_type = (
            PressureType.low
            if rnd < 0.005
            else PressureType.high
            if rnd > 0.995
            else PressureType.normal
        )
        blood_pressure = get_blood_pressure(pressure_type)
        print(blood_pressure)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(blood_pressure),
            PartitionKey="partitionkey",
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 AWS SDK 以資料欄中的資料產生 Kinesis 串流
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_ColumnLog_section"></a>

下列程式碼範例示範如何利用欄中的資料產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 SDK 產生具有心率異常的 Kinesis 串流 AWS
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Anomaly_section"></a>

下列程式碼範例示範如何產生心率異常的 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
from enum import Enum
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


class RateType(Enum):
    normal = "NORMAL"
    high = "HIGH"


def get_heart_rate(rate_type):
    if rate_type == RateType.normal:
        rate = random.randint(60, 100)
    elif rate_type == RateType.high:
        rate = random.randint(150, 200)
    else:
        raise TypeError
    return {"heartRate": rate, "rateType": rate_type.value}


def generate(stream_name, kinesis_client, output=True):
    while True:
        rnd = random.random()
        rate_type = RateType.high if rnd < 0.01 else RateType.normal
        heart_rate = get_heart_rate(rate_type)
        if output:
            print(heart_rate)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(heart_rate),
            PartitionKey="partitionkey",
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 AWS SDK 產生具有熱點的 Kinesis 串流
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Hotspots_section"></a>

下列程式碼範例示範如何使用熱點產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import json
from pprint import pprint
import random
import time
import boto3

STREAM_NAME = "ExampleInputStream"


def get_hotspot(field, spot_size):
    hotspot = {
        "left": field["left"] + random.random() * (field["width"] - spot_size),
        "width": spot_size,
        "top": field["top"] + random.random() * (field["height"] - spot_size),
        "height": spot_size,
    }
    return hotspot


def get_record(field, hotspot, hotspot_weight):
    rectangle = hotspot if random.random() < hotspot_weight else field
    point = {
        "x": rectangle["left"] + random.random() * rectangle["width"],
        "y": rectangle["top"] + random.random() * rectangle["height"],
        "is_hot": "Y" if rectangle is hotspot else "N",
    }
    return {"Data": json.dumps(point), "PartitionKey": "partition_key"}


def generate(
    stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
):
    """
    Generates points used as input to a hotspot detection algorithm.
    With probability hotspot_weight (20%), a point is drawn from the hotspot;
    otherwise, it is drawn from the base field. The location of the hotspot
    changes for every 1000 points generated.
    """
    points_generated = 0
    hotspot = None
    while True:
        if points_generated % 1000 == 0:
            hotspot = get_hotspot(field, hotspot_size)
        records = [
            get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
        ]
        points_generated += len(records)
        pprint(records)
        kinesis_client.put_records(StreamName=stream_name, Records=records)

        time.sleep(0.1)


if __name__ == "__main__":
    generate(
        stream_name=STREAM_NAME,
        field={"left": 0, "width": 10, "top": 0, "height": 10},
        hotspot_size=1,
        hotspot_weight=0.2,
        batch_size=10,
        kinesis_client=boto3.client("kinesis"),
    )
```

------

# 使用 AWS SDK 產生具有日誌項目的 Kinesis 串流
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_RegexLog_section"></a>

下列程式碼範例示範如何使用日誌項目產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
        '"GET /index.php HTTP/1.1" 200 125 "-" '
        '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 AWS SDK 產生具有交錯資料的 Kinesis 串流
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Stagger_section"></a>

下列程式碼範例示範如何使用交錯處理資料產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import datetime
import json
import random
import time
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
    return {
        "EVENT_TIME": event_time.isoformat(),
        "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        # Send six records, ten seconds apart, with the same event time and ticker
        for _ in range(6):
            print(data)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey",
            )
            time.sleep(10)


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 AWS SDK 產生具有股票代號資料的 Kinesis 串流
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_StockTicker_section"></a>

下列程式碼範例示範如何使用股票代號資料產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import datetime
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        "EVENT_TIME": datetime.datetime.now().isoformat(),
        "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "PRICE": round(random.random() * 100, 2),
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 SDK 產生具有兩種資料類型的 Kinesis 串流 AWS
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_TwoRecordTypes_section"></a>

下列程式碼範例示範如何使用兩種資料類型產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import json
import random
import boto3

STREAM_NAME = "OrdersAndTradesStream"
PARTITION_KEY = "partition_key"


def get_order(order_id, ticker):
    return {
        "RecordType": "Order",
        "Oid": order_id,
        "Oticker": ticker,
        "Oprice": random.randint(500, 10000),
        "Otype": "Sell",
    }


def get_trade(order_id, trade_id, ticker):
    return {
        "RecordType": "Trade",
        "Tid": trade_id,
        "Toid": order_id,
        "Tticker": ticker,
        "Tprice": random.randint(0, 3000),
    }


def generate(stream_name, kinesis_client):
    order_id = 1
    while True:
        ticker = random.choice(["AAAA", "BBBB", "CCCC"])
        order = get_order(order_id, ticker)
        print(order)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY
        )
        for trade_id in range(1, random.randint(0, 6)):
            trade = get_trade(order_id, trade_id, ticker)
            print(trade)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(trade),
                PartitionKey=PARTITION_KEY,
            )
        order_id += 1


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# 使用 AWS SDK 產生 Kinesis 串流與 Web 日誌資料
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_WebLog_section"></a>

下列程式碼範例示範如何使用 Web 日誌資料產生 Kinesis 串流。

------
#### [ Python ]

**適用於 Python 的 SDK (Boto3)**  
 GitHub 上提供更多範例。尋找完整範例，並了解如何在 [AWS 程式碼範例儲存庫](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples)中設定和執行。

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] "
        '"GET /icons/apache_pb.gif HTTP/1.1" 304 0'
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------