使用 Amazon 創建工作室筆記本 MSK - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon 創建工作室筆記本 MSK

本教學課程說明如何建立使用 Amazon MSK 叢集做為來源的 Studio 筆記本。

設置 Amazon MSK 群集

在本教學中,您需要一個允許純文字存取的 Amazon MSK 叢集。如果您尚未設定 Amazon MSK 叢集,請按照 Amazon 入門使用MSK教學建立 Amazon VPC、Amazon MSK 叢集、主題和 Amazon 用EC2戶端執行個體。

跟隨教學課程學習時,請執行下列動作:

將NAT閘道新增至您的 VPC

如果您按照開始使用 Amazon MSK 教學建立 Amazon MSK 叢集,或者您現有的 Amazon 尚VPC未擁有其私有子網路的NAT閘道,則必須將NAT閘道新增到 Amazon VPC。下圖顯示一般架構。

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

要為您的 Amazon 創建NAT網關VPC,請執行以下操作:

  1. 在打開 Amazon VPC 控制台https://console.aws.amazon.com/vpc/

  2. 從左側導覽列選擇「NAT閘道」。

  3. 在 [NAT閘道] 頁面上,選擇 [建立NAT閘道]。

  4. 在 [建立NAT閘道] 頁面上,提供下列值:

    名稱-可選 ZeppelinGateway
    子網 AWS KafkaTutorialSubnet1
    彈性 IP 配置識別碼 選擇可用的彈性 IP。如果沒有IPs可用的彈性,請選擇 [配置彈性 IP],然後選擇主控台建立的 Elasic IP。

    選擇 [建立NAT閘道]。

  5. 在導覽列中,選擇路由表

  6. 選擇建立路由表

  7. 建立路由表頁面,提供以下資訊:

    • 名稱標籤ZeppelinRouteTable

    • VPC:選擇您的VPC(例如 AWS KafkaTutorialVPC)。

    選擇 Create (建立)。

  8. 在路由表清單中,選擇ZeppelinRouteTable。選擇路由標籤,然後選擇編輯路由

  9. 編輯路由標籤中,選擇新增路由

  10. 中,為目標輸入 0.0.0.0/0。針對「目標」,選擇「NAT閘道ZeppelinGateway。選擇儲存路由。選擇關閉

  11. 在「路由表」頁面上,選取後,ZeppelinRouteTable選擇「子網路關聯」標籤。選擇編輯子網路關聯

  12. 在 [編輯子網路關聯] 頁面中,選擇 [AWS KafkaTutorialSubnet2] 和 [AWS KafkaTutorialSubnet3]。選擇 Save (儲存)。

創建一個 AWS Glue 連接和表

您的 Studio 筆記本使用資料AWS Glue庫取得有關 Amazon MSK 資料來源的中繼資料。在本節中,您會建立說明如何存取 Amazon MSK 叢集的 AWS Glue 連線,以及一個說明如何將資料來源中的資料呈現給用戶端 (例如 Studio 筆記本) 的 AWS Glue 表格。

建立連線
  1. 登入 AWS Management Console 並開啟 AWS Glue 主控台,位於https://console.aws.amazon.com/glue/

  2. 如果您還沒有資料 AWS Glue 庫,請從左側導覽列選擇 [資料庫]。選擇新增資料庫。在新增資料庫視窗中,為資料庫名稱輸入 default。選擇 Create (建立)。

  3. 從左側導覽列選擇連線。選擇新增連線

  4. 新增連線視窗中,提供下列值:

    • 對於連線名稱,請輸入 ZeppelinConnection

    • 對於連線類型,請選擇 Kafka

    • 對於 Kafka 引導服務器 URLs,為您的集群提供引導代理字符串。您可以從MSK控制台或輸入以下CLI命令來獲取引導代理程序:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • 取消勾選需要SSL連線核取方塊。

    選擇 Next (下一步)

  5. VPC頁面中,提供下列值:

    • 對於 VPC,選擇您的名稱VPC(例如 AWS KafkaTutorialVPC。)

    • 對於子網路,選擇 AWS KafkaTutorialSubnet2

    • 對於安全群組,請選擇所有可用的群組。

    選擇 Next (下一步)

  6. 連線屬性 / 連線存取權頁面,選擇完成

建立資料表
注意

您可以依照下列步驟中所述手動建立表格,也可以在 Apache Zeppelin 的筆記本中,使用針對 Apache Flink 的受管理服務建立表格連接器程式碼,透過陳述式建立表格。DDL然後,您可以入庫 AWS Glue 以確保表格已正確建立。

  1. 在左側導覽列中,選擇資料表。在資料表頁面,選擇新增資料表 > 手動新增資料表

  2. 設定資料表頁面,為資料表名稱輸入 stock。請務必選取先前建立的資料庫。選擇 Next (下一步)

  3. 新增資料存放區頁面,選擇 Kafka。對於主題名稱,輸入您的主題名稱(例如 AWS KafkaTutorialTopic)。對於「連線」,請選擇ZeppelinConnection

  4. 在「分」頁面中,選擇JSON。選擇 Next (下一步)

  5. 定義結構描述頁面,選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄:

    欄名稱 資料類型
    ticker string
    price double

    選擇 Next (下一步)

  6. 在下一頁上,確認您的設定,然後選擇完成

  7. 從資料表清單中選取您新建立的資料表。

  8. 選擇 「編輯表格」 並新增下列內容:

    • 鍵:managed-flink.proctime,值:proctime

    • 鍵:flink.properties.group.id,值:test-consumer-group

    • 鍵:flink.properties.auto.offset.reset,值:latest

    • 鍵:classification,值:json

    如果沒有這些鍵/值對,Flink 筆記本會遇到錯誤。

  9. 選擇套用

