

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本
<a name="example-notebook"></a>

以下教程演示如何创建从 Kinesis 数据流或 Amazon MSK 集群读取数据的 Studio 笔记本。

**Topics**
+ [完成 先决条件](#example-notebook-setup)
+ [创建 AWS Glue 数据库](#example-notebook-glue)
+ [后续步骤：使用 Kinesis Data Streams 或 Amazon MSK 创建 Studio 笔记本](#examples-notebook-nextsteps)
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](example-notebook-streams.md)
+ [使用 Amazon MSK 创建 Studio 笔记本](example-notebook-msk.md)
+ [清除您的应用程序和依赖资源](example-notebook-cleanup.md)

## 完成 先决条件
<a name="example-notebook-setup"></a>

请确保您的版本 AWS CLI 为 2 或更高版本。要安装最新版本 AWS CLI，请参阅[安装、更新和卸载 AWS CLI 版本 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。

## 创建 AWS Glue 数据库
<a name="example-notebook-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的Amazon MSK 数据来源的元数据。

**创建 AWS Glue 数据库**

1. 打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 选择 **Add database**（添加数据库）。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择**创建**。

## 后续步骤：使用 Kinesis Data Streams 或 Amazon MSK 创建 Studio 笔记本
<a name="examples-notebook-nextsteps"></a>

通过本教程，您可以创建一个使用 Kinesis Data Streams 或 Amazon MSK 的 Studio 笔记本：
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](example-notebook-streams.md)：使用 Kinesis Data Streams，您可以快速创建使用 Kinesis 数据流作为源的应用程序。您只需要创建 Kinesis 数据流作为依赖资源即可。
+ [使用 Amazon MSK 创建 Studio 笔记本](example-notebook-msk.md)：使用 Amazon MSK，您可以创建使用Amazon MSK 集群作为源的应用程序。您需要创建一个 Amazon VPC、一个 Amazon EC2 客户端实例和一个 Amazon MSK 集群作为依赖资源。

# 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams"></a>

本教程描述如何创建使用 Kinesis 数据流作为源的 Studio 笔记本。

**Topics**
+ [完成 先决条件](#example-notebook-streams-setup)
+ [创建 AWS Glue 表格](#example-notebook-streams-glue)
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](#example-notebook-streams-create)
+ [将数据发送到您的 Kinesis 数据流](#example-notebook-streams-send)
+ [测试您的 Studio 笔记本](#example-notebook-streams-test)

## 完成 先决条件
<a name="example-notebook-streams-setup"></a>

在创建 Studio 笔记本之前，请创建 Kinesis 数据流 (`ExampleInputStream`) 。您的应用程序使用此流作为应用程序源。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。为流命名**ExampleInputStream**并将**打开的分片数**设置为**1**。

要使用创建直播 (`ExampleInputStream`) AWS CLI，请使用以下 Amazon Kinesis 命令`create-stream` AWS CLI 。

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## 创建 AWS Glue 表格
<a name="example-notebook-streams-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的 Kinesis 数据流数据来源的元数据。

**注意**  
您可以先手动创建数据库，也可以让 Managed Service for Apache Flink 在创建笔记本时为您创建数据库。同样，您可以按照本节所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink 创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

**创建表**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kinesis**。对于**直播名称**，请输入**ExampleInputStream**。对于 **Kinesis 来源网址**，请选择 Enter。**https://kinesis.us-east-1.amazonaws.com**如果您复制并粘贴 **Kinesis 源网址**，请务必删除所有前导或尾随空格。选择 **下一步**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择 **“编辑表”**，然后添加包含键`managed-flink.proctime`和值的属性`proctime`。

1. 选择**应用**。

## 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-streams-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-streams-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
您也可以从 Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，方法是选择输入的 Amazon MSK 集群或 Kinesis 数据流，然后**选择** “实时处理数据”。

1. 在 **创建笔记本实例页面上**，提供以下信息：
   + 输入笔记本**MyNotebook**的名称。
   + 为 Glue **数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。笔记本电脑运行时会产生费用。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证账户 ID。创建应用程序时，您需要用到此值。

1. 创建角色`arn:aws:iam::AccountID:role/ZeppelinRole`并通过控制台向自动创建的角色添加以下权限。

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您会看到显示新 Studio 笔记本详细信息的输出。下面是输出的一个示例。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 将数据发送到您的 Kinesis 数据流
<a name="example-notebook-streams-send"></a>

要将测试数据发送到您的 Kinesis 数据流，请执行以下操作：

1. 打开 [Kinesis Data Generator (KDG)](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)。

1. 选择**使用创建 Cognito 用户**。 CloudFormation

1.  CloudFormation 控制台随即打开 Kinesis 数据生成器模板。选择**下一步**。

1. 在 **指定堆栈详细信息**页面上，输入 Cognito 用户的用户名和密码。选择 **下一步**。

1. 在 **配置堆栈选项** 页面上，请选择 **下一步**。

1. 在 **Review Kinesis-Data-Generator-Cognito-User** 页面中，选择**我确认 AWS CloudFormation 可能会创建 IAM 资源**。 复选框。选择**创建堆栈**。

1. 等待 CloudFormation 堆栈完成创建。**堆栈完成后，在控制台中打开 **Kinesis-Data-Generator-Cognito-User 堆栈，然后**选择输出选项卡。 CloudFormation **打开为**KinesisDataGeneratorUrl**输出值列出的 URL。

1. 在 **Amazon Kinesis 数据生成器**页面中，使用您在步骤 4 中创建的凭证登录。

1. 在下一页，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   对于 “**记录模板**”，粘贴以下代码：

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. 选择 “**发送数据**”。

1. 生成器会将数据发送到 Kinesis 数据流。

   在完成下一部分的同时，让发电机继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-streams-test"></a>

在本节中，您将使用 Studio 笔记本来查询 Kinesis 数据流中的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在《**欢迎来到齐柏**林飞艇》中！ 页面上，选择**齐柏林飞艇笔记**。

1. 在 **Zeppelin Note 页面中**，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   片刻之后，注释将显示来自 Kinesis 数据流的数据。

要打开应用程序的 Apache Flink 控制面板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk"></a>

本教程描述如何创建使用 Amazon MSK 集群作为源的 Studio 笔记本。

**Topics**
+ [设置 Amazon MSK 集群](#example-notebook-msk-setup)
+ [将 NAT 网关添加到您的 VPC](#example-notebook-msk-nat)
+ [创建 AWS Glue 连接和表](#example-notebook-msk-glue)
+ [使用 Amazon MSK 创建 Studio 笔记本](#example-notebook-msk-create)
+ [向您的Amazon MSK 集群发送数据](#example-notebook-msk-send)
+ [测试您的 Studio 笔记本](#example-notebook-msk-test)

## 设置 Amazon MSK 集群
<a name="example-notebook-msk-setup"></a>

在此教程中，您需要一个允许纯文本访问的 Amazon MSK 集群。如果您尚未设置 Amazon MSK 集群，请按照[使用亚马逊 MSK 入门](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教程创建亚马逊 VPC、亚马逊 MSK 集群、主题和亚马逊 EC2 客户端实例。

在学习教程时，执行以下操作：
+ 在[步骤 3：创建 Amazon MSK 集群](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)中，在步骤 4 中，将`ClientBroker`值从`TLS`更改为**PLAINTEXT**。

## 将 NAT 网关添加到您的 VPC
<a name="example-notebook-msk-nat"></a>

如果您按照[使用 Amazon MSK 入门教程创建了 Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 集群，或者您的现有 Amazon VPC 还没有用于其私有子网的 NAT 网关，则必须将 NAT 网关添加到您的 Amazon VPC 中。下图演示了架构。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/images/vpc_05.png)


要为您的 Amazon VPC 创建 NAT 网关，请执行以下操作：

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 从左侧导航栏中选择 **NAT 网关**。

1. 在 **NAT 网关**页面上，选择**创建 NAT 网关**。

1. 在**创建 NAT 网关**页面上，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择 **Create NAT Gateway**（创建 NAT 网关）。

1. 在左侧导航栏中，选择 **路由表**。

1. 选择 **Create Route Table**。

1. 在 **创建(路由表)** 页面上，提供以下信息：
   + **名称标签：****ZeppelinRouteTable**
   + **VPC**：选择您的 VPC（例如 **AWS KafkaTutorialVPC**）。

   选择**创建**。

1. 在路由表列表中，选择**ZeppelinRouteTable**。选择 **路由**选项卡，然后选择 **编辑路由**。

1. 在**编辑路由**页面上，选择**添加路由**。

1. 在 ******目标位置**字段，输入**0.0.0.0/0**。对于**目标**，选择 **NAT 网关**ZeppelinGateway****。选择 **保存路由**。选择 **关闭**。

1. 在 “路由表” 页面上，**ZeppelinRouteTable**选中，选择 “**子网关联**” 选项卡。选择**编辑子网关联**。

1. 在**编辑子网关联**页面中，选择 **AWS KafkaTutorialSubnet2** 和 **AWS KafkaTutorialSubnet3**。选择**保存**。

## 创建 AWS Glue 连接和表
<a name="example-notebook-msk-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的Amazon MSK 数据来源的元数据。在本节中，您将创建一个描述如何访问您的 Amazon MSK 集群的 AWS Glue 连接，以及一个描述如何将数据源中的数据呈现给客户端（例如 Studio 笔记本）的 AWS Glue 表。

**创建连接**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 从左侧导航菜单中，选择**连接**。选择 **添加连接**。

1. 在 “**添加连接**” 窗口中，提供以下值：
   + 对于 **连接名称**，输入 **ZeppelinConnection**。
   + 对于 **Connection type (连接类型)**，选择 **Kafka**。
   + 对于 **Kafka 引导服务器 URLs**，请为您的集群提供引导代理字符串。您可以从 MSK 控制台或通过输入以下 CLI 命令来获取引导程序代理：

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 取消选中 “**需要 SSL 连接**” 复选框。

   选择 **下一步**。

1. 在 **VPC** 页面上，提供以下值：
   + 对于 **VPC**，请选择您的 VPC 的名称（例如 ** AWS KafkaTutorialVPC**。）
   + 对于**子网**，选择 **AWS KafkaTutorialSubnet2**。
   + 对于**安全组**，请选择所有可用的组。

   选择 **下一步**。

1. 在“**连接属性**/**连接访问权限**” 页中，选择“**完成**”

**创建表**
**注意**  
您可以按照以下步骤所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kafka**。在**主题名称**中，输入您的主题名称（例如 **AWS KafkaTutorialTopic**）。对于 “**连接**”，选择**ZeppelinConnection**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择**编辑表格**并添加以下属性：
   + 键：`managed-flink.proctime`，值：`proctime`
   + 键：`flink.properties.group.id`，值：`test-consumer-group`
   + 键：`flink.properties.auto.offset.reset`，值：`latest`
   + 键：`classification`，值：`json`

   如果没有这些键/值对，Flink 笔记本就会遇到错误。

1. 选择**应用**。

## 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-msk-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

**注意**  
您也可以从 Amazon MSK 控制台创建 Studio 笔记本，方法是选择现有集群，然后选择 “**实时处理数据**”。

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-msk-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
要从Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，请选择您输入的Amazon MSK 集群或 Kinesis 数据流，然后**选择“**实时处理数据”。

1. 在 **创建笔记本实例** 页面上，提供以下信息：
   + 输入 ****MyNotebook** Studio 笔记本的名称**。
   + 为 **Glue 数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在该**MyNotebook**页面中，选择 “**配置**” 选项卡。在 **联网** 部分中，选择 **编辑**。

1. 在**编辑网络连接 MyNotebook**页面中，选择**基于 Amazon MSK 集群的 VPC 配置**。为Amazon MSK 集群选择您的**Amazon MSK** 集群。选择 **Save changes（保存更改）**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证您具有以下信息：创建应用程序时，您需要用到这些值。
   + 您的 账户 ID
   + 包含您的 Amazon MSK 集群的 Amazon VPC 的子网 IDs 和安全组 ID。

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您应该会看到类似于以下内容的输出，其中显示了新 Studio 笔记本的详细信息：

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 向您的Amazon MSK 集群发送数据
<a name="example-notebook-msk-send"></a>

在本节中，你将在亚马逊 EC2 客户端中运行 Python 脚本，将数据发送到你的亚马逊 MSK 数据源。

1. 连接到您的亚马逊 EC2 客户端。

1. 运行以下命令安装 Python 版本 3、Pip 和 Kafka for Python 软件包，然后确认操作：

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 通过输入以下命令 AWS CLI 在您的客户端计算机上进行配置：

   ```
   aws configure
   ```

   提供您的账户凭证，**us-east-1**并提供`region`。

1. 创建以下内容的名为 `stock.py` 的文件。将示例值替换为您的 Amazon MSK 集群的 Bootstrap Brokers 字符串，如果您的主题不是，请更新主题名称：**AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 使用以下命令运行脚本：

   ```
   $ python3 stock.py
   ```

1. 完成以下部分后，请让脚本继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-msk-test"></a>

在本节中，您将使用 Studio 笔记本查询来自 Amazon MSK 集群的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在**欢迎来到 Zeppelin\$1**页面上，选择**Zeppelin 新笔记**。

1. 在 **Zeppelin Note **页面上，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   该应用程序显示来自 Amazon MSK 集群的数据。

要打开应用程序的 Apache Flink 仪表板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 清除您的应用程序和依赖资源
<a name="example-notebook-cleanup"></a>

## 删除您的 Studio 笔记本
<a name="example-notebook-cleanup-app"></a>

1. 打开 Managed Service for Apache Flink 控制台。

1. 选择 **MyNotebook**。

1. 选择**操作**，然后选择**删除**。

## 删除您的 AWS Glue 数据库和连接
<a name="example-notebook-cleanup-glue"></a>

1. 打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 从左侧导航栏中选择 **数据库**。选中 “**默认**” 旁边的复选框将其选中。选择**操作**，**删除数据库**。确认您的选择。

1. 从左侧导航菜单中，选择**连接**。选中旁边的复选框**ZeppelinConnection**将其选中。选择**操作**，**删除连接**。确认您的选择。

## 删除 IAM 角色和策略
<a name="example-notebook-msk-cleanup-iam"></a>

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 从左侧导航菜单中，选择 **角色**。

1. 使用搜索栏搜索**ZeppelinRole**角色。

1. 选择**ZeppelinRole**角色。选择 **删除角色**。确认删除操作。

## 删除您的 CloudWatch 日志组
<a name="example-notebook-cleanup-cw"></a>

当您使用控制台创建应用程序时，控制台会为您创建 CloudWatch 日志组和日志流。如果您使用创建应用程序，则没有日志组和流 AWS CLI。

1. 打开 CloudWatch 控制台，网址为[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 从左侧导航菜单中，选择 **日志组**。

1. 选择**/AWS/KinesisAnalytics/MyNotebook**日志组。

1. 依次选择 **Actions**（操作）和 **Delete log group(s)**（删除日志组）。确认删除操作。

## 清除 Kinesis Data Streams 资源
<a name="example-notebook-cleanup-streams"></a>

要删除您的 Kinesis stream，请打开 Kinesis Data Streams 控制台，选择您的 Kinesis stream，然后选择**操作**、**删除**。

## 清理 MSK 资源
<a name="example-notebook-cleanup-msk"></a>

如果您为本教程创建了 Amazon MSK 集群，请执行本部分中的步骤。本部分包含清理Amazon EC2 客户端实例、Amazon VPC 和 Amazon MSK 集群的说明。

### 删除您的Amazon MSK 集群
<a name="example-notebook-msk-cleanup-msk"></a>

如果您为本教程创建了 Amazon MSK 集群，请执行这些步骤。

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 选择 **AWS KafkaTutorialCluster**。选择**删除**。**delete**在出现的窗口中输入并确认您的选择。

### 终止您的客户端实例
<a name="example-notebook-msk-cleanup-client"></a>

如果您为本教程创建了 Amazon EC2 客户端实例，请按照以下步骤操作。

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 从左侧导航栏中选择**实例**。

1. 选中旁边的复选框**ZeppelinClient**将其选中。

1. 依次选择**实例状态**，**终止实例**。

### 删除 Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

如果您为本教程创建了 Amazon VPC，请按照以下步骤操作。

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 从左侧导航栏中选择 **“网络接口**”。

1. 在搜索栏输入您的 VPC ID，然后按输入进行搜索。

1. 选中表格标题中的复选框以选择所有显示的网络接口。

1. 依次选择**操作**、**分离**。在出现的窗口中，在 “**强制分离**”下选择 “**启用**”。选择 “**分离**”，然后等待所有网络接口都变为 “**可用**” 状态。

1. 选中表格标题中的复选框，以再次选择所有显示的网络接口。

1. 依次选择**操作**和**删除**。确认该操作。

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 选择 **AWS KafkaTutorialVPC**。依次选择 **操作** 和 **删除 VPC**。输入**delete**并确认删除。