Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Amazon 創建工作室筆記本 MSK
本教學課程說明如何建立使用 Amazon MSK 叢集做為來源的 Studio 筆記本。
本教學課程包含下列章節:
設置 Amazon MSK 群集
在本教學中,您需要一個允許純文字存取的 Amazon MSK 叢集。如果您尚未設定 Amazon MSK 叢集,請按照 Amazon 入門使用MSK教學建立 Amazon VPC、Amazon MSK 叢集、主題和 Amazon 用EC2戶端執行個體。
跟隨教學課程學習時,請執行下列動作:
在步驟 3:建立 Amazon MSK 叢集中,在步驟 4 中,將
ClientBroker
值從變更TLS
為PLAINTEXT
。
將NAT閘道新增至您的 VPC
如果您按照開始使用 Amazon MSK 教學建立 Amazon MSK 叢集,或者您現有的 Amazon 尚VPC未擁有其私有子網路的NAT閘道,則必須將NAT閘道新增到 Amazon VPC。下圖顯示一般架構。
要為您的 Amazon 創建NAT網關VPC,請執行以下操作:
在打開 Amazon VPC 控制台https://console.aws.amazon.com/vpc/
。 從左側導覽列選擇「NAT閘道」。
在 [NAT閘道] 頁面上,選擇 [建立NAT閘道]。
在 [建立NAT閘道] 頁面上,提供下列值:
名稱-可選 ZeppelinGateway
子網 AWS KafkaTutorialSubnet1 彈性 IP 配置識別碼 選擇可用的彈性 IP。如果沒有IPs可用的彈性,請選擇 [配置彈性 IP],然後選擇主控台建立的 Elasic IP。 選擇 [建立NAT閘道]。
在導覽列中,選擇路由表。
選擇建立路由表。
在建立路由表頁面,提供以下資訊:
名稱標籤:
ZeppelinRouteTable
VPC:選擇您的VPC(例如 AWS KafkaTutorialVPC)。
選擇 Create (建立)。
在路由表清單中,選擇ZeppelinRouteTable。選擇路由標籤,然後選擇編輯路由。
在編輯路由標籤中,選擇新增路由。
在 中,為目標輸入
0.0.0.0/0
。針對「目標」,選擇「NAT閘道」ZeppelinGateway。選擇儲存路由。選擇關閉。在「路由表」頁面上,選取後,ZeppelinRouteTable選擇「子網路關聯」標籤。選擇編輯子網路關聯。
在 [編輯子網路關聯] 頁面中,選擇 [AWS KafkaTutorialSubnet2] 和 [AWS KafkaTutorialSubnet3]。選擇 Save (儲存)。
創建一個 AWS Glue 連接和表
您的 Studio 筆記本使用資料AWS Glue庫取得有關 Amazon MSK 資料來源的中繼資料。在本節中,您會建立說明如何存取 Amazon MSK 叢集的 AWS Glue 連線,以及一個說明如何將資料來源中的資料呈現給用戶端 (例如 Studio 筆記本) 的 AWS Glue 表格。
建立連線
登入 AWS Management Console 並開啟 AWS Glue 主控台,位於https://console.aws.amazon.com/glue/
。 如果您還沒有資料 AWS Glue 庫,請從左側導覽列選擇 [資料庫]。選擇新增資料庫。在新增資料庫視窗中,為資料庫名稱輸入
default
。選擇 Create (建立)。從左側導覽列選擇連線。選擇新增連線。
在新增連線視窗中,提供下列值:
對於連線名稱,請輸入
ZeppelinConnection
。對於連線類型,請選擇 Kafka。
對於 Kafka 引導服務器 URLs,為您的集群提供引導代理字符串。您可以從MSK控制台或輸入以下CLI命令來獲取引導代理程序:
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn
ClusterArn
取消勾選需要SSL連線核取方塊。
選擇 Next (下一步)。
在VPC頁面中,提供下列值:
對於 VPC,選擇您的名稱VPC(例如 AWS KafkaTutorialVPC。)
對於子網路,選擇 AWS KafkaTutorialSubnet2。
對於安全群組,請選擇所有可用的群組。
選擇 Next (下一步)。
在連線屬性 / 連線存取權頁面,選擇完成。
建立資料表
注意
您可以依照下列步驟中所述手動建立表格,也可以在 Apache Zeppelin 的筆記本中,使用針對 Apache Flink 的受管理服務建立表格連接器程式碼,透過陳述式建立表格。DDL然後,您可以入庫 AWS Glue 以確保表格已正確建立。
在左側導覽列中,選擇資料表。在資料表頁面,選擇新增資料表 > 手動新增資料表。
在設定資料表頁面,為資料表名稱輸入
stock
。請務必選取先前建立的資料庫。選擇 Next (下一步)。在新增資料存放區頁面,選擇 Kafka。對於主題名稱,輸入您的主題名稱(例如 AWS KafkaTutorialTopic)。對於「連線」,請選擇ZeppelinConnection。
在「分類」頁面中,選擇JSON。選擇 Next (下一步)。
在定義結構描述頁面,選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄:
欄名稱 資料類型 ticker
string
price
double
選擇 Next (下一步)。
在下一頁上,確認您的設定,然後選擇完成。
-
從資料表清單中選取您新建立的資料表。
-
選擇 「編輯表格」 並新增下列內容:
-
鍵:
managed-flink.proctime
,值:proctime
-
鍵:
flink.properties.group.id
,值:test-consumer-group
-
鍵:
flink.properties.auto.offset.reset
,值:latest
-
鍵:
classification
,值:json
如果沒有這些鍵/值對,Flink 筆記本會遇到錯誤。
-
-
選擇套用。
使用 Amazon 創建工作室筆記本 MSK
現在,您已建立應用程式使用的資源,接下來可以建立您的 Studio 筆記本。
您可以使用 AWS Management Console 或建立應用程式 AWS CLI。
注意
您也可以選擇現有叢集,然後選擇「即時處理資料」,從 Amazon MSK 主控台建立 Studio 筆記本。
建立工作室筆記本 AWS Management Console
在 Managed Service for Apache Flink 應用程式頁面,選擇 Studio 標籤。選擇建立 Studio 筆記本。
注意
若要從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本,請選取您輸入的 Amazon MSK 叢集或 Kinesis 資料串流,然後選擇「即時處理資料」。
在建立 Studio 筆記本頁面,提供下列資訊:
-
為 Studio 筆記本名稱輸入
MyNotebook
。 為 AWS Glue 資料庫選擇預設值。
選擇建立 Studio 筆記本。
-
在MyNotebook頁面中,選擇「組態」頁籤。在網路模式區段中,選擇編輯。
在 [編輯以下項目的聯網 MyNotebook] 頁面中,選擇以 Amazon MSK 叢集為基礎的VPC組態。為 Amazon MSK 群集選擇您的 Amazon MSK 群集。選擇 Save changes (儲存變更)。
在MyNotebook頁面中,選擇 [執行]。等待狀態顯示為執行中。
建立工作室筆記本 AWS CLI
若要使用建立您的 Studio 筆記本 AWS CLI,請執行下列動作:
請務必備妥下列資訊:您需要這些值來建立應用程式。
帳戶 ID。
包含您的 Amazon MSK 叢集之 Amazon VPC 的子網路IDs和安全群組識別碼。
建立稱為
create.json
的檔案,其中具有以下內容。使用您的資訊取代預留位置的值。{ "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::
AccountID
:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1
", "SubnetID 2
", "SubnetID 3
" ], "SecurityGroupIds": [ "VPC Security Group ID
" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID
:database/default" } } } } }若要建立應用程式,請執行下列命令:
aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
命令完成後,您應該會看到類似如下的輸出,其中顯示新 Studio 筆記本的詳細資料:
{ "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
若要執行應用程式,請執行下列命令:使用您的帳戶 ID 取代範例值。
aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:
012345678901
:application/MyNotebook\
將資料傳送到您的 Amazon MSK 叢集
在本節中,您會在 Amazon EC2 用戶端中執行 Python 指令碼,以將資料傳送到您的 Amazon MSK 資料來源。
Connect 到您的 Amazon EC2 客戶端。
執行以下命令來安裝 Python 版本 3、Pip 和 Kafka for Python 套件,並確認操作:
sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
輸入下列命令, AWS CLI 在用戶端電腦上設定:
aws configure
提供帳戶憑證,並為
region
提供us-east-1
。建立稱為
stock.py
的檔案,其中具有以下內容。將範例值取代為 Amazon MSK 叢集的 Bootstrap 代理程式字串,如果您的主題不是,請更新主題名稱 AWS KafkaTutorialTopic:from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "
<<Bootstrap Broker List>>
" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())使用下列命令執行指令碼:
$ python3 stock.py
完成下一節時,讓指令碼保持執行狀態。
測試 Studio 筆記本
在本節中,您可以使用 Studio 筆記本查詢 Amazon MSK 叢集中的資料。
在 Managed Service for Apache Flink 應用程式頁面,選擇 Studio 筆記本標籤。選擇MyNotebook。
在MyNotebook頁面中,選擇在 Apache 齊柏林飛艇中打開。
Apache Zeppelin 介面會在新標籤中開啟。
在歡迎來到 Zeppelin! 頁面,選擇 Zeppelin 新筆記。
在 Zeppelin 筆記頁面,在新筆記中輸入以下查詢:
%flink.ssql(type=update) select * from stock
選擇執行圖示。
應用程式會顯示來自 Amazon MSK 叢集的資料。
若要為您的應用程式開啟 Apache Flink 儀表板以檢視操作層面,請選擇FLINKJOB。如需 Flink 儀表板的詳細資訊,請參閱 Managed Service for Apache Flink 開發人員指南中的 Apache Flink 儀表板。
如需 Flink 串流SQL查詢的更多範例,請參閱 Apache Flink