使用 Amazon 創建工作室筆記本 MSK

現在,您已建立應用程式使用的資源,接下來可以建立您的 Studio 筆記本。

您可以使用 AWS Management Console 或建立應用程式 AWS CLI。
注意

您也可以選擇現有叢集,然後選擇「即時處理資料」,從 Amazon MSK 主控台建立 Studio 筆記本。

建立工作室筆記本 AWS Management Console

  1. https://console.aws.amazon.com/managed-flink/家中打開 Apache Flink 控制台的託管服務? 區域 = us-east-1 #/應用程序/儀表板。

  2. Managed Service for Apache Flink 應用程式頁面,選擇 Studio 標籤。選擇建立 Studio 筆記本

    注意

    若要從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本,請選取您輸入的 Amazon MSK 叢集或 Kinesis 資料串流,然後選擇「即時處理資料」。

  3. 建立 Studio 筆記本頁面,提供下列資訊:

    • Studio 筆記本名稱輸入 MyNotebook

    • AWS Glue 資料庫選擇預設值

    選擇建立 Studio 筆記本

  4. MyNotebook頁面中,選擇「組態」頁籤。在網路模式區段中,選擇編輯

  5. 在 [編輯以下項目的聯網 MyNotebook] 頁面中,選擇以 Amazon MSK 叢集為基礎的VPC組態。為 Amazon MSK 群集選擇您的 Amazon MSK 群集。選擇 Save changes (儲存變更)。

  6. MyNotebook頁面中,選擇 [執行]。等待狀態顯示為執行中

建立工作室筆記本 AWS CLI

若要使用建立您的 Studio 筆記本 AWS CLI,請執行下列動作:

  1. 請務必備妥下列資訊:您需要這些值來建立應用程式。

    • 帳戶 ID。

    • 包含您的 Amazon MSK 叢集之 Amazon VPC 的子網路IDs和安全群組識別碼。

  2. 建立稱為 create.json 的檔案,其中具有以下內容。使用您的資訊取代預留位置的值。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. 若要建立應用程式,請執行下列命令:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. 命令完成後,您應該會看到類似如下的輸出,其中顯示新 Studio 筆記本的詳細資料:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. 若要執行應用程式,請執行下列命令:使用您的帳戶 ID 取代範例值。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

將資料傳送到您的 Amazon MSK 叢集

在本節中,您會在 Amazon EC2 用戶端中執行 Python 指令碼,以將資料傳送到您的 Amazon MSK 資料來源。

  1. Connect 到您的 Amazon EC2 客戶端。

  2. 執行以下命令來安裝 Python 版本 3、Pip 和 Kafka for Python 套件,並確認操作:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 輸入下列命令, AWS CLI 在用戶端電腦上設定:

    aws configure

    提供帳戶憑證,並為 region 提供 us-east-1

  4. 建立稱為 stock.py 的檔案,其中具有以下內容。將範例值取代為 Amazon MSK 叢集的 Bootstrap 代理程式字串,如果您的主題不是,請更新主題名稱 AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 使用下列命令執行指令碼:

    $ python3 stock.py
  6. 完成下一節時,讓指令碼保持執行狀態。

測試 Studio 筆記本

在本節中,您可以使用 Studio 筆記本查詢 Amazon MSK 叢集中的資料。

  1. https://console.aws.amazon.com/managed-flink/家中打開 Apache Flink 控制台的託管服務? 區域 = us-east-1 #/應用程序/儀表板。

  2. Managed Service for Apache Flink 應用程式頁面,選擇 Studio 筆記本標籤。選擇MyNotebook

  3. MyNotebook頁面中,選擇在 Apache 齊柏林飛艇中打開

    Apache Zeppelin 介面會在新標籤中開啟。

  4. 歡迎來到 Zeppelin! 頁面,選擇 Zeppelin 新筆記

  5. Zeppelin 筆記頁面,在新筆記中輸入以下查詢:

    %flink.ssql(type=update) select * from stock

    選擇執行圖示。

    應用程式會顯示來自 Amazon MSK 叢集的資料。

若要為您的應用程式開啟 Apache Flink 儀表板以檢視操作層面,請選擇FLINKJOB。如需 Flink 儀表板的詳細資訊,請參閱 Managed Service for Apache Flink 開發人員指南中的 Apache Flink 儀表板

如需 Flink 串流SQL查詢的更多範例,請參閱 Apache Flink 文件中的查詢