

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Amazon MSK 建立 Studio 筆記本
<a name="example-notebook-msk"></a>

本教學課程說明如何建立使用 Amazon MSK 叢集作為來源的 Studio 筆記本。

**Topics**
+ [設定 Amazon MSK 叢集](#example-notebook-msk-setup)
+ [將 NAT 閘道新增至 VPC](#example-notebook-msk-nat)
+ [建立 AWS Glue 連線和資料表](#example-notebook-msk-glue)
+ [使用 Amazon MSK 建立 Studio 筆記本](#example-notebook-msk-create)
+ [將資料傳送至 Amazon MSK 叢集](#example-notebook-msk-send)
+ [測試 Studio 筆記本](#example-notebook-msk-test)

## 設定 Amazon MSK 叢集
<a name="example-notebook-msk-setup"></a>

在本教學課程中，您需要一個允許純文字存取的 Amazon MSK 叢集。如果尚未設定 Amazon MSK 叢集，請依照 [Amazon MSK 使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教學課程來建立 Amazon VPC、Amazon MSK 叢集、主題和 Amazon EC2 用戶端執行個體。

跟隨教學課程學習時，請執行下列動作：
+ 在[步驟 3：建立 Amazon MSK 叢集](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)的步驟 4 中，將 `ClientBroker` 值從 `TLS` 變更為 **PLAINTEXT**。

## 將 NAT 閘道新增至 VPC
<a name="example-notebook-msk-nat"></a>

如果依照 [Amazon MSK 使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教學課程建立 Amazon MSK 叢集，或者您現有的 Amazon VPC 還沒有適用於其私有子網路的 NAT 閘道，則必須將 NAT 閘道新增到 Amazon VPC。下圖顯示一般架構。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/images/vpc_05.png)


若要為您的 Amazon VPC 建立 NAT 閘道，請執行下列動作：

1. 前往 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 從左側導覽列選擇 **NAT 閘道**。

1. 在 **NAT 閘道**頁面，選擇**建立 NAT 閘道**。

1. 在**建立 NAT 閘道**頁面，提供下列值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-msk.html)

   選擇**建立 NAT 閘道**。

1. 在導覽列中，選擇**路由表**。

1. 選擇**建立路由表**。

1. 在**建立路由表**頁面，提供以下資訊：
   + **名稱標籤**：**ZeppelinRouteTable**
   + **VPC****：選擇 VPC (例如AWS KafkaTutorialVPC)**。

   選擇**建立**。

1. 在路由表清單中，選擇 **ZeppelinRouteTable**。選擇**路由**標籤，然後選擇**編輯路由**。

1. 在**編輯路由**標籤中，選擇**新增路由**。

1. 在 **** 中，為**目標**輸入 **0.0.0.0/0**。為**目標**選擇 **NAT 閘道**、**ZeppelinGateway**。選擇**儲存路由**。選擇**關閉**。

1. 在「路由表」頁面，已選取 **ZeppelinRouteTable** 時，選擇**子網路關聯**標籤。選擇**編輯子網路關聯**。

1. 在**編輯子網路關聯**頁面，選擇 **AWS KafkaTutorialSubnet2** 和 **AWS KafkaTutorialSubnet3**。選擇 **Save** (儲存)。

## 建立 AWS Glue 連線和資料表
<a name="example-notebook-msk-glue"></a>

您的 Studio 筆記本使用 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 資料庫取得有關 Amazon MSK 資料來源的中繼資料。在本節中，您會建立 AWS Glue 連線，說明如何存取 Amazon MSK 叢集，以及說明如何將資料來源中的資料呈現給 Studio 筆記本等用戶端的 AWS Glue 資料表。

**建立連線**

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 如果您還沒有 AWS Glue 資料庫，請從左側導覽列中選擇**資料庫**。選擇**新增資料庫**。在**新增資料庫**視窗中，為**資料庫名稱**輸入 **default**。選擇 **Create** (建立)。

1. 從左側導覽列選擇**連線**。選擇**新增連線**。

1. 在**新增連線**視窗中，提供下列值：
   + 對於**連線名稱**，請輸入 **ZeppelinConnection**。
   + 對於**連線類型**，請選擇 **Kafka**。
   + 對於 **Kafka 啟動伺服器 URL**，請為叢集提供啟動代理程式字串。您可以從 MSK 主控台或輸入下列 CLI 命令來取得啟動代理程式：

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 取消核取**需要 SSL 連線**核取方塊。

   選擇**下一步**。

1. 在 **VPC** 頁面，提供下列值：
   + 針對 **VPC**，選擇 VPC 的名稱 （例如 ** AWS KafkaTutorialVPC**)。
   + 對於**子網路**，請選擇 **AWS KafkaTutorialSubnet2**。
   + 對於**安全群組**，請選擇所有可用的群組。

   選擇 **Next (下一步)**。

1. 在**連線屬性** / **連線存取權**頁面，選擇**完成**。

**建立資料表**
**注意**  
您可以依照下列步驟所述手動建立資料表，也可以在 Apache Zeppelin 的筆記本中，使用針對 Managed Service for Apache Flink 的建立資料表連接器程式碼，透過 DDL 陳述式建立資料表。然後，您可以簽入 AWS Glue ，以確保正確建立資料表。

1. 在左側導覽列中，選擇**資料表**。在**資料表**頁面，選擇**新增資料表** > **手動新增資料表**。

1. 在**設定資料表**頁面，為**資料表名稱**輸入 **stock**。請務必選取先前建立的資料庫。選擇 **Next (下一步)**。

1. 在**新增資料存放區**頁面，選擇 **Kafka**。對於**主題名稱**，請輸入您的主題名稱 (例如 **AWS KafkaTutorialTopic**)。針對**連線**，選擇 **ZeppelinConnection**。

1. 在**分類**頁面，選擇 **JSON**。選擇 **Next (下一步)**。

1. 在**定義結構描述**頁面，選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-msk.html)

   選擇 **Next (下一步)**。

1. 在下一頁上，確認您的設定，然後選擇**完成**。

1. 從資料表清單中選取您新建立的資料表。

1. 選擇**編輯資料表**並新增下列屬性：
   + 金鑰：`managed-flink.proctime`，值： `proctime`
   + 金鑰：`flink.properties.group.id`，值： `test-consumer-group`
   + 金鑰：`flink.properties.auto.offset.reset`，值： `latest`
   + 金鑰：`classification`，值： `json`

   如果沒有這些鍵/值對，Flink 筆記本會執行為錯誤。

1. 選擇**套用**。

## 使用 Amazon MSK 建立 Studio 筆記本
<a name="example-notebook-msk-create"></a>

現在，您已建立應用程式使用的資源，接下來可以建立您的 Studio 筆記本。

**Topics**
+ [使用 建立 Studio 筆記本 AWS 管理主控台](#example-notebook-create-msk-console)
+ [使用 建立 Studio 筆記本 AWS CLI](#example-notebook-msk-create-api)

**注意**  
您也可以選擇現有叢集，然後選擇**即時處理資料**，從 Amazon MSK 主控台建立 Studio 筆記本。

### 使用 建立 Studio 筆記本 AWS 管理主控台
<a name="example-notebook-create-msk-console"></a>

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio** 標籤。選擇**建立 Studio 筆記本**。
**注意**  
若要從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本，請選取您的輸入 Amazon MSK 叢集或 Kinesis 資料串流，然後選擇**即時處理資料**。

1. 在**建立 Studio 筆記本**頁面，提供下列資訊：
   + 為 **Studio 筆記本名稱**輸入 **MyNotebook**。
   + 為 **AWS Glue 資料庫**選擇**預設值**。

   選擇**建立 Studio 筆記本**。

1. 在 **MyNotebook** 頁面，選擇**組態**標籤。在**網路模式**區段中，選擇**編輯**。

1. 在**編輯 MyNotebook 聯網**頁面，選擇**以 Amazon MSK 叢集為基礎的 VPC 組態**。為 **Amazon MSK 叢集**選擇 Amazon MSK 叢集。選擇**儲存變更**。

1. 在 **MyNotebook** 頁面，選擇**執行**。等待**狀態**顯示為**執行中**。

### 使用 建立 Studio 筆記本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

若要使用 建立 Studio 筆記本 AWS CLI，請執行下列動作：

1. 請務必備妥下列資訊：您需要這些值來建立應用程式。
   + 帳戶 ID。
   + 子網路 ID 以及包含 Amazon MSK 叢集的 Amazon VPC 的安全群組 ID。

1. 建立稱為 `create.json` 的檔案，其中具有以下內容。使用您的資訊取代預留位置的值。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 若要建立應用程式，請執行下列命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成後，您應該會看到類似如下的輸出，其中顯示新 Studio 筆記本的詳細資料：

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 若要執行應用程式，請執行下列命令：使用您的帳戶 ID 取代範例值。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 將資料傳送至 Amazon MSK 叢集
<a name="example-notebook-msk-send"></a>

在本節中，您會在 Amazon EC2 用戶端中執行 Python 指令碼，以將資料傳送到您的 Amazon MSK 資料來源。

1. 連線到 Amazon EC2 用戶端。

1. 執行以下命令來安裝 Python 版本 3、Pip 和 Kafka for Python 套件，並確認操作：

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 輸入下列命令，在用戶端機器 AWS CLI 上設定 ：

   ```
   aws configure
   ```

   提供帳戶憑證，並為 `region` 提供 **us-east-1**。

1. 建立稱為 `stock.py` 的檔案，其中具有以下內容。使用 Amazon MSK 叢集的啟動代理程式字串取代範例值，如果您的主題不是 **AWS KafkaTutorialTopic**，請更新主題名稱：

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 使用下列命令執行指令碼：

   ```
   $ python3 stock.py
   ```

1. 完成下一節時，讓指令碼保持執行狀態。

## 測試 Studio 筆記本
<a name="example-notebook-msk-test"></a>

在本節中，您可以使用 Studio 筆記本查詢 Amazon MSK 叢集中的資料。

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio 筆記本**標籤。選擇 **MyNotebook**。

1. 在 **MyNotebook** 頁面，選擇**在 Apache Zeppelin 中開啟**。

   Apache Zeppelin 介面會在新標籤中開啟。

1. 在**歡迎來到 Zeppelin\$1** 頁面，選擇 **Zeppelin 新筆記**。

1. 在 **Zeppelin 筆記**頁面，在新筆記中輸入以下查詢：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   選擇執行圖示。

   應用程式會顯示 Amazon MSK 叢集中的資料。

若要為應用程式開啟 Apache Flink 儀表板以檢視操作層面，請選擇 **FLINK 作業**。如需 Flink 儀表板的詳細資訊，請參閱 [Managed Service for Apache Flink 開發人員指南](https://docs.aws.amazon.com/)中的 [Apache Flink 儀表板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

如需 Flink 串流 SQL 查詢的更多範例，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。