

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Managed Service for Apache Flink 中 Studio 筆記本的範例和教學課程
<a name="how-zeppelin-examples"></a>

**Topics**
+ [教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本](example-notebook.md)
+ [教學課程：將 Studio 筆記本部署為具有持久狀態的 Managed Service for Apache Flink 應用程式](example-notebook-deploy.md)
+ [檢視 Studio 筆記本中分析資料的範例查詢](how-zeppelin-sql-examples.md)

# 教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本
<a name="example-notebook"></a>

下列教學課程示範如何建立 Studio 筆記本，從 Kinesis 資料串流或 Amazon MSK 叢集讀取資料。

**Topics**
+ [完成先決條件](#example-notebook-setup)
+ [建立 AWS Glue 資料庫](#example-notebook-glue)
+ [後續步驟：使用 Kinesis Data Streams 或 Amazon MSK 建立 Studio 筆記本](#examples-notebook-nextsteps)
+ [使用 Kinesis Data Streams 建立 Studio 筆記本](example-notebook-streams.md)
+ [使用 Amazon MSK 建立 Studio 筆記本](example-notebook-msk.md)
+ [清除您的應用程式和相依資源](example-notebook-cleanup.md)

## 完成先決條件
<a name="example-notebook-setup"></a>

請確定您的 AWS CLI 是第 2 版或更新版本。若要安裝最新的 AWS CLI，請參閱[安裝、更新和解除安裝第 2 AWS CLI 版](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。

## 建立 AWS Glue 資料庫
<a name="example-notebook-glue"></a>

您的 Studio 筆記本使用 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 資料庫取得有關 Amazon MSK 資料來源的中繼資料。

**建立 AWS Glue 資料庫**

1. 在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 選擇**新增資料庫**。在**新增資料庫**視窗中，為**資料庫名稱**輸入 **default**。選擇**建立**。

## 後續步驟：使用 Kinesis Data Streams 或 Amazon MSK 建立 Studio 筆記本
<a name="examples-notebook-nextsteps"></a>

借助本教學課程，您可以建立使用 Kinesis Data Streams 或 Amazon MSK 的 Studio 筆記本：
+ [使用 Kinesis Data Streams 建立 Studio 筆記本](example-notebook-streams.md)：使用 Kinesis Data Streams，您可以快速建立使用 Kinesis 資料串流作為來源的應用程式。您只需要將 Kinesis 資料串流建立為相依資源。
+ [使用 Amazon MSK 建立 Studio 筆記本](example-notebook-msk.md)：使用 Amazon MSK，您可以建立使用 Amazon MSK 叢集做為來源的應用程式。您需要建立一個 Amazon VPC、一個 Amazon EC2 用戶端執行個體和一個 Amazon MSK 叢集作為相依資源。

# 使用 Kinesis Data Streams 建立 Studio 筆記本
<a name="example-notebook-streams"></a>

本教學課程說明如何建立使用 Kinesis 資料串流作為來源的 Studio 筆記本。

**Topics**
+ [完成先決條件](#example-notebook-streams-setup)
+ [建立 AWS Glue 資料表](#example-notebook-streams-glue)
+ [使用 Kinesis Data Streams 建立 Studio 筆記本](#example-notebook-streams-create)
+ [將資料傳送至 Kinesis 資料串流](#example-notebook-streams-send)
+ [測試 Studio 筆記本](#example-notebook-streams-test)

## 完成先決條件
<a name="example-notebook-streams-setup"></a>

建立 Studio 筆記本之前，請先建立 Kinesis 資料串流 (`ExampleInputStream`)。您的應用程式使用此串流作為應用程式來源。

您可以使用 Amazon Kinesis 主控台或以下 AWS CLI 命令來建立該串流。如需主控台指示，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。將該串流命名為 **ExampleInputStream**，並將**開啟的碎片數量**設定為 **1**。

若要使用 建立串流 (`ExampleInputStream`) AWS CLI，請使用下列 Amazon Kinesis `create-stream` AWS CLI 命令。

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## 建立 AWS Glue 資料表
<a name="example-notebook-streams-glue"></a>

您的 Studio 筆記本使用 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 資料庫取得有關 Kinesis Data Streams 資料來源的中繼資料。

**注意**  
您可以先手動建立資料庫，也可以讓 Managed Service for Apache Flink 在您建立筆記本時為您建立資料庫。同樣，您可以依照本節所述手動建立資料表，也可以在 Apache Zeppelin 的筆記本中，使用針對 Managed Service for Apache Flink 的建立資料表連接器程式碼，透過 DDL 陳述式建立資料表。然後，您可以簽入 AWS Glue ，以確保正確建立資料表。

**建立資料表**

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 如果您還沒有 AWS Glue 資料庫，請從左側導覽列中選擇**資料庫**。選擇**新增資料庫**。在**新增資料庫**視窗中，為**資料庫名稱**輸入 **default**。選擇 **Create** (建立)。

1. 在左側導覽列中，選擇**資料表**。在**資料表**頁面，選擇**新增資料表** > **手動新增資料表**。

1. 在**設定資料表**頁面，為**資料表名稱**輸入 **stock**。請務必選取先前建立的資料庫。選擇 **Next (下一步)**。

1. 在**新增資料存放區**頁面，選擇 **Kinesis**。對於**串流名稱**，輸入 **ExampleInputStream**。針對 **Kinesis 來源 URL**，請選擇輸入 **https://kinesis.us-east-1.amazonaws.com**。如果您複製並貼上 **Kinesis 來源 URL**，請務必刪除任何前置或尾端空格。選擇 **Next (下一步)**。

1. 在**分類**頁面，選擇 **JSON**。選擇 **Next (下一步)**。

1. 在**定義結構描述**頁面，選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-streams.html)

   選擇 **Next (下一步)**。

1. 在下一頁上，確認您的設定，然後選擇**完成**。

1. 從資料表清單中選取您新建立的資料表。

1. 選擇**編輯資料表**，然後新增索引鍵為 `managed-flink.proctime` 值為 `proctime` 的屬性。

1. 選擇**套用**。

## 使用 Kinesis Data Streams 建立 Studio 筆記本
<a name="example-notebook-streams-create"></a>

現在，您已建立應用程式使用的資源，接下來可以建立您的 Studio 筆記本。

**Topics**
+ [使用 建立 Studio 筆記本 AWS 管理主控台](#example-notebook-create-streams-console)
+ [使用 建立 Studio 筆記本 AWS CLI](#example-notebook-msk-create-api)

### 使用 建立 Studio 筆記本 AWS 管理主控台
<a name="example-notebook-create-streams-console"></a>

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio** 標籤。選擇**建立 Studio 筆記本**。
**注意**  
您也可以藉由選取輸入 Amazon MSK 叢集或 Kinesis 資料串流，然後選擇**即時處理資料**，從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本。

1. 在**建立 Studio 筆記本**頁面，提供下列資訊：
   + 為筆記本名稱輸入 **MyNotebook**。
   + 為 **AWS Glue 資料庫**選擇**預設值**。

   選擇**建立 Studio 筆記本**。

1. 在 **MyNotebook** 頁面，選擇**執行**。等待**狀態**顯示為**執行中**。筆記本執行時需支付費用。

### 使用 建立 Studio 筆記本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

若要使用 建立 Studio 筆記本 AWS CLI，請執行下列動作：

1. 驗證您的帳戶 ID。您需要此值來建立應用程式。

1. 建立角色 `arn:aws:iam::AccountID:role/ZeppelinRole`，並將下列許可新增至主控台自動建立的角色。

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 建立稱為 `create.json` 的檔案，其中具有以下內容。使用您的資訊取代預留位置的值。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 若要建立應用程式，請執行下列命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成後，您應該會看到類似如下的輸出，其中顯示新 Studio 筆記本的詳細資料：以下為輸出範例。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 若要執行應用程式，請執行下列命令：使用您的帳戶 ID 取代範例值。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 將資料傳送至 Kinesis 資料串流
<a name="example-notebook-streams-send"></a>

若要將測試資料傳送至 Kinesis 資料串流，請執行下列動作：

1. 開啟 [Kinesis 資料產生器](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)。

1. 選擇使用 **CloudFormation 建立 Cognito 使用者**。

1.  CloudFormation 主控台會開啟 Kinesis Data Generator 範本。選擇 **Next (下一步)**。

1. 在**指定堆疊詳細資訊**頁面，輸入 Cognito 使用者的使用者名稱和密碼。選擇 **Next (下一步)**。

1. 在**設定堆疊選項**頁面，選擇**下一步**。

1. 在**檢閱 Kinesis-Data-Generator-Cognito-User** 頁面中，選擇**我確認 AWS CloudFormation 可能會建立 IAM 資源。**核取方塊。選擇 **Create Stack** (建立堆疊)。

1. 等待 CloudFormation 堆疊完成建立。堆疊完成後，在 CloudFormation 主控台中開啟 **Kinesis-Data-Generator-Cognito-User** 堆疊，然後選擇**輸出**索引標籤。開啟針對 **KinesisDataGeneratorUrl** 輸出值所列出的 URL。

1. 在 **Amazon Kinesis 資料產生器**頁面，使用您在步驟 4 中建立的憑證登入。

1. 在下一頁面，提供下列值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-streams.html)

   為**記錄範本**貼上下列程式碼：

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. 選擇**傳送資料**。

1. 產生器會將資料傳送至 Kinesis 資料串流。

   完成下一節時，讓產生器保持執行狀態。

## 測試 Studio 筆記本
<a name="example-notebook-streams-test"></a>

在本節中，您可以使用 Studio 筆記本查詢 Kinesis 資料串流中的資料。

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio 筆記本**標籤。選擇 **MyNotebook**。

1. 在 **MyNotebook** 頁面，選擇**在 Apache Zeppelin 中開啟**。

   Apache Zeppelin 介面會在新標籤中開啟。

1. 在**歡迎來到 Zeppelin\$1** 頁面，選擇 **Zeppelin 筆記**。

1. 在 **Zeppelin 筆記**頁面，在新筆記中輸入以下查詢：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   選擇執行圖示。

   一小段時間後，筆記會顯示 Kinesis 資料串流中的資料。

若要為應用程式開啟 Apache Flink 儀表板以檢視操作層面，請選擇 **FLINK 作業**。如需 Flink 儀表板的詳細資訊，請參閱 [Managed Service for Apache Flink 開發人員指南](https://docs.aws.amazon.com/)中的 [Apache Flink 儀表板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

如需 Flink 串流 SQL 查詢的更多範例，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 使用 Amazon MSK 建立 Studio 筆記本
<a name="example-notebook-msk"></a>

本教學課程說明如何建立使用 Amazon MSK 叢集作為來源的 Studio 筆記本。

**Topics**
+ [設定 Amazon MSK 叢集](#example-notebook-msk-setup)
+ [將 NAT 閘道新增至 VPC](#example-notebook-msk-nat)
+ [建立 AWS Glue 連線和資料表](#example-notebook-msk-glue)
+ [使用 Amazon MSK 建立 Studio 筆記本](#example-notebook-msk-create)
+ [將資料傳送至 Amazon MSK 叢集](#example-notebook-msk-send)
+ [測試 Studio 筆記本](#example-notebook-msk-test)

## 設定 Amazon MSK 叢集
<a name="example-notebook-msk-setup"></a>

在本教學課程中，您需要一個允許純文字存取的 Amazon MSK 叢集。如果尚未設定 Amazon MSK 叢集，請依照 [Amazon MSK 使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教學課程來建立 Amazon VPC、Amazon MSK 叢集、主題和 Amazon EC2 用戶端執行個體。

跟隨教學課程學習時，請執行下列動作：
+ 在[步驟 3：建立 Amazon MSK 叢集](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)的步驟 4 中，將 `ClientBroker` 值從 `TLS` 變更為 **PLAINTEXT**。

## 將 NAT 閘道新增至 VPC
<a name="example-notebook-msk-nat"></a>

如果依照 [Amazon MSK 使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教學課程建立 Amazon MSK 叢集，或者您現有的 Amazon VPC 還沒有適用於其私有子網路的 NAT 閘道，則必須將 NAT 閘道新增到 Amazon VPC。下圖顯示一般架構。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/images/vpc_05.png)


若要為您的 Amazon VPC 建立 NAT 閘道，請執行下列動作：

1. 前往 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 從左側導覽列選擇 **NAT 閘道**。

1. 在 **NAT 閘道**頁面，選擇**建立 NAT 閘道**。

1. 在**建立 NAT 閘道**頁面，提供下列值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-msk.html)

   選擇**建立 NAT 閘道**。

1. 在導覽列中，選擇**路由表**。

1. 選擇**建立路由表**。

1. 在**建立路由表**頁面，提供以下資訊：
   + **名稱標籤**：**ZeppelinRouteTable**
   + **VPC****：選擇 VPC (例如AWS KafkaTutorialVPC)**。

   選擇**建立**。

1. 在路由表清單中，選擇 **ZeppelinRouteTable**。選擇**路由**標籤，然後選擇**編輯路由**。

1. 在**編輯路由**標籤中，選擇**新增路由**。

1. 在 **** 中，為**目標**輸入 **0.0.0.0/0**。為**目標**選擇 **NAT 閘道**、**ZeppelinGateway**。選擇**儲存路由**。選擇**關閉**。

1. 在「路由表」頁面，已選取 **ZeppelinRouteTable** 時，選擇**子網路關聯**標籤。選擇**編輯子網路關聯**。

1. 在**編輯子網路關聯**頁面，選擇 **AWS KafkaTutorialSubnet2** 和 **AWS KafkaTutorialSubnet3**。選擇 **Save** (儲存)。

## 建立 AWS Glue 連線和資料表
<a name="example-notebook-msk-glue"></a>

您的 Studio 筆記本使用 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 資料庫取得有關 Amazon MSK 資料來源的中繼資料。在本節中，您會建立 AWS Glue 連線，說明如何存取 Amazon MSK 叢集，以及說明如何將資料來源中的資料呈現給 Studio 筆記本等用戶端的 AWS Glue 資料表。

**建立連線**

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 如果您還沒有 AWS Glue 資料庫，請從左側導覽列中選擇**資料庫**。選擇**新增資料庫**。在**新增資料庫**視窗中，為**資料庫名稱**輸入 **default**。選擇 **Create** (建立)。

1. 從左側導覽列選擇**連線**。選擇**新增連線**。

1. 在**新增連線**視窗中，提供下列值：
   + 對於**連線名稱**，請輸入 **ZeppelinConnection**。
   + 對於**連線類型**，請選擇 **Kafka**。
   + 對於 **Kafka 啟動伺服器 URL**，請為叢集提供啟動代理程式字串。您可以從 MSK 主控台或輸入下列 CLI 命令來取得啟動代理程式：

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 取消核取**需要 SSL 連線**核取方塊。

   選擇**下一步**。

1. 在 **VPC** 頁面，提供下列值：
   + 針對 **VPC**，選擇 VPC 的名稱 （例如 ** AWS KafkaTutorialVPC**)。
   + 對於**子網路**，請選擇 **AWS KafkaTutorialSubnet2**。
   + 對於**安全群組**，請選擇所有可用的群組。

   選擇 **Next (下一步)**。

1. 在**連線屬性** / **連線存取權**頁面，選擇**完成**。

**建立資料表**
**注意**  
您可以依照下列步驟所述手動建立資料表，也可以在 Apache Zeppelin 的筆記本中，使用針對 Managed Service for Apache Flink 的建立資料表連接器程式碼，透過 DDL 陳述式建立資料表。然後，您可以簽入 AWS Glue ，以確保正確建立資料表。

1. 在左側導覽列中，選擇**資料表**。在**資料表**頁面，選擇**新增資料表** > **手動新增資料表**。

1. 在**設定資料表**頁面，為**資料表名稱**輸入 **stock**。請務必選取先前建立的資料庫。選擇 **Next (下一步)**。

1. 在**新增資料存放區**頁面，選擇 **Kafka**。對於**主題名稱**，請輸入您的主題名稱 (例如 **AWS KafkaTutorialTopic**)。針對**連線**，選擇 **ZeppelinConnection**。

1. 在**分類**頁面，選擇 **JSON**。選擇 **Next (下一步)**。

1. 在**定義結構描述**頁面，選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-msk.html)

   選擇 **Next (下一步)**。

1. 在下一頁上，確認您的設定，然後選擇**完成**。

1. 從資料表清單中選取您新建立的資料表。

1. 選擇**編輯資料表**並新增下列屬性：
   + 金鑰：`managed-flink.proctime`，值： `proctime`
   + 金鑰：`flink.properties.group.id`，值： `test-consumer-group`
   + 金鑰：`flink.properties.auto.offset.reset`，值： `latest`
   + 金鑰：`classification`，值： `json`

   如果沒有這些鍵/值對，Flink 筆記本會執行為錯誤。

1. 選擇**套用**。

## 使用 Amazon MSK 建立 Studio 筆記本
<a name="example-notebook-msk-create"></a>

現在，您已建立應用程式使用的資源，接下來可以建立您的 Studio 筆記本。

**Topics**
+ [使用 建立 Studio 筆記本 AWS 管理主控台](#example-notebook-create-msk-console)
+ [使用 建立 Studio 筆記本 AWS CLI](#example-notebook-msk-create-api)

**注意**  
您也可以選擇現有叢集，然後選擇**即時處理資料**，從 Amazon MSK 主控台建立 Studio 筆記本。

### 使用 建立 Studio 筆記本 AWS 管理主控台
<a name="example-notebook-create-msk-console"></a>

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio** 標籤。選擇**建立 Studio 筆記本**。
**注意**  
若要從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本，請選取您的輸入 Amazon MSK 叢集或 Kinesis 資料串流，然後選擇**即時處理資料**。

1. 在**建立 Studio 筆記本**頁面，提供下列資訊：
   + 為 **Studio 筆記本名稱**輸入 **MyNotebook**。
   + 為 **AWS Glue 資料庫**選擇**預設值**。

   選擇**建立 Studio 筆記本**。

1. 在 **MyNotebook** 頁面，選擇**組態**標籤。在**網路模式**區段中，選擇**編輯**。

1. 在**編輯 MyNotebook 聯網**頁面，選擇**以 Amazon MSK 叢集為基礎的 VPC 組態**。為 **Amazon MSK 叢集**選擇 Amazon MSK 叢集。選擇**儲存變更**。

1. 在 **MyNotebook** 頁面，選擇**執行**。等待**狀態**顯示為**執行中**。

### 使用 建立 Studio 筆記本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

若要使用 建立 Studio 筆記本 AWS CLI，請執行下列動作：

1. 請務必備妥下列資訊：您需要這些值來建立應用程式。
   + 帳戶 ID。
   + 子網路 ID 以及包含 Amazon MSK 叢集的 Amazon VPC 的安全群組 ID。

1. 建立稱為 `create.json` 的檔案，其中具有以下內容。使用您的資訊取代預留位置的值。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 若要建立應用程式，請執行下列命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成後，您應該會看到類似如下的輸出，其中顯示新 Studio 筆記本的詳細資料：

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 若要執行應用程式，請執行下列命令：使用您的帳戶 ID 取代範例值。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 將資料傳送至 Amazon MSK 叢集
<a name="example-notebook-msk-send"></a>

在本節中，您會在 Amazon EC2 用戶端中執行 Python 指令碼，以將資料傳送到您的 Amazon MSK 資料來源。

1. 連線到 Amazon EC2 用戶端。

1. 執行以下命令來安裝 Python 版本 3、Pip 和 Kafka for Python 套件，並確認操作：

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 輸入下列命令，在用戶端機器 AWS CLI 上設定 ：

   ```
   aws configure
   ```

   提供帳戶憑證，並為 `region` 提供 **us-east-1**。

1. 建立稱為 `stock.py` 的檔案，其中具有以下內容。使用 Amazon MSK 叢集的啟動代理程式字串取代範例值，如果您的主題不是 **AWS KafkaTutorialTopic**，請更新主題名稱：

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 使用下列命令執行指令碼：

   ```
   $ python3 stock.py
   ```

1. 完成下一節時，讓指令碼保持執行狀態。

## 測試 Studio 筆記本
<a name="example-notebook-msk-test"></a>

在本節中，您可以使用 Studio 筆記本查詢 Amazon MSK 叢集中的資料。

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio 筆記本**標籤。選擇 **MyNotebook**。

1. 在 **MyNotebook** 頁面，選擇**在 Apache Zeppelin 中開啟**。

   Apache Zeppelin 介面會在新標籤中開啟。

1. 在**歡迎來到 Zeppelin\$1** 頁面，選擇 **Zeppelin 新筆記**。

1. 在 **Zeppelin 筆記**頁面，在新筆記中輸入以下查詢：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   選擇執行圖示。

   應用程式會顯示 Amazon MSK 叢集中的資料。

若要為應用程式開啟 Apache Flink 儀表板以檢視操作層面，請選擇 **FLINK 作業**。如需 Flink 儀表板的詳細資訊，請參閱 [Managed Service for Apache Flink 開發人員指南](https://docs.aws.amazon.com/)中的 [Apache Flink 儀表板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

如需 Flink 串流 SQL 查詢的更多範例，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 清除您的應用程式和相依資源
<a name="example-notebook-cleanup"></a>

## 刪除 Studio 筆記本
<a name="example-notebook-cleanup-app"></a>

1. 開啟 Managed Service for Apache Flink 主控台。

1. 選擇 **MyNotebook**。

1. 依序選擇**動作**和**刪除**。

## 刪除您的 AWS Glue 資料庫和連線
<a name="example-notebook-cleanup-glue"></a>

1. 在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 從左側導覽列選擇**資料庫**。核取**預設**旁邊的核取方塊以選取它。依序選擇**動作**和**刪除資料庫**。確認您的選擇。

1. 從左側導覽列選擇**連線**。選中 **ZeppelinConnection** 旁邊的核取方塊以選取它。依序選擇**動作**和**刪除連線**。確認您的選擇。

## 刪除 IAM 角色和政策
<a name="example-notebook-msk-cleanup-iam"></a>

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 從左側導覽選單選擇**角色**。

1. 使用搜尋列搜尋 **ZeppelinRole** 角色。

1. 選擇 **ZeppelinRole** 角色。選擇**刪除角色**。確認刪除。

## 刪除 CloudWatch 日誌群組
<a name="example-notebook-cleanup-cw"></a>

使用主控台建立應用程式時，主控台會為您建立 CloudWatch 日誌群組和日誌串流。如果您已使用 AWS CLI建立應用程式，則沒有日誌群組和串流。

1. 透過 [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) 開啟 CloudWatch 主控台。

1. 從左側導覽列選擇**日誌群組**。

1. 選擇 **/AWS/KinesisAnalytics/MyNotebook** 日誌群組。

1. 選擇**動作**、**刪除日誌群組**。確認刪除。

## 清除 Kinesis Data Streams 資源
<a name="example-notebook-cleanup-streams"></a>

若要刪除 Kinesis 串流，請開啟 Kinesis Data Streams 主控台，選取您的 Kinesis 串流，然後選擇**動作** > **刪除**。

## 清除 MSK 資源
<a name="example-notebook-cleanup-msk"></a>

如果您已在本教學課程中建立 Amazon MSK 叢集，請按照本節中的步驟進行操作。本節說明如何清理 Amazon EC2 用戶端執行個體、Amazon VPC 和 Amazon MSK 叢集。

### 刪除您的 Amazon MSK 叢集
<a name="example-notebook-msk-cleanup-msk"></a>

如果您已在本教學課程中建立 Amazon MSK 叢集，請按照以下步驟進行操作。

1. 開啟 Amazon MSK 主控台，網址為 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)。

1. 選擇 **AWS KafkaTutorialCluster**。選擇 **刪除**。在出現的視窗中輸入 **delete**，然後確認選擇。

### 終止您的用戶端執行個體
<a name="example-notebook-msk-cleanup-client"></a>

如果您已在本教學課程中建立 Amazon EC2 用戶端執行個體，請按照以下步驟進行操作。

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 從左側導覽列中選擇**執行個體**。

1. 選擇 **ZeppelinClient** 旁邊的核取方塊以選取它。

1. 依序選擇**執行個體狀態**和**終止執行個體**。

### 刪除 Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

如果您已在本教學課程中建立 Amazon VPC，請按照以下步驟進行操作。

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 從左側導覽列選擇**網路介面**。

1. 在搜尋列中輸入 VPC ID，然後按 Enter 鍵。

1. 選取資料表標題中的核取方塊，以選取所有顯示的網路介面。

1. 選擇 **Actions** (動作)、**Detach** (分離)。在出現的視窗中，選擇**強制分離**下方的**啟用**。選擇**分離**，然後等待所有網路介面都到達**可用**狀態。

1. 選取資料表標題中的核取方塊，以再次選取所有顯示的網路介面。

1. 選擇 **動作**、**刪除**。確認動作。

1. 在 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 選取 **AWS KafkaTutorialVPC**。依序選擇**動作**和**刪除 VPC**。輸入 **delete** 並確認刪除。

# 教學課程：將 Studio 筆記本部署為具有持久狀態的 Managed Service for Apache Flink 應用程式
<a name="example-notebook-deploy"></a>

下列教學課程示範如何將 Studio 筆記本部署為具有持久狀態的 Managed Service for Apache Flink 應用程式。

**Topics**
+ [完成事前準備](#example-notebook-durable-setup)
+ [使用 部署具有持久狀態的應用程式 AWS 管理主控台](#example-notebook-deploy-console)
+ [使用 部署具有持久狀態的應用程式 AWS CLI](#example-notebook-deploy-cli)

## 完成事前準備
<a name="example-notebook-durable-setup"></a>

按照[教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本](example-notebook.md)建立新的 Studio 筆記本，使用 Kinesis Data Streams 或 Amazon MSK。命名 Studio 筆記本 `ExampleTestDeploy`。

## 使用 部署具有持久狀態的應用程式 AWS 管理主控台
<a name="example-notebook-deploy-console"></a>

1. 在主控台中的**應用程式程式碼位置 - *選用***下，新增您希望將封裝程式碼存放到的 S3 儲存貯體位置。這可讓步驟直接從筆記本部署和執行應用程式。

1. 將必要的許可新增至應用程式角色，以啟用您要用來讀取和寫入 Amazon S3 儲存貯體以及啟動 Managed Service for Apache Flink 應用程式的角色：
   + AmazonS3FullAccess
   + Amazonmanaged-flinkFullAccess
   + 視情況存取您的來源、目的地和 VPC。如需詳細資訊，請參閱[檢閱 Studio 筆記本的 IAM 許可](how-zeppelin-iam.md)。

1. 請參閱以下範例程式碼：

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. 啟動此功能後，您將在筆記本中每條筆記的右上角看到一個新的下拉式選單，其中包含筆記本的名稱。您可以執行下列動作：
   + 在 AWS 管理主控台中檢視 Studio 筆記本設定。
   + 建立 Zeppelin 筆記，並將其匯出至 Amazon S3。此時，請為您的應用程式提供名稱，然後選擇**建置和匯出**。匯出完成後，您會收到通知。
   + 如有需要，您可以在 Amazon S3 中的可執行檔上檢視和執行任何其他測試。
   + 建置完成後，您將能夠將程式碼部署為具有持久狀態和自動調度資源的 Kinesis 串流應用程式。
   + 使用下拉式選單並選擇**將 Zeppelin 筆記部署為 Kinesis 串流應用程式**。檢閱應用程式名稱，然後選擇**透過 AWS 主控台部署**。
   + 這將引導您前往建立 Managed Service for Apache Flink 應用程式的 AWS 管理主控台 頁面。請注意，已預先填入應用程式名稱、平行處理層級、程式碼位置、預設 Glue DB、VPC (如果適用) 和 IAM 角色。驗證 IAM 角色是否具有來源和目的地的必要許可。快照預設啟用，以進行持久的應用程式狀態管理。
   + 選擇**建立應用程式**。
   + 您可以選擇**設定**並修改任何設定，然後選擇**執行**以啟動串流應用程式。

## 使用 部署具有持久狀態的應用程式 AWS CLI
<a name="example-notebook-deploy-cli"></a>

若要使用 部署應用程式 AWS CLI，您必須更新 AWS CLI 以使用 Beta 2 資訊隨附的服務模型。如需如何使用更新的服務模型之相關資訊，請參閱 [完成先決條件完成事前準備](example-notebook.md#example-notebook-setup)。

以下範例程式碼會建立 Studio 筆記本：

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

下列程式碼範例會啟動 Studio 筆記本：

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

下列程式碼會傳回應用程式的 Apache Zeppelin 筆記本頁面的 URL：

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# 檢視 Studio 筆記本中分析資料的範例查詢
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [使用 Amazon MSK/Apache Kafka 建立資料表](#how-zeppelin-examples-creating-tables)
+ [使用 Kinesis 建立資料表](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [查詢輪轉時段](#how-zeppelin-examples-tumbling)
+ [查詢滑動視窗](#how-zeppelin-examples-sliding)
+ [使用互動式 SQL](#how-zeppelin-examples-interactive-sql)
+ [使用 BlackHole SQL 連接器](#how-zeppelin-examples-blackhole-connector-sql)
+ [使用 Scala 產生範例資料](#notebook-example-data-generator)
+ [使用互動式 Scala](#notebook-example-interactive-scala)
+ [使用互動式 Python](#notebook-example-interactive-python)
+ [使用互動式 Python、SQL 和 Scala 的組合](#notebook-example-interactive-pythonsqlscala)
+ [使用跨帳戶 Kinesis 資料串流](#notebook-example-crossaccount-kds)

如需 Apache Flink SQL 查詢設定的相關資訊，請參閱[在 Zeppelin 筆記本上使用 Flink 進行互動式資料分析](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)。

若要在 Apache Flink 儀表板中檢視應用程式，請在應用程式的 **Zeppelin 筆記**頁面中選擇 **FLINK 作業**。

如需視窗查詢的詳細資訊，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[視窗](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)。

如需 Apache Flink 串流 SQL 查詢的更多範例，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

## 使用 Amazon MSK/Apache Kafka 建立資料表
<a name="how-zeppelin-examples-creating-tables"></a>

您可以將 Amazon MSK Flink 連接器與 Managed Service for Apache Flink Studio 搭配使用，以使用純文字、SSL 或 IAM 身分驗證來驗證您的連線。根據您的需求，使用特定屬性建立資料表。

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

您可以將這些與 [ Apache Kafka SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/)的其他屬性結合使用。

## 使用 Kinesis 建立資料表
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

下列範例示範如何使用 Kinesis 建立資料表：

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

如需可使用的其他屬性的詳細資訊，請參閱 [Amazon Kinesis Data Streams SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)。

## 查詢輪轉時段
<a name="how-zeppelin-examples-tumbling"></a>

下列 Flink 串流 SQL 查詢會從 `ZeppelinTopic` 資料表中選取每個五秒鐘輪轉時段中的最高價格：

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## 查詢滑動視窗
<a name="how-zeppelin-examples-sliding"></a>

下列 Apache Flink 串流 SQL 查詢會從 `ZeppelinTopic` 資料表中選取每個五秒滑動視窗中的最高價格：

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## 使用互動式 SQL
<a name="how-zeppelin-examples-interactive-sql"></a>

此範例會列印事件時間和處理時間的最大值，以及索引鍵-值資料表中的值的總和。請確定您擁有 [使用 Scala 產生範例資料](#notebook-example-data-generator) 執行中的範例資料產生指令碼。若要在您的 Studio 筆記本中嘗試其他 SQL 查詢 (例如篩選和聯結)，請參閱《Apache Flink 文件》中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## 使用 BlackHole SQL 連接器
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

BlackHole SQL 連接器不需要您建立 Kinesis 資料串流或 Amazon MSK 叢集來測試查詢。如需 BlackHole SQL 連接器的相關資訊，請參閱《Apache Flink 文件》中的 [BlackHole SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html)。在此範例中，預設目錄是記憶體內目錄。

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## 使用 Scala 產生範例資料
<a name="notebook-example-data-generator"></a>

此範例使用 Scala 生成範例資料。您可以使用此範例資料測試各種查詢。使用 create table 陳述式來建立索引鍵-值資料表。

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## 使用互動式 Scala
<a name="notebook-example-interactive-scala"></a>

這是 [使用互動式 SQL](#how-zeppelin-examples-interactive-sql) 的 Scala 翻譯。如需更多 Scala 範例，請參閱《Apache Flink 文件》中的[資料表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 使用互動式 Python
<a name="notebook-example-interactive-python"></a>

這是 [使用互動式 SQL](#how-zeppelin-examples-interactive-sql) 的 Python 翻譯。如需更多 Python 範例，請參閱《Apache Flink 文件》中的[資料表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 使用互動式 Python、SQL 和 Scala 的組合
<a name="notebook-example-interactive-pythonsqlscala"></a>

您可以在筆記本中使用 SQL、Python 和 Scala 的任意組合進行互動式分析。在您計劃部署為具有持久狀態的應用程式的 Studio 筆記本中，可以使用 SQL 和 Scala 的組合。此範例顯示略過的區段，以及在應用程式中部署為持久狀態的區段。

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## 使用跨帳戶 Kinesis 資料串流
<a name="notebook-example-crossaccount-kds"></a>

若要使用擁有您 Studio 筆記本之帳戶以外的帳戶中的 Kinesis 資料串流，請在執行 Studio 筆記本的帳戶中建立服務執行角色，在具有資料串流的帳戶中建立角色信任政策。在 Kinesis 連接器中的 create table DDL 陳述式中，使用 `aws.credentials.provider`、`aws.credentials.role.arn` 和 `aws.credentials.role.sessionName`，針對資料串流建立資料表。

對 Studio 筆記本帳戶使用下列服務執行角色。

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

對資料串流帳戶使用 `AmazonKinesisFullAccess` 政策和下列角色信任政策。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

為 create table 陳述式使用下面的段落。

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```