

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Managed Service for Apache Flink 中的 Studio 笔记本示例和教程
<a name="how-zeppelin-examples"></a>

**Topics**
+ [教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本](example-notebook.md)
+ [教程：将 Studio 笔记本部署为具有持久状态的 Managed Service for Apache Flink 应用程序。](example-notebook-deploy.md)
+ [查看用于分析 Studio 笔记本中数据的示例查询](how-zeppelin-sql-examples.md)

# 教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本
<a name="example-notebook"></a>

以下教程演示如何创建从 Kinesis 数据流或 Amazon MSK 集群读取数据的 Studio 笔记本。

**Topics**
+ [完成 先决条件](#example-notebook-setup)
+ [创建 AWS Glue 数据库](#example-notebook-glue)
+ [后续步骤：使用 Kinesis Data Streams 或 Amazon MSK 创建 Studio 笔记本](#examples-notebook-nextsteps)
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](example-notebook-streams.md)
+ [使用 Amazon MSK 创建 Studio 笔记本](example-notebook-msk.md)
+ [清除您的应用程序和依赖资源](example-notebook-cleanup.md)

## 完成 先决条件
<a name="example-notebook-setup"></a>

请确保您的版本 AWS CLI 为 2 或更高版本。要安装最新版本 AWS CLI，请参阅[安装、更新和卸载 AWS CLI 版本 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。

## 创建 AWS Glue 数据库
<a name="example-notebook-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的Amazon MSK 数据来源的元数据。

**创建 AWS Glue 数据库**

1. 打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 选择 **Add database**（添加数据库）。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择**创建**。

## 后续步骤：使用 Kinesis Data Streams 或 Amazon MSK 创建 Studio 笔记本
<a name="examples-notebook-nextsteps"></a>

通过本教程，您可以创建一个使用 Kinesis Data Streams 或 Amazon MSK 的 Studio 笔记本：
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](example-notebook-streams.md)：使用 Kinesis Data Streams，您可以快速创建使用 Kinesis 数据流作为源的应用程序。您只需要创建 Kinesis 数据流作为依赖资源即可。
+ [使用 Amazon MSK 创建 Studio 笔记本](example-notebook-msk.md)：使用 Amazon MSK，您可以创建使用Amazon MSK 集群作为源的应用程序。您需要创建一个 Amazon VPC、一个 Amazon EC2 客户端实例和一个 Amazon MSK 集群作为依赖资源。

# 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams"></a>

本教程描述如何创建使用 Kinesis 数据流作为源的 Studio 笔记本。

**Topics**
+ [完成 先决条件](#example-notebook-streams-setup)
+ [创建 AWS Glue 表格](#example-notebook-streams-glue)
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](#example-notebook-streams-create)
+ [将数据发送到您的 Kinesis 数据流](#example-notebook-streams-send)
+ [测试您的 Studio 笔记本](#example-notebook-streams-test)

## 完成 先决条件
<a name="example-notebook-streams-setup"></a>

在创建 Studio 笔记本之前，请创建 Kinesis 数据流 (`ExampleInputStream`) 。您的应用程序使用此流作为应用程序源。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。为流命名**ExampleInputStream**并将**打开的分片数**设置为**1**。

要使用创建直播 (`ExampleInputStream`) AWS CLI，请使用以下 Amazon Kinesis 命令`create-stream` AWS CLI 。

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## 创建 AWS Glue 表格
<a name="example-notebook-streams-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的 Kinesis 数据流数据来源的元数据。

**注意**  
您可以先手动创建数据库，也可以让 Managed Service for Apache Flink 在创建笔记本时为您创建数据库。同样，您可以按照本节所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink 创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

**创建表**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kinesis**。对于**直播名称**，请输入**ExampleInputStream**。对于 **Kinesis 来源网址**，请选择 Enter。**https://kinesis.us-east-1.amazonaws.com**如果您复制并粘贴 **Kinesis 源网址**，请务必删除所有前导或尾随空格。选择 **下一步**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择 **“编辑表”**，然后添加包含键`managed-flink.proctime`和值的属性`proctime`。

1. 选择**应用**。

## 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-streams-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-streams-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
您也可以从 Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，方法是选择输入的 Amazon MSK 集群或 Kinesis 数据流，然后**选择** “实时处理数据”。

1. 在 **创建笔记本实例页面上**，提供以下信息：
   + 输入笔记本**MyNotebook**的名称。
   + 为 Glue **数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。笔记本电脑运行时会产生费用。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证账户 ID。创建应用程序时，您需要用到此值。

1. 创建角色`arn:aws:iam::AccountID:role/ZeppelinRole`并通过控制台向自动创建的角色添加以下权限。

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您会看到显示新 Studio 笔记本详细信息的输出。下面是输出的一个示例。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 将数据发送到您的 Kinesis 数据流
<a name="example-notebook-streams-send"></a>

要将测试数据发送到您的 Kinesis 数据流，请执行以下操作：

1. 打开 [Kinesis Data Generator (KDG)](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)。

1. 选择**使用创建 Cognito 用户**。 CloudFormation

1.  CloudFormation 控制台随即打开 Kinesis 数据生成器模板。选择**下一步**。

1. 在 **指定堆栈详细信息**页面上，输入 Cognito 用户的用户名和密码。选择 **下一步**。

1. 在 **配置堆栈选项** 页面上，请选择 **下一步**。

1. 在 **Review Kinesis-Data-Generator-Cognito-User** 页面中，选择**我确认 AWS CloudFormation 可能会创建 IAM 资源**。 复选框。选择**创建堆栈**。

1. 等待 CloudFormation 堆栈完成创建。**堆栈完成后，在控制台中打开 **Kinesis-Data-Generator-Cognito-User 堆栈，然后**选择输出选项卡。 CloudFormation **打开为**KinesisDataGeneratorUrl**输出值列出的 URL。

1. 在 **Amazon Kinesis 数据生成器**页面中，使用您在步骤 4 中创建的凭证登录。

1. 在下一页，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   对于 “**记录模板**”，粘贴以下代码：

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. 选择 “**发送数据**”。

1. 生成器会将数据发送到 Kinesis 数据流。

   在完成下一部分的同时，让发电机继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-streams-test"></a>

在本节中，您将使用 Studio 笔记本来查询 Kinesis 数据流中的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在《**欢迎来到齐柏**林飞艇》中！ 页面上，选择**齐柏林飞艇笔记**。

1. 在 **Zeppelin Note 页面中**，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   片刻之后，注释将显示来自 Kinesis 数据流的数据。

要打开应用程序的 Apache Flink 控制面板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk"></a>

本教程描述如何创建使用 Amazon MSK 集群作为源的 Studio 笔记本。

**Topics**
+ [设置 Amazon MSK 集群](#example-notebook-msk-setup)
+ [将 NAT 网关添加到您的 VPC](#example-notebook-msk-nat)
+ [创建 AWS Glue 连接和表](#example-notebook-msk-glue)
+ [使用 Amazon MSK 创建 Studio 笔记本](#example-notebook-msk-create)
+ [向您的Amazon MSK 集群发送数据](#example-notebook-msk-send)
+ [测试您的 Studio 笔记本](#example-notebook-msk-test)

## 设置 Amazon MSK 集群
<a name="example-notebook-msk-setup"></a>

在此教程中，您需要一个允许纯文本访问的 Amazon MSK 集群。如果您尚未设置 Amazon MSK 集群，请按照[使用亚马逊 MSK 入门](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教程创建亚马逊 VPC、亚马逊 MSK 集群、主题和亚马逊 EC2 客户端实例。

在学习教程时，执行以下操作：
+ 在[步骤 3：创建 Amazon MSK 集群](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)中，在步骤 4 中，将`ClientBroker`值从`TLS`更改为**PLAINTEXT**。

## 将 NAT 网关添加到您的 VPC
<a name="example-notebook-msk-nat"></a>

如果您按照[使用 Amazon MSK 入门教程创建了 Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 集群，或者您的现有 Amazon VPC 还没有用于其私有子网的 NAT 网关，则必须将 NAT 网关添加到您的 Amazon VPC 中。下图演示了架构。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/images/vpc_05.png)


要为您的 Amazon VPC 创建 NAT 网关，请执行以下操作：

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 从左侧导航栏中选择 **NAT 网关**。

1. 在 **NAT 网关**页面上，选择**创建 NAT 网关**。

1. 在**创建 NAT 网关**页面上，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择 **Create NAT Gateway**（创建 NAT 网关）。

1. 在左侧导航栏中，选择 **路由表**。

1. 选择 **Create Route Table**。

1. 在 **创建(路由表)** 页面上，提供以下信息：
   + **名称标签：****ZeppelinRouteTable**
   + **VPC**：选择您的 VPC（例如 **AWS KafkaTutorialVPC**）。

   选择**创建**。

1. 在路由表列表中，选择**ZeppelinRouteTable**。选择 **路由**选项卡，然后选择 **编辑路由**。

1. 在**编辑路由**页面上，选择**添加路由**。

1. 在 ******目标位置**字段，输入**0.0.0.0/0**。对于**目标**，选择 **NAT 网关**ZeppelinGateway****。选择 **保存路由**。选择 **关闭**。

1. 在 “路由表” 页面上，**ZeppelinRouteTable**选中，选择 “**子网关联**” 选项卡。选择**编辑子网关联**。

1. 在**编辑子网关联**页面中，选择 **AWS KafkaTutorialSubnet2** 和 **AWS KafkaTutorialSubnet3**。选择**保存**。

## 创建 AWS Glue 连接和表
<a name="example-notebook-msk-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的Amazon MSK 数据来源的元数据。在本节中，您将创建一个描述如何访问您的 Amazon MSK 集群的 AWS Glue 连接，以及一个描述如何将数据源中的数据呈现给客户端（例如 Studio 笔记本）的 AWS Glue 表。

**创建连接**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 从左侧导航菜单中，选择**连接**。选择 **添加连接**。

1. 在 “**添加连接**” 窗口中，提供以下值：
   + 对于 **连接名称**，输入 **ZeppelinConnection**。
   + 对于 **Connection type (连接类型)**，选择 **Kafka**。
   + 对于 **Kafka 引导服务器 URLs**，请为您的集群提供引导代理字符串。您可以从 MSK 控制台或通过输入以下 CLI 命令来获取引导程序代理：

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 取消选中 “**需要 SSL 连接**” 复选框。

   选择 **下一步**。

1. 在 **VPC** 页面上，提供以下值：
   + 对于 **VPC**，请选择您的 VPC 的名称（例如 ** AWS KafkaTutorialVPC**。）
   + 对于**子网**，选择 **AWS KafkaTutorialSubnet2**。
   + 对于**安全组**，请选择所有可用的组。

   选择 **下一步**。

1. 在“**连接属性**/**连接访问权限**” 页中，选择“**完成**”

**创建表**
**注意**  
您可以按照以下步骤所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kafka**。在**主题名称**中，输入您的主题名称（例如 **AWS KafkaTutorialTopic**）。对于 “**连接**”，选择**ZeppelinConnection**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择**编辑表格**并添加以下属性：
   + 键：`managed-flink.proctime`，值：`proctime`
   + 键：`flink.properties.group.id`，值：`test-consumer-group`
   + 键：`flink.properties.auto.offset.reset`，值：`latest`
   + 键：`classification`，值：`json`

   如果没有这些键/值对，Flink 笔记本就会遇到错误。

1. 选择**应用**。

## 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-msk-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

**注意**  
您也可以从 Amazon MSK 控制台创建 Studio 笔记本，方法是选择现有集群，然后选择 “**实时处理数据**”。

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-msk-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
要从Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，请选择您输入的Amazon MSK 集群或 Kinesis 数据流，然后**选择“**实时处理数据”。

1. 在 **创建笔记本实例** 页面上，提供以下信息：
   + 输入 ****MyNotebook** Studio 笔记本的名称**。
   + 为 **Glue 数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在该**MyNotebook**页面中，选择 “**配置**” 选项卡。在 **联网** 部分中，选择 **编辑**。

1. 在**编辑网络连接 MyNotebook**页面中，选择**基于 Amazon MSK 集群的 VPC 配置**。为Amazon MSK 集群选择您的**Amazon MSK** 集群。选择 **Save changes（保存更改）**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证您具有以下信息：创建应用程序时，您需要用到这些值。
   + 您的 账户 ID
   + 包含您的 Amazon MSK 集群的 Amazon VPC 的子网 IDs 和安全组 ID。

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您应该会看到类似于以下内容的输出，其中显示了新 Studio 笔记本的详细信息：

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 向您的Amazon MSK 集群发送数据
<a name="example-notebook-msk-send"></a>

在本节中，你将在亚马逊 EC2 客户端中运行 Python 脚本，将数据发送到你的亚马逊 MSK 数据源。

1. 连接到您的亚马逊 EC2 客户端。

1. 运行以下命令安装 Python 版本 3、Pip 和 Kafka for Python 软件包，然后确认操作：

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 通过输入以下命令 AWS CLI 在您的客户端计算机上进行配置：

   ```
   aws configure
   ```

   提供您的账户凭证，**us-east-1**并提供`region`。

1. 创建以下内容的名为 `stock.py` 的文件。将示例值替换为您的 Amazon MSK 集群的 Bootstrap Brokers 字符串，如果您的主题不是，请更新主题名称：**AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 使用以下命令运行脚本：

   ```
   $ python3 stock.py
   ```

1. 完成以下部分后，请让脚本继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-msk-test"></a>

在本节中，您将使用 Studio 笔记本查询来自 Amazon MSK 集群的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在**欢迎来到 Zeppelin\$1**页面上，选择**Zeppelin 新笔记**。

1. 在 **Zeppelin Note **页面上，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   该应用程序显示来自 Amazon MSK 集群的数据。

要打开应用程序的 Apache Flink 仪表板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 清除您的应用程序和依赖资源
<a name="example-notebook-cleanup"></a>

## 删除您的 Studio 笔记本
<a name="example-notebook-cleanup-app"></a>

1. 打开 Managed Service for Apache Flink 控制台。

1. 选择 **MyNotebook**。

1. 选择**操作**，然后选择**删除**。

## 删除您的 AWS Glue 数据库和连接
<a name="example-notebook-cleanup-glue"></a>

1. 打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 从左侧导航栏中选择 **数据库**。选中 “**默认**” 旁边的复选框将其选中。选择**操作**，**删除数据库**。确认您的选择。

1. 从左侧导航菜单中，选择**连接**。选中旁边的复选框**ZeppelinConnection**将其选中。选择**操作**，**删除连接**。确认您的选择。

## 删除 IAM 角色和策略
<a name="example-notebook-msk-cleanup-iam"></a>

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 从左侧导航菜单中，选择 **角色**。

1. 使用搜索栏搜索**ZeppelinRole**角色。

1. 选择**ZeppelinRole**角色。选择 **删除角色**。确认删除操作。

## 删除您的 CloudWatch 日志组
<a name="example-notebook-cleanup-cw"></a>

当您使用控制台创建应用程序时，控制台会为您创建 CloudWatch 日志组和日志流。如果您使用创建应用程序，则没有日志组和流 AWS CLI。

1. 打开 CloudWatch 控制台，网址为[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 从左侧导航菜单中，选择 **日志组**。

1. 选择**/AWS/KinesisAnalytics/MyNotebook**日志组。

1. 依次选择 **Actions**（操作）和 **Delete log group(s)**（删除日志组）。确认删除操作。

## 清除 Kinesis Data Streams 资源
<a name="example-notebook-cleanup-streams"></a>

要删除您的 Kinesis stream，请打开 Kinesis Data Streams 控制台，选择您的 Kinesis stream，然后选择**操作**、**删除**。

## 清理 MSK 资源
<a name="example-notebook-cleanup-msk"></a>

如果您为本教程创建了 Amazon MSK 集群，请执行本部分中的步骤。本部分包含清理Amazon EC2 客户端实例、Amazon VPC 和 Amazon MSK 集群的说明。

### 删除您的Amazon MSK 集群
<a name="example-notebook-msk-cleanup-msk"></a>

如果您为本教程创建了 Amazon MSK 集群，请执行这些步骤。

1. 在[https://console.aws.amazon.com/msk/家打开亚马逊 MSK 控制台？ region=us](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)-east-1\$1/home/。

1. 选择 **AWS KafkaTutorialCluster**。选择**删除**。**delete**在出现的窗口中输入并确认您的选择。

### 终止您的客户端实例
<a name="example-notebook-msk-cleanup-client"></a>

如果您为本教程创建了 Amazon EC2 客户端实例，请按照以下步骤操作。

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 从左侧导航栏中选择**实例**。

1. 选中旁边的复选框**ZeppelinClient**将其选中。

1. 依次选择**实例状态**，**终止实例**。

### 删除 Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

如果您为本教程创建了 Amazon VPC，请按照以下步骤操作。

1. 打开位于 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 的 Amazon EC2 控制台。

1. 从左侧导航栏中选择 **“网络接口**”。

1. 在搜索栏输入您的 VPC ID，然后按输入进行搜索。

1. 选中表格标题中的复选框以选择所有显示的网络接口。

1. 依次选择**操作**、**分离**。在出现的窗口中，在 “**强制分离**”下选择 “**启用**”。选择 “**分离**”，然后等待所有网络接口都变为 “**可用**” 状态。

1. 选中表格标题中的复选框，以再次选择所有显示的网络接口。

1. 依次选择**操作**和**删除**。确认该操作。

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 选择 **AWS KafkaTutorialVPC**。依次选择 **操作** 和 **删除 VPC**。输入**delete**并确认删除。

# 教程：将 Studio 笔记本部署为具有持久状态的 Managed Service for Apache Flink 应用程序。
<a name="example-notebook-deploy"></a>

以下教程演示了如何将 Studio Notebook 部署为具有持久状态的 Managed Service for Apache Flink 应用程序。

**Topics**
+ [满足先决条件](#example-notebook-durable-setup)
+ [使用部署具有持久状态的应用程序 AWS 管理控制台](#example-notebook-deploy-console)
+ [使用部署具有持久状态的应用程序 AWS CLI](#example-notebook-deploy-cli)

## 满足先决条件
<a name="example-notebook-durable-setup"></a>

使用 Kinesis Data Streams 或 Amazon MSK 按照[教程：在 Managed Service for Apache Flink 中创建 Studio 笔记本](example-notebook.md)操作创建新的 Studio 笔记本。为 Studio 笔记本命名`ExampleTestDeploy`。

## 使用部署具有持久状态的应用程序 AWS 管理控制台
<a name="example-notebook-deploy-console"></a>

1. 在 “**应用程序代码位置”（控制台中为*可选*）**下添加要存储打包代码的 S3 存储桶位置。这样就可以直接从笔记本部署和运行应用程序的步骤。

1. 向应用程序角色添加所需的权限，以启用您用于读取和写入 Amazon S3 存储桶的角色，以及启动Managed Service for Apache Flink应用程序：
   + 亚马逊 3 FullAccess
   + 亚马逊托管-flinkFullAccess
   + 访问您的来源、目的地 VPCs 以及（如果适用）。有关更多信息，请参阅 [审核 Studio 笔记本的 IAM 权限](how-zeppelin-iam.md)。

1. 使用下面的示例代码：

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. 启动此功能后，您将在笔记本中每张笔记的右上角看到一个新的下拉列表，上面写着笔记本的名称。您可执行以下操作：
   + 在中查看 Studio 笔记本的设置 AWS 管理控制台。
   + 制作您的 Zeppelin Note 并将其导出到 Amazon S3。此时，请为您的应用程序提供一个名称，然后选择 “**生成并导出**”。导出完成后，您将收到通知。
   + 如果需要，您可以在 Amazon S3 中查看和运行对可执行文件的任何其他测试。
   + 构建完成后，您将能够将代码部署为具有持久状态和自动扩展功能的 Kinesis 流媒体应用程序。
   + 使用下拉列表并选择**将 Zeppelin Note 部署为 Kinesis 流式应用程序**。查看应用程序名称并选择**通过 AWS 控制台部署**。
   + 这将引导您进入为 Apache Flink 应用程序创建托管服务的 AWS 管理控制台 页面。请注意，应用程序名称、并行度、代码位置、默认 Glue DB、VPC（如果适用）和 IAM 角色已预先填充。验证 IAM 角色是否具有访问您的源和目标所需的权限。默认情况下，快照处于启用状态，以实现持久的应用程序状态管理。
   + 选择**创建应用程序**。
   + 您可以选择**配置**和修改任何设置，然后选择**运行**以启动您的流媒体应用程序。

## 使用部署具有持久状态的应用程序 AWS CLI
<a name="example-notebook-deploy-cli"></a>

要使用部署应用程序 AWS CLI，您必须更新 AWS CLI 以使用与 Beta 2 信息一起提供的服务模型。有关如何使用更新的服务模型的信息，请参阅[完成 先决条件满足先决条件](example-notebook.md#example-notebook-setup)。

以下示例代码将创建一个新的 Studio 笔记本：

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

以下代码示例将启动一个新的 Studio 笔记本：

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

以下代码返回应用程序的 Apache Zeppelin 笔记本页面的 URL：

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# 查看用于分析 Studio 笔记本中数据的示例查询
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [使用 Amazon MSK/Apache Kafka 创建表格](#how-zeppelin-examples-creating-tables)
+ [使用 Kinesis 创建表](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [查询滚动窗口](#how-zeppelin-examples-tumbling)
+ [查询滑动窗口](#how-zeppelin-examples-sliding)
+ [使用交互式 SQL](#how-zeppelin-examples-interactive-sql)
+ [使用 BlackHole SQL 连接器](#how-zeppelin-examples-blackhole-connector-sql)
+ [使用 Scala 生成示例数据](#notebook-example-data-generator)
+ [使用交互式 Scala](#notebook-example-interactive-scala)
+ [使用交互式 Python](#notebook-example-interactive-python)
+ [结合使用交互式 Python、SQL 和 Scala](#notebook-example-interactive-pythonsqlscala)
+ [使用跨账户 Kinesis 数据流](#notebook-example-crossaccount-kds)

有关 Apache Flink SQL 查询设置的信息，请参阅用于交互式数据分析的[齐柏林飞艇笔记本上的 Flink](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)。

要在 Apache Flink 控制面板中查看您的应用程序，请在应用程序的 **** Zeppelin Note **页面中选择 FLINK ** JOB。

有关窗口查询的更多信息，请参阅 [Apache Flink 文档](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的 [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)。

有关 Apache Flink Streaming SQL 查询的更多示例，请参阅 [Apache](https://nightlies.apache.org/flink/flink-docs-release-1.15/) Flink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

## 使用 Amazon MSK/Apache Kafka 创建表格
<a name="how-zeppelin-examples-creating-tables"></a>

您可以使用带有 Managed Service for Apache Flink Studio 的 Amazon MSK Flink 连接器通过纯文本、SSL 或 IAM 身份验证对您的连接进行身份验证。根据您的要求使用特定属性创建表。

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

您可以在 [Apache Kafka SQL](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/) Connector 中将它们与其他属性结合使用。

## 使用 Kinesis 创建表
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

在以下示例中，您将使用 Kinesis 创建表：

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

有关您可以使用的其他属性的更多信息，请参阅 [Amazon Kinesis Data Streams SQL 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)。

## 查询滚动窗口
<a name="how-zeppelin-examples-tumbling"></a>

以下 Flink Streaming SQL 查询从表中选择每个五秒钟的滚动窗口中的最高价格：`ZeppelinTopic`

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## 查询滑动窗口
<a name="how-zeppelin-examples-sliding"></a>

以下 Apache Flink Streaming SQL 查询从表格中选择每个五秒钟滑动窗口中的最高价格：`ZeppelinTopic`

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## 使用交互式 SQL
<a name="how-zeppelin-examples-interactive-sql"></a>

此示例打印事件时间和处理时间的最大值以及键值表中的值之和。确保您有[使用 Scala 生成示例数据](#notebook-example-data-generator)正在运行的示例数据生成脚本。要在 Studio 笔记本中尝试其他 SQL 查询，例如筛选和联接，请参阅 Apache Flink 文档中的 Apache Flink 文档：[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## 使用 BlackHole SQL 连接器
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

 BlackHole SQL 连接器不需要您创建 Kinesis 数据流或 Amazon MSK 集群来测试您的查询。有关 BlackHole SQL 连接器的信息，请参阅 Apache Flink 文档中的 [BlackHole SQL 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html)。在此示例中，默认目录是内存中的目录。

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## 使用 Scala 生成示例数据
<a name="notebook-example-data-generator"></a>

此示例使用 Scala 生成示例数据。您可以使用此示例数据来测试各种查询。使用 create table 语句创建键值表。

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## 使用交互式 Scala
<a name="notebook-example-interactive-scala"></a>

这是[使用交互式 SQL](#how-zeppelin-examples-interactive-sql) 的 Scala 翻译。有关更多 Scala 示例，请参阅 Apache Flink 文档中的[表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 使用交互式 Python
<a name="notebook-example-interactive-python"></a>

这是[使用交互式 SQL](#how-zeppelin-examples-interactive-sql) 的 Python 翻译。有关更多 Python 示例，请参阅 Apache Flink 文档中的[表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 结合使用交互式 Python、SQL 和 Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

您可以在笔记本中使用 SQL、Python 和 Scala 的任意组合进行交互式分析。在计划部署为具有持久状态的应用程序的 Studio 笔记本中，可以组合使用 SQL 和 Scala。此示例向您展示了被忽略的部分以及那些在应用程序中部署的具有持久状态的部分。

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## 使用跨账户 Kinesis 数据流
<a name="notebook-example-crossaccount-kds"></a>

要使用除拥有 Studio 笔记本的账户之外的账户中的 Kinesis 数据流，请在运行 Studio 笔记本的账户中创建服务执行角色，在拥有数据流的账户中创建角色信任策略。在创建表 DDL 语句的 Kinesis 连接器中使用`aws.credentials.provider`、`aws.credentials.role.arn`和`aws.credentials.role.sessionName`，根据数据流创建表。

为 Studio 笔记本帐户使用以下服务执行角色。

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

对数据流帐户使用`AmazonKinesisFullAccess`策略和以下角色信任策略。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

使用以下段落作为创建表语句。

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```