使用 Amazon 创建 Studio 笔记本 MSK - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon 创建 Studio 笔记本 MSK

本教程介绍如何创建使用 Amazon MSK 集群作为源的 Studio 笔记本。

设置 Amazon MSK 集群

在本教程中,您需要一个允许纯文本访问的 Amazon MSK 集群。如果您尚未设置亚马逊MSK集群,请按照亚马逊入门MSK教程创建亚马逊VPC、亚马逊MSK集群、主题和亚马逊EC2客户端实例。

在学习教程时,执行以下操作:

向您的NAT网关添加网关 VPC

如果您按照亚马逊入门MSK教程创建了亚马逊MSK集群,或者您的现有亚马逊还VPC没有用于其私有子网的NAT网关,则必须向您的亚马逊VPC添加NAT网关。下图演示了架构。

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

要为您的 Amazon 创建NAT网关VPC,请执行以下操作:

  1. 打开 Amazon VPC 控制台,网址为https://console.aws.amazon.com/vpc/

  2. 从左侧导航栏中选择NAT网关

  3. NAT网关页面上,选择创建NAT网关

  4. 创建NAT网关页面上,提供以下值:

    姓名-可选 ZeppelinGateway
    子网 AWS KafkaTutorialSubnet1
    弹性 IP 分配 ID 选择可用的弹性 IP。如果没有IPs可用的弹性,请选择分配弹性 IP,然后选择控制台创建的 Elasic IP。

    选择创建NAT网关

  5. 在左侧导航栏中,选择 路由表

  6. 选择 Create Route Table

  7. 创建(路由表) 页面上,提供以下信息:

    • 名称标签:ZeppelinRouteTable

    • VPC: 选择你的VPC(例如 AWS KafkaTutorialVPC)。

    选择创建

  8. 在路由表列表中,选择ZeppelinRouteTable。选择 路由选项卡,然后选择 编辑路由

  9. 编辑路由页面上,选择添加路由

  10. 目标位置字段,输入0.0.0.0/0。对于 “目标”,选择 “NAT网关ZeppelinGateway。选择 保存路由。选择 关闭

  11. 在 “路由表” 页面上,ZeppelinRouteTable选中,选择 “子网关联” 选项卡。选择编辑子网关联

  12. 编辑子网关联页面中,选择 AWS KafkaTutorialSubnet2AWS KafkaTutorialSubnet3。选择保存

创建 AWS Glue 连接和表

您的 Studio 笔记本使用AWS Glue数据库来存储有关您的 Amazon MSK 数据源的元数据。在本节中,您将创建一个描述如何访问您的 Amazon MSK 集群的 AWS Glue 连接,以及一个描述如何将数据源中的数据呈现给客户端(例如 Studio 笔记本)的 AWS Glue 表。

