

There are more AWS SDK examples available in the [AWS Doc SDK Examples](https://github.com/awsdocs/aws-doc-sdk-examples) GitHub repo.


# Code examples for Managed Service for Apache Flink using AWS SDKs
<a name="kinesis-analytics-v2_code_examples"></a>

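The following code examples show you how to use Amazon Managed Service for Apache Flink with an AWS software development kit (SDK).

*Actions* are code excerpts from larger programs and must be run in context. Actions show you how to call individual service functions, and you can also see actions in context in their related scenarios.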

**More resources**
+ **[Managed Service for Apache Flink Developer Guide](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html)**: More information about Managed Service for Apache Flink.
+ **[Managed Service for Apache Flink API Reference](https://docs.aws.amazon.com/managed-flink/latest/apiv2/Welcome.html)**: Details about all available Managed Service for Apache Flink actions.
+ **[AWS Developer Center](https://aws.amazon.com/developer/code-examples/?awsf.sdk-code-examples-product=product%23kinesis-data-analytics)**: Code examples that you can filter by category or full-text search.
+ **[AWS SDK Examples](https://github.com/awsdocs/aws-doc-sdk-examples)**: GitHub repo with complete code in preferred languages. Includes instructions for setting up and running the code.

**Contents**
+ [Basics](kinesis-analytics-v2_code_examples_basics.md)
  + [Actions](kinesis-analytics-v2_code_examples_actions.md)
    + [`AddApplicationInput`](kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationInput_section.md)
    + [`AddApplicationOutput`](kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationOutput_section.md)
    + [`CreateApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_CreateApplication_section.md)
    + [`DeleteApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_DeleteApplication_section.md)
    + [`DescribeApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplication_section.md)
    + [`DescribeApplicationSnapshot`](kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplicationSnapshot_section.md)
    + [`DiscoverInputSchema`](kinesis-analytics-v2_example_kinesis-analytics-v2_DiscoverInputSchema_section.md)
    + [`StartApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_StartApplication_section.md)
    + [`StopApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_StopApplication_section.md)
    + [`UpdateApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_UpdateApplication_section.md)
+ [Data generator](kinesis-analytics-v2_code_examples_data_generator.md)
  + [Generate a stream with a referrer](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Referrer_section.md)
  + [Generate a stream with blood pressure anomalies](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_AnomalyEx_section.md)
  + [Generate a stream with data in columns](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_ColumnLog_section.md)
  + [Generate a stream with heart rate anomalies](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Anomaly_section.md)
  + [Generate a stream with hotspots](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Hotspots_section.md)
  + [Generate a stream with log entries](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_RegexLog_section.md)
  + [Generate a stream with staggered data](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Stagger_section.md)
  + [Generate a stream with stock ticker data](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_StockTicker_section.md)
  + [Generate a stream with two data types](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_TwoRecordTypes_section.md)
  + [Generate a stream with web log data](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_WebLog_section.md)

# Basic examples for Managed Service for Apache Flink using AWS SDKs
<a name="kinesis-analytics-v2_code_examples_basics"></a>

The following code examples show how to use the basics of Amazon Managed Service for Apache Flink with AWS SDKs.

**Contents**
+ [Actions](kinesis-analytics-v2_code_examples_actions.md)
  + [`AddApplicationInput`](kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationInput_section.md)
  + [`AddApplicationOutput`](kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationOutput_section.md)
  + [`CreateApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_CreateApplication_section.md)
  + [`DeleteApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_DeleteApplication_section.md)
  + [`DescribeApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplication_section.md)
  + [`DescribeApplicationSnapshot`](kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplicationSnapshot_section.md)
  + [`DiscoverInputSchema`](kinesis-analytics-v2_example_kinesis-analytics-v2_DiscoverInputSchema_section.md)
  + [`StartApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_StartApplication_section.md)
  + [`StopApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_StopApplication_section.md)
  + [`UpdateApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_UpdateApplication_section.md)

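# Actions for Managed Service for Apache Flink using AWS SDKs
<a name="kinesis-analytics-v2_code_examples_actions"></a>

The following code examples demonstrate how to perform individual Managed Service for Apache Flink actions with AWS SDKs. Each example includes a link to GitHub, where you can find instructions for setting up and running the code.

 The following examples include only the most commonly used actions. For a complete list, see the [Amazon Managed Service for Apache Flink API Reference](https://docs.aws.amazon.com/managed-flink/latest/apiv2/Welcome.html).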

**Topics**
+ [`AddApplicationInput`](kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationInput_section.md)
+ [`AddApplicationOutput`](kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationOutput_section.md)
+ [`CreateApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_CreateApplication_section.md)
+ [`DeleteApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_DeleteApplication_section.md)
+ [`DescribeApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplication_section.md)
+ [`DescribeApplicationSnapshot`](kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplicationSnapshot_section.md)
+ [`DiscoverInputSchema`](kinesis-analytics-v2_example_kinesis-analytics-v2_DiscoverInputSchema_section.md)
+ [`StartApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_StartApplication_section.md)
+ [`StopApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_StopApplication_section.md)
+ [`UpdateApplication`](kinesis-analytics-v2_example_kinesis-analytics-v2_UpdateApplication_section.md)

# Use `AddApplicationInput` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationInput_section"></a>

The following code example shows how to use `AddApplicationInput`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def add_input(self, input_prefix, stream_arn, input_schema):
        """
        Adds an input stream to the application. The input stream data is mapped
        to an in-application stream that can be processed by your code running in
        Kinesis Data Analytics.

        :param input_prefix: The prefix prepended to in-application input stream names.
        :param stream_arn: The ARN of the input stream.
        :param input_schema: A schema that maps the data in the input stream to the
                             runtime environment. This can be automatically generated
                             by using `discover_input_schema` or you can create it
                             yourself.
        :return: Metadata about the newly added input.
        """
        try:
            response = self.analytics_client.add_application_input(
                ApplicationName=self.name,
                CurrentApplicationVersionId=self.version_id,
                Input={
                    "NamePrefix": input_prefix,
                    "KinesisStreamsInput": {"ResourceARN": stream_arn},
                    "InputSchema": input_schema,
                },
            )
            self.version_id = response["ApplicationVersionId"]
            logger.info("Add input stream %s to application %s.", stream_arn, self.name)
        except ClientError:
            logger.exception(
                "Couldn't add input stream %s to application %s.", stream_arn, self.name
            )
            raise
        else:
            return response
```
+  For API details, see [AddApplicationInput](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/AddApplicationInput) in *AWS SDK for Python (Boto3) API Reference*.

------
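
The `add_input` docstring notes that the schema can be written by hand instead of being discovered. The following is a minimal sketch of such a schema for JSON records, assuming the `InputSchema` shape described in the service API reference. The helper name `build_json_input_schema` and the column names are hypothetical and used only for illustration.

```python
def build_json_input_schema(columns):
    """Build an InputSchema dict for JSON records.

    :param columns: A list of (name, sql_type) tuples. Each column is mapped
                    from the top-level JSON key that has the same name.
    :return: A dict that can be passed as the `input_schema` argument.
    """
    return {
        "RecordFormat": {
            "RecordFormatType": "JSON",
            # "$" treats each whole JSON document as one record.
            "MappingParameters": {"JSONMappingParameters": {"RecordRowPath": "$"}},
        },
        "RecordEncoding": "UTF-8",
        "RecordColumns": [
            {"Name": name, "Mapping": f"$.{name}", "SqlType": sql_type}
            for name, sql_type in columns
        ],
    }


# Hypothetical columns, chosen to resemble a heart rate record.
schema = build_json_input_schema(
    [("heartRate", "INTEGER"), ("rateType", "VARCHAR(8)")]
)
```

The resulting dictionary could then be passed to `add_input` in place of a discovered schema.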

# Use `AddApplicationOutput` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_AddApplicationOutput_section"></a>

The following code example shows how to use `AddApplicationOutput`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def add_output(self, in_app_stream_name, output_arn):
        """
        Adds an output stream to the application. Kinesis Data Analytics maps data
        from the specified in-application stream to the output stream.

        :param in_app_stream_name: The name of the in-application stream to map
                                   to the output stream.
        :param output_arn: The ARN of the output stream.
        :return: A list of metadata about the output resources currently assigned
                 to the application.
        """
        try:
            response = self.analytics_client.add_application_output(
                ApplicationName=self.name,
                CurrentApplicationVersionId=self.version_id,
                Output={
                    "Name": in_app_stream_name,
                    "KinesisStreamsOutput": {"ResourceARN": output_arn},
                    "DestinationSchema": {"RecordFormatType": "JSON"},
                },
            )
            outputs = response["OutputDescriptions"]
            self.version_id = response["ApplicationVersionId"]
            logger.info(
                "Added output %s to %s, which now has %s outputs.",
                output_arn,
                self.name,
                len(outputs),
            )
        except ClientError:
            logger.exception("Couldn't add output %s to %s.", output_arn, self.name)
            raise
        else:
            return outputs
```
+  For API details, see [AddApplicationOutput](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/AddApplicationOutput) in *AWS SDK for Python (Boto3) API Reference*.

------

# Use `CreateApplication` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_CreateApplication_section"></a>

The following code example shows how to use `CreateApplication`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def create(self, app_name, role_arn, env="SQL-1_0"):
        """
        Creates a Kinesis Data Analytics application.

        :param app_name: The name of the application.
        :param role_arn: The ARN of a role that can be assumed by Kinesis Data
                         Analytics and grants needed permissions.
        :param env: The runtime environment of the application, such as SQL. Code
                    uploaded to the application runs in this environment.
        :return: Metadata about the newly created application.
        """
        try:
            response = self.analytics_client.create_application(
                ApplicationName=app_name,
                RuntimeEnvironment=env,
                ServiceExecutionRole=role_arn,
            )
            details = response["ApplicationDetail"]
            self._update_details(details)
            logger.info("Application %s created.", app_name)
        except ClientError:
            logger.exception("Couldn't create application %s.", app_name)
            raise
        else:
            return details
```
+  For API details, see [CreateApplication](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/CreateApplication) in *AWS SDK for Python (Boto3) API Reference*.

------
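
The `role_arn` parameter must refer to a role that the service can assume. As a rough sketch of what that implies, the following builds an IAM trust policy document for the `kinesisanalytics.amazonaws.com` service principal. The helper name `build_trust_policy` is hypothetical, and the permissions the role also needs (for example, access to the input and output streams) are not shown.

```python
import json


def build_trust_policy(service="kinesisanalytics.amazonaws.com"):
    """Return a trust policy that lets the given service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
        ],
    }


# IAM expects the policy document as a JSON string.
trust_policy_json = json.dumps(build_trust_policy())
```

A string like this is what the IAM `create_role` call takes as its `AssumeRolePolicyDocument` argument. The ARN of the created role is then what `create` expects as `role_arn`.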

# Use `DeleteApplication` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DeleteApplication_section"></a>

The following code example shows how to use `DeleteApplication`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def delete(self):
        """
        Deletes an application.
        """
        try:
            self.analytics_client.delete_application(
                ApplicationName=self.name, CreateTimestamp=self.create_timestamp
            )
            logger.info("Deleted application %s.", self.name)
        except ClientError:
            logger.exception("Couldn't delete application %s.", self.name)
            raise
```
+  For API details, see [DeleteApplication](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/DeleteApplication) in *AWS SDK for Python (Boto3) API Reference*.

------

# Use `DescribeApplication` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplication_section"></a>

The following code example shows how to use `DescribeApplication`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def describe(self, name):
        """
        Gets metadata about an application.

        :param name: The name of the application to look up.
        :return: Metadata about the application.
        """
        try:
            response = self.analytics_client.describe_application(ApplicationName=name)
            details = response["ApplicationDetail"]
            self._update_details(details)
            logger.info("Got metadata for application %s.", name)
        except ClientError:
            logger.exception("Couldn't get metadata for application %s.", name)
            raise
        else:
            return details
```
+  For API details, see [DescribeApplication](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/DescribeApplication) in *AWS SDK for Python (Boto3) API Reference*.

------

# Use `DescribeApplicationSnapshot` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DescribeApplicationSnapshot_section"></a>

The following code example shows how to use `DescribeApplicationSnapshot`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def describe_snapshot(self, application_name, snapshot_name):
        """
        Gets metadata about a previously saved application snapshot.

        :param application_name: The name of the application.
        :param snapshot_name: The name of the snapshot.
        :return: Metadata about the snapshot.
        """
        try:
            response = self.analytics_client.describe_application_snapshot(
                ApplicationName=application_name, SnapshotName=snapshot_name
            )
            snapshot = response["SnapshotDetails"]
            logger.info(
                "Got metadata for snapshot %s of application %s.",
                snapshot_name,
                application_name,
            )
        except ClientError:
            logger.exception(
                "Couldn't get metadata for snapshot %s of application %s.",
                snapshot_name,
                application_name,
            )
            raise
        else:
            return snapshot
```
+  For API details, see [DescribeApplicationSnapshot](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/DescribeApplicationSnapshot) in *AWS SDK for Python (Boto3) API Reference*.

------

# Use `DiscoverInputSchema` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DiscoverInputSchema_section"></a>

The following code example shows how to use `DiscoverInputSchema`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def discover_input_schema(self, stream_arn, role_arn):
        """
        Discovers a schema that maps data in a stream to a format that is usable by
        an application's runtime environment. The stream must be active and have
        enough data moving through it for the service to sample. The returned schema
        can be used when you add the stream as an input to the application or you can
        write your own schema.

        :param stream_arn: The ARN of the stream to map.
        :param role_arn: A role that lets Kinesis Data Analytics read from the stream.
        :return: The discovered schema of the data in the input stream.
        """
        try:
            response = self.analytics_client.discover_input_schema(
                ResourceARN=stream_arn,
                ServiceExecutionRole=role_arn,
                InputStartingPositionConfiguration={"InputStartingPosition": "NOW"},
            )
            schema = response["InputSchema"]
            logger.info("Discovered input schema for stream %s.", stream_arn)
        except ClientError:
            logger.exception(
                "Couldn't discover input schema for stream %s.", stream_arn
            )
            raise
        else:
            return schema
```
+  For API details, see [DiscoverInputSchema](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/DiscoverInputSchema) in *AWS SDK for Python (Boto3) API Reference*.

------

# Use `StartApplication` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_StartApplication_section"></a>

The following code example shows how to use `StartApplication`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def start(self, input_id):
        """
        Starts an application. After the application is running, it reads from the
        specified input stream and runs the application code on the incoming data.

        :param input_id: The ID of the input to read.
        """
        try:
            self.analytics_client.start_application(
                ApplicationName=self.name,
                RunConfiguration={
                    "SqlRunConfigurations": [
                        {
                            "InputId": input_id,
                            "InputStartingPositionConfiguration": {
                                "InputStartingPosition": "NOW"
                            },
                        }
                    ]
                },
            )
            logger.info("Started application %s.", self.name)
        except ClientError:
            logger.exception("Couldn't start application %s.", self.name)
            raise
```
+  For API details, see [StartApplication](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/StartApplication) in *AWS SDK for Python (Boto3) API Reference*.

------
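
The `start` method always reads from `NOW`. As a sketch of how the run configuration could be built for other starting positions, the following helper assembles the same dictionary shape and checks the position against the values the API reference lists. The helper name `build_sql_run_configuration` and the input ID `"1.1"` are hypothetical.

```python
VALID_STARTING_POSITIONS = {"NOW", "TRIM_HORIZON", "LAST_STOPPED_POINT"}


def build_sql_run_configuration(input_id, starting_position="NOW"):
    """Build a RunConfiguration dict for a SQL application.

    :param input_id: The ID of the input to read.
    :param starting_position: Where in the stream to start reading.
    """
    if starting_position not in VALID_STARTING_POSITIONS:
        raise ValueError(f"Unsupported starting position: {starting_position}")
    return {
        "SqlRunConfigurations": [
            {
                "InputId": input_id,
                "InputStartingPositionConfiguration": {
                    "InputStartingPosition": starting_position
                },
            }
        ]
    }


# Hypothetical input ID; read from the oldest available record.
run_configuration = build_sql_run_configuration("1.1", "TRIM_HORIZON")
```

The returned dictionary has the shape that `start_application` takes as its `RunConfiguration` argument.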

# Use `StopApplication` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_StopApplication_section"></a>

The following code example shows how to use `StopApplication`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def stop(self):
        """
        Stops an application. This stops the application from processing data but
        does not delete any resources.
        """
        try:
            self.analytics_client.stop_application(ApplicationName=self.name)
            logger.info("Stopping application %s.", self.name)
        except ClientError:
            logger.exception("Couldn't stop application %s.", self.name)
            raise
```
+  For API details, see [StopApplication](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/StopApplication) in *AWS SDK for Python (Boto3) API Reference*.

------
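
The `stop` method logs "Stopping" because the call returns before the application has finished stopping. One way to wait for completion is to poll `describe_application` until the status changes, as in the sketch below. The function `wait_for_status`, its parameters, and the `StubClient` class are hypothetical. The stub stands in for a Boto3 client so that the sketch runs without AWS access.

```python
import time


def wait_for_status(
    analytics_client, app_name, target_status, delay=5, max_attempts=60,
    sleep=time.sleep,
):
    """Poll describe_application until the application reaches target_status."""
    for _ in range(max_attempts):
        response = analytics_client.describe_application(ApplicationName=app_name)
        status = response["ApplicationDetail"]["ApplicationStatus"]
        if status == target_status:
            return status
        sleep(delay)
    raise TimeoutError(
        f"Application {app_name} did not reach {target_status} "
        f"after {max_attempts} attempts."
    )


class StubClient:
    """Stand-in client that replays a fixed sequence of statuses."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe_application(self, ApplicationName):
        return {"ApplicationDetail": {"ApplicationStatus": next(self._statuses)}}


# A stopped application reports the READY status.
final_status = wait_for_status(
    StubClient(["STOPPING", "STOPPING", "READY"]),
    "demo-app",
    "READY",
    sleep=lambda seconds: None,  # Skip real waiting in this demonstration.
)
```

With a real client, the same function could be called with `"RUNNING"` after `start`.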

# Use `UpdateApplication` with an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_UpdateApplication_section"></a>

The following code example shows how to use `UpdateApplication`.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis-analytics-v2#code-examples).
This example updates the code that runs in an existing application.  

```
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KinesisAnalyticsApplicationV2:
    """Encapsulates Kinesis Data Analytics application functions."""

    def __init__(self, analytics_client):
        """
        :param analytics_client: A Boto3 Kinesis Data Analytics v2 client.
        """
        self.analytics_client = analytics_client
        self.name = None
        self.arn = None
        self.version_id = None
        self.create_timestamp = None


    def update_code(self, code):
        """
        Updates the code that runs in the application. The code must run in the
        runtime environment of the application, such as SQL. Application code
        typically reads data from in-application streams and transforms it in some way.

        :param code: The code to upload. This completely replaces any existing code
                     in the application.
        :return: Metadata about the application.
        """
        try:
            response = self.analytics_client.update_application(
                ApplicationName=self.name,
                CurrentApplicationVersionId=self.version_id,
                ApplicationConfigurationUpdate={
                    "ApplicationCodeConfigurationUpdate": {
                        "CodeContentTypeUpdate": "PLAINTEXT",
                        "CodeContentUpdate": {"TextContentUpdate": code},
                    }
                },
            )
            details = response["ApplicationDetail"]
            self.version_id = details["ApplicationVersionId"]
            logger.info("Update code for application %s.", self.name)
        except ClientError:
            logger.exception("Couldn't update code for application %s.", self.name)
            raise
        else:
            return details
```
+  For API details, see [UpdateApplication](https://docs.aws.amazon.com/goto/boto3/kinesisanalyticsv2-2018-05-23/UpdateApplication) in *AWS SDK for Python (Boto3) API Reference*.

------

# Data generator for Managed Service for Apache Flink using AWS SDKs
<a name="kinesis-analytics-v2_code_examples_data_generator"></a>

The following code examples show how to use Managed Service for Apache Flink with AWS SDKs.

**Topics**
+ [Generate a stream with a referrer](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Referrer_section.md)
+ [Generate a stream with blood pressure anomalies](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_AnomalyEx_section.md)
+ [Generate a stream with data in columns](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_ColumnLog_section.md)
+ [Generate a stream with heart rate anomalies](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Anomaly_section.md)
+ [Generate a stream with hotspots](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Hotspots_section.md)
+ [Generate a stream with log entries](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_RegexLog_section.md)
+ [Generate a stream with staggered data](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Stagger_section.md)
+ [Generate a stream with stock ticker data](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_StockTicker_section.md)
+ [Generate a stream with two data types](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_TwoRecordTypes_section.md)
+ [Generate a stream with web log data](kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_WebLog_section.md)

# Generate a Kinesis stream with a referrer using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Referrer_section"></a>

The following code example shows how to generate a Kinesis stream with a referrer.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {"REFERRER": "http://www.amazon.com"}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------
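
The generator loops until it is interrupted and sends records as fast as the client allows. For a quick local check, a bounded variant can be easier to work with. The sketch below is an assumption rather than part of the original example: `generate_bounded`, `count`, `delay`, and `RecordingClient` are hypothetical names, and the recording client replaces the Boto3 Kinesis client so that the code runs without AWS credentials.

```python
import json
import time


def get_data():
    return {"REFERRER": "http://www.amazon.com"}


def generate_bounded(stream_name, kinesis_client, count, delay=0.0):
    """Send `count` records, pausing `delay` seconds after each call."""
    for _ in range(count):
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(get_data()),
            PartitionKey="partitionkey",
        )
        if delay:
            time.sleep(delay)


class RecordingClient:
    """Stand-in client that stores put_record arguments instead of sending them."""

    def __init__(self):
        self.records = []

    def put_record(self, **kwargs):
        self.records.append(kwargs)


client = RecordingClient()
generate_bounded("ExampleInputStream", client, count=3)
print(len(client.records))
```

Passing `boto3.client("kinesis")` instead of the recording client would send the same records to a real stream.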

# Generate a Kinesis stream with blood pressure anomalies using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_AnomalyEx_section"></a>

The following code example shows how to generate a Kinesis stream with blood pressure anomalies.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
from enum import Enum
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


class PressureType(Enum):
    low = "LOW"
    normal = "NORMAL"
    high = "HIGH"


def get_blood_pressure(pressure_type):
    pressure = {"BloodPressureLevel": pressure_type.value}
    if pressure_type == PressureType.low:
        pressure["Systolic"] = random.randint(50, 80)
        pressure["Diastolic"] = random.randint(30, 50)
    elif pressure_type == PressureType.normal:
        pressure["Systolic"] = random.randint(90, 120)
        pressure["Diastolic"] = random.randint(60, 80)
    elif pressure_type == PressureType.high:
        pressure["Systolic"] = random.randint(130, 200)
        pressure["Diastolic"] = random.randint(90, 150)
    else:
        raise TypeError
    return pressure


def generate(stream_name, kinesis_client):
    while True:
        rnd = random.random()
        pressure_type = (
            PressureType.low
            if rnd < 0.005
            else PressureType.high
            if rnd > 0.995
            else PressureType.normal
        )
        blood_pressure = get_blood_pressure(pressure_type)
        print(blood_pressure)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(blood_pressure),
            PartitionKey="partitionkey",
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------
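
The nested conditional expression in `generate` selects a low reading when the random draw is below 0.005, a high reading when it is above 0.995, and a normal reading otherwise, so each anomaly type is expected in roughly 0.5% of records. The following sketch restates that rule with plain strings and tallies a seeded sample. The function name `pick_level` and the sample size are hypothetical.

```python
import random
from collections import Counter


def pick_level(rnd):
    """Mirror the thresholds used in `generate`, returning the level name."""
    if rnd < 0.005:
        return "LOW"
    if rnd > 0.995:
        return "HIGH"
    return "NORMAL"


# A seeded generator keeps the tally repeatable between runs.
rng = random.Random(42)
counts = Counter(pick_level(rng.random()) for _ in range(100_000))
print(counts)
```

Because anomalies are this rare, a short run of the generator may contain none at all.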

# Generate a Kinesis stream with data in columns using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_ColumnLog_section"></a>

The following code example shows how to generate a Kinesis stream with data in columns.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"}


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------

# Generate a Kinesis stream with heart rate anomalies using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Anomaly_section"></a>

The following code example shows how to generate a Kinesis stream with heart rate anomalies.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
from enum import Enum
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


class RateType(Enum):
    normal = "NORMAL"
    high = "HIGH"


def get_heart_rate(rate_type):
    if rate_type == RateType.normal:
        rate = random.randint(60, 100)
    elif rate_type == RateType.high:
        rate = random.randint(150, 200)
    else:
        raise TypeError
    return {"heartRate": rate, "rateType": rate_type.value}


def generate(stream_name, kinesis_client, output=True):
    while True:
        rnd = random.random()
        rate_type = RateType.high if rnd < 0.01 else RateType.normal
        heart_rate = get_heart_rate(rate_type)
        if output:
            print(heart_rate)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(heart_rate),
            PartitionKey="partitionkey",
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```
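
The example marks a reading as `HIGH` when `random.random() < 0.01`, so roughly one record in a hundred is an anomaly. The following sketch draws a sample offline to illustrate that proportion. `sample_rates` is a hypothetical helper, not part of the original example, and the fixed seed is there only so that repeated runs draw the same sample.

```python
import random
from collections import Counter
from enum import Enum


class RateType(Enum):
    normal = "NORMAL"
    high = "HIGH"


def get_heart_rate(rate_type):
    if rate_type == RateType.normal:
        rate = random.randint(60, 100)
    elif rate_type == RateType.high:
        rate = random.randint(150, 200)
    else:
        raise TypeError
    return {"heartRate": rate, "rateType": rate_type.value}


def sample_rates(count, anomaly_probability=0.01):
    """Draw `count` readings with the same selection rule generate() uses."""
    readings = []
    for _ in range(count):
        is_anomaly = random.random() < anomaly_probability
        rate_type = RateType.high if is_anomaly else RateType.normal
        readings.append(get_heart_rate(rate_type))
    return readings


random.seed(0)  # assumption: fixed seed for repeatability only
readings = sample_rates(10_000)
counts = Counter(reading["rateType"] for reading in readings)
print(counts)
```

The normal and high ranges (60 to 100 and 150 to 200) do not overlap, so a downstream anomaly detector has a clear gap to find.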

------

# Generate a Kinesis stream with hotspots using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Hotspots_section"></a>

The following code example shows how to generate a Kinesis stream with hotspots.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import json
from pprint import pprint
import random
import time
import boto3

STREAM_NAME = "ExampleInputStream"


def get_hotspot(field, spot_size):
    hotspot = {
        "left": field["left"] + random.random() * (field["width"] - spot_size),
        "width": spot_size,
        "top": field["top"] + random.random() * (field["height"] - spot_size),
        "height": spot_size,
    }
    return hotspot


def get_record(field, hotspot, hotspot_weight):
    rectangle = hotspot if random.random() < hotspot_weight else field
    point = {
        "x": rectangle["left"] + random.random() * rectangle["width"],
        "y": rectangle["top"] + random.random() * rectangle["height"],
        "is_hot": "Y" if rectangle is hotspot else "N",
    }
    return {"Data": json.dumps(point), "PartitionKey": "partition_key"}


def generate(
    stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
):
    """
    Generates points used as input to a hotspot detection algorithm.
    With probability hotspot_weight (20%), a point is drawn from the hotspot;
    otherwise, it is drawn from the base field. The location of the hotspot
    changes for every 1000 points generated.
    """
    points_generated = 0
    hotspot = None
    while True:
        if points_generated % 1000 == 0:
            hotspot = get_hotspot(field, hotspot_size)
        records = [
            get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
        ]
        points_generated += len(records)
        pprint(records)
        kinesis_client.put_records(StreamName=stream_name, Records=records)

        time.sleep(0.1)


if __name__ == "__main__":
    generate(
        stream_name=STREAM_NAME,
        field={"left": 0, "width": 10, "top": 0, "height": 10},
        hotspot_size=1,
        hotspot_weight=0.2,
        batch_size=10,
        kinesis_client=boto3.client("kinesis"),
    )
```
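
To see the geometry the docstring describes, the point generation can be run offline without a Kinesis client. The following sketch reuses `get_hotspot` and `get_record` from the example and adds a hypothetical `contains` helper (not part of the original) to check which rectangle each point falls in.

```python
import json
import random


def get_hotspot(field, spot_size):
    return {
        "left": field["left"] + random.random() * (field["width"] - spot_size),
        "width": spot_size,
        "top": field["top"] + random.random() * (field["height"] - spot_size),
        "height": spot_size,
    }


def get_record(field, hotspot, hotspot_weight):
    rectangle = hotspot if random.random() < hotspot_weight else field
    point = {
        "x": rectangle["left"] + random.random() * rectangle["width"],
        "y": rectangle["top"] + random.random() * rectangle["height"],
        "is_hot": "Y" if rectangle is hotspot else "N",
    }
    return {"Data": json.dumps(point), "PartitionKey": "partition_key"}


def contains(rectangle, point):
    """Hypothetical helper: True if the point lies inside the rectangle."""
    return (
        rectangle["left"] <= point["x"] <= rectangle["left"] + rectangle["width"]
        and rectangle["top"] <= point["y"] <= rectangle["top"] + rectangle["height"]
    )


field = {"left": 0, "width": 10, "top": 0, "height": 10}
hotspot = get_hotspot(field, 1)

# Decode 1000 records the way a consumer would read them from the stream.
points = [json.loads(get_record(field, hotspot, 0.2)["Data"]) for _ in range(1000)]
hot_points = [point for point in points if point["is_hot"] == "Y"]

print(len(hot_points), all(contains(hotspot, point) for point in hot_points))
```

Note that a point drawn from the base field can land inside the hotspot by chance and is still labeled `"N"`, because `is_hot` records which rectangle the point was drawn from rather than where it ended up.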

------

# Generate a Kinesis stream with a log entry using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_RegexLog_section"></a>

The following code example shows how to generate a Kinesis stream with a log entry.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
        '"GET /index.php HTTP/1.1" 200 125 "-" '
        '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```
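
Each record carries the whole log line in a single `LOGENTRY` string, so a consumer has to split it into fields itself. The following is a minimal sketch of one way to do that with Python's `re` module. `LOG_PATTERN` and its group names are illustrative assumptions, written to match this one sample line and not taken from the original example.

```python
import json
import re


def get_data():
    return {
        "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
        '"GET /index.php HTTP/1.1" 200 125 "-" '
        '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
    }


# Assumed pattern: host, identity, user, timestamp, request line,
# status, response size, referer, and user agent.
LOG_PATTERN = re.compile(
    r"(?P<host>\S+) (?P<ident>\S+) (?P<user>\S+) "
    r"\[(?P<time>[^\]]+)\] "
    r'"(?P<request>[^"]*)" '
    r"(?P<status>\d{3}) (?P<size>\d+|-) "
    r'"(?P<referer>[^"]*)" "(?P<agent>[^"]*)"'
)

# Round-trip through JSON, as the record would travel through the stream.
record_data = json.dumps(get_data())
entry = json.loads(record_data)["LOGENTRY"]

match = LOG_PATTERN.fullmatch(entry)
fields = match.groupdict() if match else {}
print(fields.get("host"), fields.get("status"), fields.get("request"))
```

Log lines in other formats would need a different pattern; `fullmatch` returns `None` for them, which is why the sketch falls back to an empty dictionary.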

------

# Generate a Kinesis stream with stagger data using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_Stagger_section"></a>

The following code example shows how to generate a Kinesis stream with stagger data.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import datetime
import json
import random
import time
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
    return {
        "EVENT_TIME": event_time.isoformat(),
        "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        # Send six records, ten seconds apart, with the same event time and ticker
        for _ in range(6):
            print(data)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey",
            )
            time.sleep(10)


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```
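
The point of this generator is that six records sharing one `EVENT_TIME` and `TICKER` arrive at different moments, spread over 50 seconds. The following sketch reproduces that timing without sleeping or calling AWS. `simulate_batches` is a hypothetical helper, and the two sample payloads are made-up values chosen only for illustration.

```python
from collections import defaultdict


def simulate_batches(batches, copies=6, interval_seconds=10):
    """Return (arrival_offset_seconds, record) pairs, mimicking generate()
    with the sleep replaced by a simulated clock."""
    arrivals = []
    offset = 0
    for data in batches:
        for _ in range(copies):
            arrivals.append((offset, data))
            offset += interval_seconds  # stands in for time.sleep(10)
    return arrivals


# Hypothetical payloads; the real generator builds these from the current time.
batches = [
    {"EVENT_TIME": "2024-01-01T00:00:00", "TICKER": "AMZN"},
    {"EVENT_TIME": "2024-01-01T00:01:00", "TICKER": "INTC"},
]

# Group arrival offsets by the key that identifies one staggered batch.
groups = defaultdict(list)
for offset, record in simulate_batches(batches):
    groups[(record["EVENT_TIME"], record["TICKER"])].append(offset)

for key, offsets in groups.items():
    print(key, offsets)
```

A consumer that groups by event time and ticker therefore has to keep a group open for at least 50 seconds after its first record to collect all six copies.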

------

# Generate a Kinesis stream with stock ticker data using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_StockTicker_section"></a>

The following code example shows how to generate a Kinesis stream with stock ticker data.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import datetime
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        "EVENT_TIME": datetime.datetime.now().isoformat(),
        "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "PRICE": round(random.random() * 100, 2),
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```
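
The example sends every record with the same fixed partition key, which is enough for a single-shard test stream. If the stream had several shards, one possible variation is to key each record by its ticker symbol and send records in batches. The following sketch only builds such a batch. `build_records` is a hypothetical helper, and using the ticker as the partition key is an assumption made here, not something the original example does.

```python
import datetime
import json
import random


def get_data():
    return {
        "EVENT_TIME": datetime.datetime.now().isoformat(),
        "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "PRICE": round(random.random() * 100, 2),
    }


def build_records(batch_size):
    """Build a list of entries in the shape put_records expects,
    using the ticker symbol as the partition key (an assumption)."""
    records = []
    for _ in range(batch_size):
        data = get_data()
        records.append({"Data": json.dumps(data), "PartitionKey": data["TICKER"]})
    return records


records = build_records(10)

# With a real client, the batch could then be sent in one call:
# kinesis_client.put_records(StreamName="ExampleInputStream", Records=records)
print(records[0])
```

The entry shape, a dictionary with `Data` and `PartitionKey`, is the same one the hotspot generator in this document passes to `put_records`.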

------

# Generate a Kinesis stream with two data types using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_TwoRecordTypes_section"></a>

The following code example shows how to generate a Kinesis stream with two data types.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import json
import random
import boto3

STREAM_NAME = "OrdersAndTradesStream"
PARTITION_KEY = "partition_key"


def get_order(order_id, ticker):
    return {
        "RecordType": "Order",
        "Oid": order_id,
        "Oticker": ticker,
        "Oprice": random.randint(500, 10000),
        "Otype": "Sell",
    }


def get_trade(order_id, trade_id, ticker):
    return {
        "RecordType": "Trade",
        "Tid": trade_id,
        "Toid": order_id,
        "Tticker": ticker,
        "Tprice": random.randint(0, 3000),
    }


def generate(stream_name, kinesis_client):
    order_id = 1
    while True:
        ticker = random.choice(["AAAA", "BBBB", "CCCC"])
        order = get_order(order_id, ticker)
        print(order)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY
        )
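        # range(1, n) yields n - 1 IDs (none when n is 0 or 1),
        # so each order is followed by zero to five trades.
        for trade_id in range(1, random.randint(0, 6)):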
            trade = get_trade(order_id, trade_id, ticker)
            print(trade)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(trade),
                PartitionKey=PARTITION_KEY,
            )
        order_id += 1


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```
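
The number of trades per order comes from `range(1, random.randint(0, 6))`, which is easy to misread as "up to six trades". The following sketch enumerates what that expression yields for each possible `randint` result. `trade_ids` is a hypothetical helper added for illustration, and the fixed seed is there only for repeatability.

```python
import random
from collections import Counter


def trade_ids(upper):
    """Trade IDs the example's inner loop produces for a given randint result."""
    return list(range(1, upper))


# Enumerate every value random.randint(0, 6) can return.
for upper in range(0, 7):
    print(upper, trade_ids(upper))

# Sample the distribution of trades per order.
random.seed(1)  # assumption: fixed seed for repeatability only
counts = Counter(len(trade_ids(random.randint(0, 6))) for _ in range(7000))
print(sorted(counts.items()))
```

Both 0 and 1 produce an empty range, so an order with no trades is about twice as likely as any other single count, and the largest possible count is five.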

------

# Generate a Kinesis stream with web log data using an AWS SDK
<a name="kinesis-analytics-v2_example_kinesis-analytics-v2_DataGenerator_WebLog_section"></a>

The following code example shows how to generate a Kinesis stream with web log data.

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run it in the [AWS Code Examples Repository](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/python/example_code/kinesis#code-examples).

```
import json
import boto3

STREAM_NAME = "ExampleInputStream"


def get_data():
    return {
        "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] "
        '"GET /icons/apache_pb.gif HTTP/1.1" 304 0'
    }


def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))
```

------