

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams"></a>

本教程描述如何创建使用 Kinesis 数据流作为源的 Studio 笔记本。

**Topics**
+ [完成 先决条件](#example-notebook-streams-setup)
+ [创建 AWS Glue 表格](#example-notebook-streams-glue)
+ [使用 Kinesis Data Streams 创建 Studio 笔记本](#example-notebook-streams-create)
+ [将数据发送到您的 Kinesis 数据流](#example-notebook-streams-send)
+ [测试您的 Studio 笔记本](#example-notebook-streams-test)

## 完成 先决条件
<a name="example-notebook-streams-setup"></a>

在创建 Studio 笔记本之前，请创建 Kinesis 数据流 (`ExampleInputStream`) 。您的应用程序使用此流作为应用程序源。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建和更新数据流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。为流命名**ExampleInputStream**并将**打开的分片数**设置为**1**。

要使用创建直播 (`ExampleInputStream`) AWS CLI，请使用以下 Amazon Kinesis 命令`create-stream` AWS CLI 。

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## 创建 AWS Glue 表格
<a name="example-notebook-streams-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的 Kinesis 数据流数据来源的元数据。

**注意**  
您可以先手动创建数据库，也可以让 Managed Service for Apache Flink 在创建笔记本时为您创建数据库。同样，您可以按照本节所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink 创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

**创建表**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kinesis**。对于**直播名称**，请输入**ExampleInputStream**。对于 **Kinesis 来源网址**，请选择 Enter。**https://kinesis.us-east-1.amazonaws.com**如果您复制并粘贴 **Kinesis 源网址**，请务必删除所有前导或尾随空格。选择 **下一步**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择 **“编辑表”**，然后添加包含键`managed-flink.proctime`和值的属性`proctime`。

1. 选择**应用**。

## 使用 Kinesis Data Streams 创建 Studio 笔记本
<a name="example-notebook-streams-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-streams-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-streams-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
您也可以从 Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，方法是选择输入的 Amazon MSK 集群或 Kinesis 数据流，然后**选择** “实时处理数据”。

1. 在 **创建笔记本实例页面上**，提供以下信息：
   + 输入笔记本**MyNotebook**的名称。
   + 为 Glue **数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。笔记本电脑运行时会产生费用。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证账户 ID。创建应用程序时，您需要用到此值。

1. 创建角色`arn:aws:iam::AccountID:role/ZeppelinRole`并通过控制台向自动创建的角色添加以下权限。

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您会看到显示新 Studio 笔记本详细信息的输出。下面是输出的一个示例。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 将数据发送到您的 Kinesis 数据流
<a name="example-notebook-streams-send"></a>

要将测试数据发送到您的 Kinesis 数据流，请执行以下操作：

1. 打开 [Kinesis Data Generator (KDG)](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)。

1. 选择**使用创建 Cognito 用户**。 CloudFormation

1.  CloudFormation 控制台随即打开 Kinesis 数据生成器模板。选择**下一步**。

1. 在 **指定堆栈详细信息**页面上，输入 Cognito 用户的用户名和密码。选择 **下一步**。

1. 在 **配置堆栈选项** 页面上，请选择 **下一步**。

1. 在 **Review Kinesis-Data-Generator-Cognito-User** 页面中，选择**我确认 AWS CloudFormation 可能会创建 IAM 资源**。 复选框。选择**创建堆栈**。

1. 等待 CloudFormation 堆栈完成创建。**堆栈完成后，在控制台中打开 **Kinesis-Data-Generator-Cognito-User 堆栈，然后**选择输出选项卡。 CloudFormation **打开为**KinesisDataGeneratorUrl**输出值列出的 URL。

1. 在 **Amazon Kinesis 数据生成器**页面中，使用您在步骤 4 中创建的凭证登录。

1. 在下一页，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-streams.html)

   对于 “**记录模板**”，粘贴以下代码：

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. 选择 “**发送数据**”。

1. 生成器会将数据发送到 Kinesis 数据流。

   在完成下一部分的同时，让发电机继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-streams-test"></a>

在本节中，您将使用 Studio 笔记本来查询 Kinesis 数据流中的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在《**欢迎来到齐柏**林飞艇》中！ 页面上，选择**齐柏林飞艇笔记**。

1. 在 **Zeppelin Note 页面中**，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   片刻之后，注释将显示来自 Kinesis 数据流的数据。

要打开应用程序的 Apache Flink 控制面板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。