创建连接
  1. 登录 AWS Management Console 并打开 AWS Glue 控制台,网址为https://console.aws.amazon.com/glue/

  2. 如果您还没有 AWS Glue 数据库,请从左侧导航栏中选择 “数据库”。选择 添加数据库。在“添加数据库” 窗口中,输入 default数据库名称”。选择 创建

  3. 从左侧导航菜单中,选择连接。选择 添加连接

  4. 在 “添加连接” 窗口中,提供以下值:

    • 对于 连接名称,输入 ZeppelinConnection

    • 对于 Connection type (连接类型),选择 Kafka

    • 对于 Kafka 引导服务器 URLs,请为您的集群提供引导代理字符串。您可以从MSK控制台获取引导代理,也可以通过输入以下CLI命令获取:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • 取消选中 “需要SSL连接” 复选框。

    选择下一步

  5. 在该VPC页面中,提供以下值:

    • 对于 VPC,请选择您的姓名VPC(例如 AWS KafkaTutorialVPC

    • 对于子网,选择 AWS KafkaTutorialSubnet2

    • 对于安全组,请选择所有可用的组。

    选择 下一步

  6. 在“连接属性/连接访问权限” 页中,选择“完成

创建表
注意

您可以按照以下步骤中的说明手动创建表,也可以在 Apache Zeppelin 的笔记本中使用为 Apache Flink 托管服务创建表连接器代码,通过语句创建表。DDL然后,您可以签入 AWS Glue 以确保表格已正确创建。

  1. 在左侧导航栏中,选择 。在 “” 页中,选择 “添加表”,“手动添加表”。

  2. 设置表的属性页面中,输入stock表格名称。请务必选择之前创建的数据库。选择 下一步

  3. 添加数据存储页面中,选择 Kafka。在主题名称中,输入您的主题名称(例如 AWS KafkaTutorialTopic)。对于 “连接”,选择ZeppelinConnection

  4. 分类页面中,选择JSON。选择下一步

  5. 定义架构页面中,选择 Add Column 以添加列。添加具有以下属性的列:

    列名称 数据类型
    ticker string
    price double

    选择下一步

  6. 在下一页上,验证您的设置,然后选择完成

  7. 从表列表中选择新创建的表。

  8. 选择编辑表格并添加以下属性:

    • 键:managed-flink.proctime,值:proctime

    • 键:flink.properties.group.id,值:test-consumer-group

    • 键:flink.properties.auto.offset.reset,值:latest

    • 键:classification,值:json

    如果没有这些键/值对,Flink 笔记本就会遇到错误。

  9. 选择 应用

使用 Amazon 创建 Studio 笔记本 MSK

现在,您已经创建了应用程序使用的资源,接下来就可以创建自己的 Studio 笔记本了。

您可以使用 AWS Management Console 或创建应用程序 AWS CLI。
注意

您也可以从 Amazon MSK 控制台创建 Studio 笔记本,方法是选择现有集群,然后选择 “实时处理数据”。

使用创建 Studio 笔记本 AWS Management Console

  1. 在家打开适用于 Apache Flink 的托管服务控制台? https://console.aws.amazon.com/managed-flink/ region=us-east-1#/应用程序/仪表板。

  2. Managed Service for Apache Flink 应用程序页面中,选择 Studio 选项卡。选择创建 Studio 笔记本

    注意

    要从亚马逊MSK或 Kinesis Data Streams 控制台创建 Studio 笔记本,请选择您输入的MSK亚马逊集群或 Kinesis 数据流,然后选择 “实时处理数据”。

  3. 创建笔记本实例 页面上,提供以下信息:

    • 输入 MyNotebook Studio 笔记本的名称

    • Glue 数据库AWS 选择默认值

    选择创建 Studio 笔记本

  4. 在该MyNotebook页面中,选择 “配置” 选项卡。在 联网 部分中,选择 编辑

  5. “编辑联网” MyNotebook 页面中,选择基于 Amazon MSK 集群的VPC配置。为亚马逊MSK集群选择您的亚马逊MSK集群。选择 Save changes(保存更改)

  6. MyNotebook页面中,选择 “运行”。等待“状态”显示为“正在运行”。

使用创建 Studio 笔记本 AWS CLI

要使用创建 Studio 笔记本 AWS CLI,请执行以下操作:

  1. 验证您具有以下信息:创建应用程序时,您需要用到这些值。

    • 您的 账户 ID

    • 包含您的亚马逊MSK集群的 Amazon VPC 的子网IDs和安全组 ID。

  2. 创建以下内容的名为 create.json 的文件。确保将占位符值替换为您自己的信息。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. 要创建应用程序,请运行以下命令:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. 命令完成后,您应该会看到类似于以下内容的输出,其中显示了新 Studio 笔记本的详细信息:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. 要开始应用程序,请运行以下命令。请将占位符值替换为账户 ID。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

将数据发送到您的 Amazon MSK 集群

在本节中,你将在亚马逊EC2客户端中运行 Python 脚本,将数据发送到你的亚马逊MSK数据源。

  1. 连接到您的亚马逊EC2客户端。

  2. 运行以下命令安装 Python 版本 3、Pip 和 Kafka for Python 软件包,然后确认操作:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 通过输入以下命令 AWS CLI 在您的客户端计算机上进行配置:

    aws configure

    提供您的账户凭证,us-east-1并提供region

  4. 创建以下内容的名为 stock.py 的文件。将示例值替换为您的 Amazon MSK 集群的 Bootstrap Brokers 字符串,如果您的主题不AWS KafkaTutorialTopic是,请更新主题名称:

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 使用以下命令运行脚本:

    $ python3 stock.py
  6. 完成以下部分后,请让脚本继续运行。

测试您的 Studio 笔记本

在本节中,您将使用 Studio 笔记本查询来自您的 Amazon MSK 集群的数据。

  1. 在家打开适用于 Apache Flink 的托管服务控制台? https://console.aws.amazon.com/managed-flink/ region=us-east-1#/应用程序/仪表板。

  2. Managed Service for Apache Flink 应用程序页面上,选择 Studio 笔记本选项卡。选择MyNotebook

  3. MyNotebook页面中,选择 “在 Apache 齐柏林飞艇中打开”。

    Apache Zeppelin 接口会在新选项卡中打开。

  4. 欢迎来到 Zeppelin!页面上,选择Zeppelin 新笔记

  5. Zeppelin Note 页面上,在新笔记中输入以下查询:

    %flink.ssql(type=update) select * from stock

    选择运行图标。

    该应用程序显示来自 Amazon MSK 集群的数据。

要打开应用程序的 Apache Flink 仪表板以查看操作方面,请选择。FLINKJOB有关 Flink 控制面板的更多信息,请参阅《Managed Service for Apache Flink 开发者指南》中的 Apache Flink 控制面板。

有关 Flink 直播SQL查询的更多示例,请参阅 Apache Fl ink 文档中的查询