

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk"></a>

本教程描述如何创建使用 Amazon MSK 集群作为源的 Studio 笔记本。

**Topics**
+ [设置 Amazon MSK 集群](#example-notebook-msk-setup)
+ [将 NAT 网关添加到您的 VPC](#example-notebook-msk-nat)
+ [创建 AWS Glue 连接和表](#example-notebook-msk-glue)
+ [使用 Amazon MSK 创建 Studio 笔记本](#example-notebook-msk-create)
+ [向您的Amazon MSK 集群发送数据](#example-notebook-msk-send)
+ [测试您的 Studio 笔记本](#example-notebook-msk-test)

## 设置 Amazon MSK 集群
<a name="example-notebook-msk-setup"></a>

在此教程中，您需要一个允许纯文本访问的 Amazon MSK 集群。如果您尚未设置 Amazon MSK 集群，请按照[使用亚马逊 MSK 入门](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教程创建亚马逊 VPC、亚马逊 MSK 集群、主题和亚马逊 EC2 客户端实例。

在学习教程时，执行以下操作：
+ 在[步骤 3：创建 Amazon MSK 集群](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)中，在步骤 4 中，将`ClientBroker`值从`TLS`更改为**PLAINTEXT**。

## 将 NAT 网关添加到您的 VPC
<a name="example-notebook-msk-nat"></a>

如果您按照[使用 Amazon MSK 入门教程创建了 Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 集群，或者您的现有 Amazon VPC 还没有用于其私有子网的 NAT 网关，则必须将 NAT 网关添加到您的 Amazon VPC 中。下图演示了架构。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/images/vpc_05.png)


要为您的 Amazon VPC 创建 NAT 网关，请执行以下操作：

1. 打开位于 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 的 Amazon VPC 控制台。

1. 从左侧导航栏中选择 **NAT 网关**。

1. 在 **NAT 网关**页面上，选择**创建 NAT 网关**。

1. 在**创建 NAT 网关**页面上，提供以下值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择 **Create NAT Gateway**（创建 NAT 网关）。

1. 在左侧导航栏中，选择 **路由表**。

1. 选择 **Create Route Table**。

1. 在 **创建(路由表)** 页面上，提供以下信息：
   + **名称标签：****ZeppelinRouteTable**
   + **VPC**：选择您的 VPC（例如 **AWS KafkaTutorialVPC**）。

   选择**创建**。

1. 在路由表列表中，选择**ZeppelinRouteTable**。选择 **路由**选项卡，然后选择 **编辑路由**。

1. 在**编辑路由**页面上，选择**添加路由**。

1. 在 ******目标位置**字段，输入**0.0.0.0/0**。对于**目标**，选择 **NAT 网关**ZeppelinGateway****。选择 **保存路由**。选择 **关闭**。

1. 在 “路由表” 页面上，**ZeppelinRouteTable**选中，选择 “**子网关联**” 选项卡。选择**编辑子网关联**。

1. 在**编辑子网关联**页面中，选择 **AWS KafkaTutorialSubnet2** 和 **AWS KafkaTutorialSubnet3**。选择**保存**。

## 创建 AWS Glue 连接和表
<a name="example-notebook-msk-glue"></a>

您的 Studio 笔记本使用[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)数据库来存储有关您的Amazon MSK 数据来源的元数据。在本节中，您将创建一个描述如何访问您的 Amazon MSK 集群的 AWS Glue 连接，以及一个描述如何将数据源中的数据呈现给客户端（例如 Studio 笔记本）的 AWS Glue 表。

**创建连接**

1. 登录 AWS 管理控制台 并打开 AWS Glue 控制台，网址为[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 如果您还没有 AWS Glue 数据库，请从左侧导航栏中选择 “**数据库**”。选择 **添加数据库**。在“**添加数据库**” 窗口中，输入 **default**“**数据库名称**”。选择 **创建**。

1. 从左侧导航菜单中，选择**连接**。选择 **添加连接**。

1. 在 “**添加连接**” 窗口中，提供以下值：
   + 对于 **连接名称**，输入 **ZeppelinConnection**。
   + 对于 **Connection type (连接类型)**，选择 **Kafka**。
   + 对于 **Kafka 引导服务器 URLs**，请为您的集群提供引导代理字符串。您可以从 MSK 控制台或通过输入以下 CLI 命令来获取引导程序代理：

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 取消选中 “**需要 SSL 连接**” 复选框。

   选择 **下一步**。

1. 在 **VPC** 页面上，提供以下值：
   + 对于 **VPC**，请选择您的 VPC 的名称（例如 ** AWS KafkaTutorialVPC**。）
   + 对于**子网**，选择 **AWS KafkaTutorialSubnet2**。
   + 对于**安全组**，请选择所有可用的组。

   选择 **下一步**。

1. 在“**连接属性**/**连接访问权限**” 页中，选择“**完成**”

**创建表**
**注意**  
您可以按照以下步骤所述手动创建表，也可以在 Apache Zeppelin 的笔记本中使用 Managed Service for Apache Flink创建表连接器代码，通过 DDL 语句创建表。然后，您可以签入 AWS Glue 以确保表格已正确创建。

1. 在左侧导航栏中，选择 **表**。在 “**表**” 页中，选择 “**添加表**”，“**手动添加表**”。

1. 在**设置表的属性**页面中，输入**stock****表格名称**。请务必选择之前创建的数据库。选择 **下一步**。

1. 在**添加数据存储**页面中，选择 **Kafka**。在**主题名称**中，输入您的主题名称（例如 **AWS KafkaTutorialTopic**）。对于 “**连接**”，选择**ZeppelinConnection**。

1. 在**分类**页面中，选择 **JSON**。选择 **下一步**。

1. 在**定义架构**页面中，选择 Add Column 以添加列。添加具有以下属性的列：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/example-notebook-msk.html)

   选择**下一步**。

1. 在下一页上，验证您的设置，然后选择**完成**。

1. 从表列表中选择新创建的表。

1. 选择**编辑表格**并添加以下属性：
   + 键：`managed-flink.proctime`，值：`proctime`
   + 键：`flink.properties.group.id`，值：`test-consumer-group`
   + 键：`flink.properties.auto.offset.reset`，值：`latest`
   + 键：`classification`，值：`json`

   如果没有这些键/值对，Flink 笔记本就会遇到错误。

1. 选择**应用**。

## 使用 Amazon MSK 创建 Studio 笔记本
<a name="example-notebook-msk-create"></a>

现在，您已经创建了应用程序使用的资源，接下来就可以创建自己的 Studio 笔记本了。

**Topics**
+ [使用创建 Studio 笔记本 AWS 管理控制台](#example-notebook-create-msk-console)
+ [使用创建 Studio 笔记本 AWS CLI](#example-notebook-msk-create-api)

**注意**  
您也可以从 Amazon MSK 控制台创建 Studio 笔记本，方法是选择现有集群，然后选择 “**实时处理数据**”。

### 使用创建 Studio 笔记本 AWS 管理控制台
<a name="example-notebook-create-msk-console"></a>

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面中，选择 **Studio** 选项卡。选择**创建 Studio 笔记本**。
**注意**  
要从Amazon MSK 或 Kinesis Data Streams 控制台创建 Studio 笔记本，请选择您输入的Amazon MSK 集群或 Kinesis 数据流，然后**选择“**实时处理数据”。

1. 在 **创建笔记本实例** 页面上，提供以下信息：
   + 输入 ****MyNotebook** Studio 笔记本的名称**。
   + 为 **Glue 数据库AWS 选择****默认值**。

   选择**创建 Studio 笔记本**。

1. 在该**MyNotebook**页面中，选择 “**配置**” 选项卡。在 **联网** 部分中，选择 **编辑**。

1. 在**编辑网络连接 MyNotebook**页面中，选择**基于 Amazon MSK 集群的 VPC 配置**。为Amazon MSK 集群选择您的**Amazon MSK** 集群。选择 **Save changes（保存更改）**。

1. 在**MyNotebook**页面中，选择 “**运行**”。等待“**状态”显示为“****正在运行**”。

### 使用创建 Studio 笔记本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

要使用创建 Studio 笔记本 AWS CLI，请执行以下操作：

1. 验证您具有以下信息：创建应用程序时，您需要用到这些值。
   + 您的 账户 ID
   + 包含您的 Amazon MSK 集群的 Amazon VPC 的子网 IDs 和安全组 ID。

1. 创建以下内容的名为 `create.json` 的文件。确保将占位符值替换为您自己的信息。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 要创建应用程序，请运行以下命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成后，您应该会看到类似于以下内容的输出，其中显示了新 Studio 笔记本的详细信息：

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 要开始应用程序，请运行以下命令。请将占位符值替换为账户 ID。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 向您的Amazon MSK 集群发送数据
<a name="example-notebook-msk-send"></a>

在本节中，你将在亚马逊 EC2 客户端中运行 Python 脚本，将数据发送到你的亚马逊 MSK 数据源。

1. 连接到您的亚马逊 EC2 客户端。

1. 运行以下命令安装 Python 版本 3、Pip 和 Kafka for Python 软件包，然后确认操作：

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 通过输入以下命令 AWS CLI 在您的客户端计算机上进行配置：

   ```
   aws configure
   ```

   提供您的账户凭证，**us-east-1**并提供`region`。

1. 创建以下内容的名为 `stock.py` 的文件。将示例值替换为您的 Amazon MSK 集群的 Bootstrap Brokers 字符串，如果您的主题不是，请更新主题名称：**AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 使用以下命令运行脚本：

   ```
   $ python3 stock.py
   ```

1. 完成以下部分后，请让脚本继续运行。

## 测试您的 Studio 笔记本
<a name="example-notebook-msk-test"></a>

在本节中，您将使用 Studio 笔记本查询来自 Amazon MSK 集群的数据。

1. [在家打开适用于 Apache Flink 的托管服务控制台？ https://console.aws.amazon.com/managed-flink/ region=us](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)-east-1\$1/应用程序/仪表板。

1. 在 **Managed Service for Apache Flink 应用程序**页面上，选择 **Studio 笔记本**选项卡。选择 **MyNotebook**。

1. 在**MyNotebook**页面中，选择 “在 **Apache 齐柏林飞艇中打开**”。

   Apache Zeppelin 接口会在新选项卡中打开。

1. 在**欢迎来到 Zeppelin\$1**页面上，选择**Zeppelin 新笔记**。

1. 在 **Zeppelin Note **页面上，在新笔记中输入以下查询：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   选择运行图标。

   该应用程序显示来自 Amazon MSK 集群的数据。

要打开应用程序的 Apache Flink 仪表板以查看操作方面，请选择 **FLINK JOB**。有关 Flink 控制面板的更多信息，请参阅《[Managed Service for Apache Flink [开发者指南》中的 Apache](https://docs.aws.amazon.com/) Flink 控制面板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

有关 Flink Streaming SQL 查询的更多示例，请参阅 [Apache Fl](https://nightlies.apache.org/flink/flink-docs-release-1.15/) ink 文档中的[查询](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。