

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Studio 筆記本搭配 Managed Service for Apache Flink
<a name="how-notebook"></a>

適用於 Managed Service for Apache Flink 的 Studio 筆記本可讓您即時以互動方式查詢資料串流，並使用標準 SQL、Python 和 Scala 輕鬆建置和執行串流處理應用程式。只要在 AWS 管理主控台中按幾下，您就可以啟動無伺服器筆記本來查詢資料串流，並在幾秒鐘內取得結果。

筆記本是基於 Web 的開發環境。使用筆記本，您不僅能獲得簡單的互動式開發體驗，還能使用 Apache Flink 提供的進階功能。Studio 筆記本使用由 [Apache Zeppelin](https://zeppelin.apache.org/) 提供支援的筆記本，使用 [Apache Flink](https://flink.apache.org/) 作為串流處理引擎。Studio 筆記本無縫結合了這些技術，讓所有技能背景的開發人員都能存取資料串流的進階分析。

Apache Zeppelin 為您的 Studio 筆記本提供了完整的分析工具套件，包括以下專案：
+ 資料視覺化
+ 將資料匯出到檔案
+ 控制輸出格式以便於分析

若要開始使用 Managed Service for Apache Flink 和 Apache Zeppelin，請參閱[教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本](example-notebook.md)。如需 Apache Zeppelin 的詳細資訊，請參閱 [Apache Zeppelin 文件](http://zeppelin.apache.org)。

 借助筆記本，您可以使用 SQL、Python 或 Scala 中的 Apache Flink [資料表 API 和 SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/) 或 Scala 中的 [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) 進行查詢建模。只需點按幾下，即可將 Studio 筆記本升級為適用於生產工作負載的、持續執行的、非互動式 Managed Service for Apache Flink 串流處理應用程式。

**Topics**
+ [使用正確的 Studio 筆記本執行期版本](studio-notebook-versions.md)
+ [建立 Studio 筆記本](how-zeppelin-creating.md)
+ [執行串流資料的互動式分析](how-zeppelin-interactive.md)
+ [將 部署為具有持久狀態的應用程式](how-notebook-durable.md)
+ [IAM 許可](how-zeppelin-iam.md)
+ [使用連接器和相依性](how-zeppelin-connectors.md)
+ [使用者定義的函數](how-zeppelin-udf.md)
+ [啟用檢查點](how-zeppelin-checkpoint.md)
+ [升級 Studio 執行期](upgrading-studio-runtime.md)
+ [使用 AWS Glue](how-zeppelin-glue.md)
+ [Managed Service for Apache Flink 中 Studio 筆記本的範例和教學課程](how-zeppelin-examples.md)
+ [Managed Service for Apache Flink 的 Studio 筆記本疑難排解](how-zeppelin-troubleshooting.md)
+ [為 Managed Service for Apache Flink Studio 筆記本建立自訂 IAM 政策](how-zeppelin-appendix-iam.md)

# 使用正確的 Studio 筆記本執行期版本
<a name="studio-notebook-versions"></a>

透過 Amazon Managed Service for Apache Flink Studio，您可以在互動式筆記本中即時查詢資料串流，並使用標準 SQL、Python 和 Scala 建置和執行串流處理應用程式。Studio 筆記本採用 [Apache Zeppelin](https://zeppelin.apache.org/) 技術，並使用 [Apache Flink](https://flink.apache.org/) 做為串流處理引擎。

**注意**  
我們將**於 2024 年 11 月 5 日以 Apache Flink 1.11 版**取代 Studio 執行期。從這個日期開始，您將無法執行新的筆記本或使用此版本建立新的應用程式。我們建議您在該時間之前升級至最新的執行時間 (Apache Flink 1.15 和 Apache Zeppelin 0.10)。如需如何升級筆記本的指引，請參閱 [升級 Studio 執行期](upgrading-studio-runtime.md)。


**Studio 執行期**  

| Apache Flink 版本 | Apache Zeppelin 版本 | Python 版本 |  | 
| --- | --- | --- | --- | 
| 1.15 | 0.1 | 3.8 | 建議 | 
| 1.13 | 0.9 | 3.8 | 支援至 2024 年 10 月 16 日 | 
| 1.11 | 0.9 | 3.7 | 已於 2025 年 2 月 24 日棄用 | 

# 建立 Studio 筆記本
<a name="how-zeppelin-creating"></a>

Studio 筆記本包含用 SQL、Python 或 Scala 編寫的查詢或程式，這些查詢或程式可以在串流資料上執行並傳回分析結果。您使用主控台或 CLI 建立應用程式，並提供查詢以分析資料來源中的資料。

應用程式具有下列元件：
+ 資料來源，例如 Amazon MSK 叢集、Kinesis 資料串流或 Amazon S3 儲存貯體。
+  AWS Glue 資料庫。此資料庫包含儲存資料來源、目標結構描述和端點的資料表。如需詳細資訊，請參閱 [使用 AWS Glue](how-zeppelin-glue.md)。
+ 應用程式的程式碼。您的程式碼會實作您的分析查詢或程式。
+ 您的應用程式設定和執行期屬性。如需應用程式設定和執行期屬性的相關資訊，請參閱 [Apache Flink 應用程式開發人員指南](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html)中的下列主題：
  + **應用程式平行處理和擴展**：您可以使用應用程式的平行處理設定來控制應用程式可同時執行的查詢數目。如果查詢具有多個執行路徑，則還可以利用增加的平行處理，如下列情況所示：
    + 處理 Kinesis 資料串流的多個碎片時
    + 使用 `KeyBy` 運算子分割資料時。
    + 使用多個視窗運算子時

    如需應用程式擴展的詳細資訊，請參閱 [Managed Service for Apache Flink 中的應用程式擴展](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html)。
  + **記錄和監控**：如需應用程式記錄和監控的相關資訊，請參閱 [Amazon Managed Service for Apache Flink 中的記錄和監控](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html)。
  + 應用程式使用檢查點和儲存點進行容錯。依預設，Studio 筆記本不啟用檢查點和儲存點。

您可以使用 AWS 管理主控台 或 建立 Studio 筆記本 AWS CLI。

從主控台建立應用程式時，您可以使用下列選項：
+ 在 Amazon MSK 主控台中，選擇您的叢集，然後選擇**即時處理資料**。
+ 在 Kinesis Data Streams 主控台中，選擇您的資料串流，然後在**應用程式**標籤上選擇**即時處理資料**。
+ 在 Managed Service for Apache Flink 主控台中，選擇 **Studio** 標籤，然後選擇**建立 Studio 筆記本**。

# 執行串流資料的互動式分析
<a name="how-zeppelin-interactive"></a>

您使用由 Apache Zeppelin 提供支援的無伺服器筆記本與串流資料互動。您的筆記本可以包含多條筆記，每條筆記可以有一個或多個段落，可以在其中撰寫程式碼。

下列範例 SQL 查詢顯示如何從資料來源擷取資料：

```
%flink.ssql(type=update)
select * from stock;
```

如需 Flink 串流 SQL 查詢的更多範例，請參閱 Apache Flink 文件中的[Managed Service for Apache Flink 中 Studio 筆記本的範例和教學課程](how-zeppelin-examples.md)以下內容和[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/)。

您可以在 Studio 筆記本中使用 Flink SQL 查詢來查詢串流資料。也可以使用 Python (資料表 API) 和 Scala (資料表和 Datastream API) 編寫程式，以互動方式查詢串流資料。您可以檢視查詢或程式的結果，在幾秒鐘內更新它們，然後重執行以檢視更新的結果。

## Flink 解譯器
<a name="how-zeppelin-interactive-interpreters"></a>

您可以使用*解譯器*指定 Managed Service for Apache Flink 用來執行應用程式的語言。您可以將下列解譯器用於 Managed Service for Apache Flink：


| 名稱 | 類別 | Description | 
| --- |--- |--- |
| %flink | FlinkInterpreter | Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment | 
| %flink.pyflink | PyFlinkInterpreter | Provides a python environment | 
| %flink.ipyflink | IPyFlinkInterpreter | Provides an ipython environment | 
| %flink.ssql | FlinkStreamSqlInterpreter | Provides a stream sql environment | 
| %flink.bsql | FlinkBatchSqlInterpreter | Provides a batch sql environment | 

如需 Flink 解譯器的詳細資訊，請參閱 [Apache Zeppelin 的 Flink 解譯器](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html)。

如果您使用 `%flink.pyflink` 或 `%flink.ipyflink` 作為解譯器，則需要使用 `ZeppelinContext` 來視覺化筆記本內的結果。

如需更多的 PyFlink 具體範例，請參閱[使用適用於 Studio 和 Python 的 Managed Service for Apache Flink 以互動方式查詢資料串流](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/)。

## Apache Flink 資料表環境變數
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin 使用環境變數提供對資料表環境資源的存取。

您可以使用以下變數存取 Scala 資料表環境資源：


| 變數 | 資源 | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | Blink 規劃器的 StreamTableEnvironment | 

您可以使用以下變數存取 Python 資料表環境資源：


| 變數 | 資源 | 
| --- |--- |
| s\$1env | StreamExecutionEnvironment | 
| st\$1env | Blink 規劃器的 StreamTableEnvironment | 

如需使用資料表環境的詳細資訊，請參閱 Apache Flink 文件中的[概念和常見 API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/)。

# 將 部署為具有持久狀態的應用程式
<a name="how-notebook-durable"></a>

您可以建立程式碼並將其匯出到 Amazon S3。您可以將在筆記中撰寫的程式碼升級為持續執行的串流處理應用程式。在 Managed Service for Apache Flink 上執行 Apache Flink 應用程式有兩種模式：使用 Studio 筆記本，您可以互動方式開發程式碼、即時檢視程式碼結果，並在筆記中以視覺化方式呈現。您將筆記部署為在串流模式下執行後，Managed Service for Apache Flink 可以建立一個持續執行的應用程式、從來源讀取資料、寫入目的地、讓應用程式維持長時間執行狀態，以及根據來源串流的輸送量自動擴展資源。

**注意**  
應用程式的程式碼匯出到的 S3 儲存貯體必須與 Studio 筆記本位於相同的區域。

只有在符合下列條件的情況下，才能部署 Studio 筆記本的筆記：
+ 段落必須按順序排列。部署應用程式時，筆記中的所有段落都會依序執行 (從左至右、從上至下)，如同它們在筆記中顯示的一樣。您可以透過在筆記中選擇**執行所有段落**來檢查此順序。
+ 你的程式碼是 Python 和 SQL 或 Scala 和 SQL 的組合。對於部署即應用程式，目前不支援 Python 和 Scala 的組合。
+ 您的筆記必須只包含下列解譯器：`%flink`、`%flink.ssql`、`%flink.pyflink`、`%flink.ipyflink`、`%md`。
+ 不支援使用 [Zeppelin 內容](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html)物件 `z`。不傳回任何結果的方法不會執行任何動作，除記錄警告之外。其他方法將引發 Python 例外狀況或無法在 Scala 中編譯。
+ 筆記必須產生單一 Apache Flink 作業。
+ 不支援將具有[動態資料表](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html)的筆記部署為應用程式。
+ %md ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) 段落在部署為應用程式時會略過，因為這些段落預期會包含人類可讀的文件，不適合作為產生的應用程式的一部分執行。
+ 部署為應用程式時，將會略過不在 Zeppelin 中執行的停用段落。即使停用的段落使用不相容的解譯器 (例如含有`%flink` `and %flink.ssql` 解譯器的筆記中的 `%flink.ipyflink`)，在將筆記部署為應用程式時，仍會略過該解譯器，並且不會產生錯誤。
+ 來源程式碼 (Flink SQL、PyFlink 或 Flink Scala) 中必須至少有一個段落處於啟用且可執行狀態，才能成功部署應用程式。
+ 在某個段落內的解譯器指令中設定平行處理 (例如 `%flink.ssql(parallelism=32)`) 將在從筆記部署的應用程式中略過。反之，您可以透過 AWS 管理主控台、 AWS Command Line Interface 或 AWS API 更新已部署的應用程式，以根據應用程式所需的平行處理層級變更平行處理和/或ParallelismPerKPU 設定，或者您可以為已部署的應用程式啟用自動擴展。
+ 如果要部署為具有持久狀態的應用程式，則您的 VPC 必須具有網際網路存取。如果您的 VPC 無法存取網際網路，請參閱[在無網際網路存取的 VPC 中，部署為具有持久狀態的應用程式](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet)。

## Scala/Python 條件
<a name="how-notebook-durable-scala"></a>
+ 在 Scala 或 Python 程式碼中，使用 [Blink 規劃器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure) (對於 Scala，是 `senv`，`stenv`；對於 Python，是 `s_env`，`st_env`)，而不是較舊的「Flink」規劃器 (對於 Scala，是 `stenv_2`；對於 Python，是 `st_env_2`)。Apache Flink 專案建議在生產用例中使用 Blink 規劃器，這是 Zeppelin 和 Flink 中的預設規劃器。
+ Python 段落不得在預定部署為應用程式的筆記中使用使用 `!` 的 [shell 調用/指派](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment)或 [IPython 魔術命令](https://ipython.readthedocs.io/en/stable/interactive/magics.html)，例如 `%timeit` 或 `%conda`。
+ 您不能使用 Scala 案例類別作為傳遞給高階資料流程運算子 (如 `map` 和 `filter`) 的函數的參數。如需 Scala 案例類別的相關資訊，請參閱 Scala 文件中的[案例類別](https://docs.scala-lang.org/overviews/scala-book/case-classes.html)。

## SQL 條件
<a name="how-notebook-durable-sql"></a>
+ 不允許使用簡單的 SELECT 陳述式，因為沒有相當於段落的輸出部分可以傳遞資料。
+ 在任何指定段落中，DDL 陳述式 (`USE`、`CREATE`、`ALTER`、`DROP`、`SET`、`RESET`) 都必須放在 DML (`INSERT`) 陳述式前面。這是因為，段落中的 DML 陳述式必須作為單一 Flink 作業一起提交。
+ 最多只能有一個段落中包含 DML 陳述式。這是因為，對於「部署即應用程式」功能，我們僅支援提交單一作業至 Flink。

如需詳細資訊和範例，請參閱[搭配使用 SQL 函數與 Amazon Managed Service for Apache Flink、Amazon Translate 和 Amazon Comprehend 來翻譯、修訂和分析串流資料](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesisanalytics-MyApplicatioamazon-translate-and-amazon-comprehend/)。

# 檢閱 Studio 筆記本的 IAM 許可
<a name="how-zeppelin-iam"></a>

透過 AWS 管理主控台建立 Studio 筆記本時，Managed Service for Apache Flink 會為您建立 IAM 角色。它還會將允許下列存取的政策與該角色相關聯：


****  

| 服務 | 存取  | 
| --- | --- | 
| CloudWatch Logs | 清單 | 
| Amazon EC2 | 清單 | 
| AWS Glue | 讀取，寫入 | 
| Managed Service for Apache Flink | 讀取 | 
| Managed Service for Apache Flink V2 | 讀取 | 
| Amazon S3 | 讀取，寫入 | 

# 使用連接器和相依性
<a name="how-zeppelin-connectors"></a>

連接器可讓您跨越各種技術讀取和寫入資料。Managed Service for Apache Flink 會將三個預設連接器與您的 Studio 筆記本綁定。您也可以使用自訂連接器。如需連接器的詳細資訊，請參閱《Apache Flink 文件》中的[資料表和 SQL 連接器](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/)。

## 預設連接器
<a name="zeppelin-default-connectors"></a>

如果您使用 AWS 管理主控台 建立 Studio 筆記本，Managed Service for Apache Flink 預設會包含下列自訂連接器： `flink-sql-connector-kinesis``flink-connector-kafka_2.12`和 `aws-msk-iam-auth`。若要在沒有這些自訂連接器的情況下透過主控台建立 Studio 筆記本，請選擇**使用自訂設定建立**選項。然後，當您進入**組態**頁面時，清除兩個連接器旁邊的核取方塊。

如果您使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API 來建立 Studio 筆記本，預設不包含 `flink-sql-connector-flink` 和 `flink-connector-kafka` 連接器。若要新增它們，請將它們指定為 `CustomArtifactsConfiguration` 資料類型的 `MavenReference`，如下列範例所示。

`aws-msk-iam-auth` 連接器是與 Amazon MSK 搭配使用的連接器，其中包含可透過 IAM 自動驗證的功能。

**注意**  
下列範例中顯示的連接器版本是我們支援的唯一版本。

```
For the Kinesis connector:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"

   }      
}]

For authenticating with AWS MSK through AWS IAM:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }      
}]
            
For the Apache Kafka connector:  

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"

   }      
}]
```

若要將這些連接器新增至現有的筆記本，請使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API 作業，並將它們指定為 `CustomArtifactsConfigurationUpdate` 資料類型的 `MavenReference`。

**注意**  
針對資料表 API 中的 `flink-sql-connector-kinesis` 連接器，您可以將 `failOnError` 設定為 true。

## 新增相依性和自訂連接器
<a name="zeppelin-custom-connectors"></a>

若要使用 AWS 管理主控台 將相依性或自訂連接器新增至 Studio 筆記本，請遵循下列步驟：

1. 將自訂連接器的檔案上傳到 Amazon S3。

1. 在 中 AWS 管理主控台，選擇建立 Studio 筆記本的**自訂建立**選項。

1. 遵循 Studio 筆記本建立工作流程，直到進入**組態**步驟。

1. 在**自訂連接器**區段，選擇**新增自訂連接器**。

1. 指定相依性或自訂連接器的 Amazon S3 位置。

1. 選擇**儲存變更**。

若要在使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API 建立新的 Studio 筆記本時新增相依性 JAR 或自訂連接器，請在 `CustomArtifactsConfiguration` 資料類型中指定相依性 JAR 或自訂連接器的 Amazon S3 位置。若要將相依性或自訂連接器新增至現有的 Studio 筆記本，請調用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API，並在 `CustomArtifactsConfigurationUpdate` 資料類型中指定相依性 JAR 或自訂連接器的 Amazon S3 位置。

**注意**  
包含相依性或自訂連接器時，還必須包含其中未綁定的所有可轉移相依性。

# 實作使用者定義的函數
<a name="how-zeppelin-udf"></a>

使用者定義的函數 (UDF) 是一些延伸點，可讓您呼叫常用邏輯或無法在查詢中以其他方式表示的自訂邏輯。您可以使用 Python 或類似 Java 或 Scala 的 JVM 語言，在 Studio 筆記本的段落中實作您的 UDF。您也可以將包含以 JVM 語言實作的 UDF 新增至 Studio 筆記本外部 JAR 檔案。

當實作註冊該子類 `UserDefinedFunction` (或您自己的抽像類) 的抽像類的 JAR 時，請使用 Apache Maven 中提供的範圍、Gradle 中的 `compileOnly` 相依性宣告、SBT 中提供的範圍或 UDF 專案建置組態中的等效指令。這可讓 UDF 來源程式碼根據 Flink API 進行編譯，但 Flink API 類別本身並不包含在建置成品中。請參閱來自 UDF jar 範例的此 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)，該範例符合 Maven 專案上的這種先決條件。

**注意**  
如需範例設定，請參閱 *AWS 機器學習部落格*中的[搭配使用 SQL 函數與 Amazon Managed Service for Apache Flink、Amazon Translate 和 Amazon Comprehend 來翻譯、修訂和分析串流資料](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)。

若要使用主控台將 UDF JAR 檔案新增至您的 Studio 筆記本，請依照下列步驟執行：

1. 將 UDF JAR 檔案上傳至 Amazon S3。

1. 在 中 AWS 管理主控台，選擇建立 Studio 筆記本的**自訂建立**選項。

1. 遵循 Studio 筆記本建立工作流程，直到進入**組態**步驟。

1. 在**使用者定義的函數**區段中，選擇**新增使用者定義的函數**。

1. 指定 JAR 檔案的 Amazon S3 位置，或是具有 UDF 實作的 ZIP 檔案。

1. 選擇**儲存變更**。

若要在使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API 建立新的 Studio 筆記本時新增 UDF JAR，請在 `CustomArtifactConfiguration` 資料類型中指定 JAR 位置。若要將 UDF JAR 新增至現有的 Studio 筆記本，請調用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API 作業，並在 `CustomArtifactsConfigurationUpdate` 資料類型中指定 JAR 位置。或者，您可以使用 AWS 管理主控台 將 UDF JAR 檔案新增至 Studio 筆記本。

## 使用使用者定義函數的考量
<a name="how-zeppelin-udf-considerations"></a>
+ Managed Service for Apache Flink Studio 使用 [Apache Zeppelin 術語](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html)，其中筆記本是指一個 Zeppelin 執行個體，可以包含多條筆記。然後，每條筆記可以包含多個段落。借助 Managed Service for Apache Flink Studio，解譯器過程在筆記本中的所有筆記間共用。因此，如果您在一條筆記中使用 [createTemporarySystemFunction](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-) 執行明確的函數註冊，則可以在同一筆記本的另一條筆記中按原樣引用相同的函數註冊。

  然而，*部署為應用程式*作業只適用於*個別*筆記，而不是筆記本中的所有筆記。當您執行部署為應用程式時，只會使用作用中筆記的內容來產生應用程式。在其他筆記本中執行的任何明確函數註冊都不屬於產生的應用程式相依性。此外，在使用部署為應用程式選項期間，會透過將 JAR 的主類別名稱轉換為小寫字串，進行隱含函數註冊。

   例如，如果 `TextAnalyticsUDF` 是 UDF JAR 的主類別，則隱含註冊將產生函數名稱 `textanalyticsudf`。因此，如果 Studio 的筆記 1 中的明確函數註冊如下所示發生，那麼因為共用解譯器，該筆記本中的所有其他筆記 (例如筆記 2) 均可透過名稱 `myNewFuncNameForClass` 引用該函數：

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   但是，在對筆記 2 執行部署為應用程式操作期間，此明確註冊*將不包含*在相依性中，因此已部署的應用程式將無法按預期執行。由於隱含註冊，依預設，對此函數的所有引用都應帶有 `textanalyticsudf` 而不是 `myNewFuncNameForClass`。

   如果需要進行自訂函數名稱註冊，則筆記 2 本身預計將包含另一個段落來執行另一個明確註冊，如下所示：

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ 如果 UDF JAR 包含 Flink SDK，請設定您的 Java 專案，以便 UDF 來源程式碼可以針對 Flink SDK 進行編譯，但 Flink SDK 類別本身不包含在建置成品中，例如 JAR。

  您可以使用 Apache Maven 中的 `provided` 範圍、Gradle 中的 `compileOnly` 相依性宣告、SBT 中的 `provided` 範圍或 UDF 專案建置組態中的等效指令。您可以參閱 UDF jar 範例中的此 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)，該範例符合 maven 專案上的這種先決條件。如需完整的逐步教學課程，請參閱[搭配使用 SQL 函數與 Amazon Managed Service for Apache Flink、Amazon Translate 和 Amazon Comprehend 來翻譯、修訂和分析串流資料](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)。

# 啟用檢查點
<a name="how-zeppelin-checkpoint"></a>

您可以使用環境設定來啟用檢查點。如需檢查點的相關資訊，請參閱 [Managed Service for Apache Flink 開發人員指南](https://docs.aws.amazon.com/managed-flink/latest/java/)中的[容錯](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html)。

## 設定檢查點間隔
<a name="how-zeppelin-checkpoint-interval"></a>

以下 Scala 程式碼範例將應用程式的檢查點間隔設定為 1 分鐘：

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

以下 Python 程式碼範例將應用程式的檢查點間隔設定為 1 分鐘：

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## 設定檢查點類型
<a name="how-zeppelin-checkpoint-type"></a>

以下 Scala 程式碼範例將應用程式的檢查點模式設定為 `EXACTLY_ONCE` (預設值)：

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

以下 Python 程式碼範例將應用程式的檢查點模式設定為 `EXACTLY_ONCE` (預設值)：

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# 升級 Studio 執行期
<a name="upgrading-studio-runtime"></a>

本節包含如何升級 Studio 筆記本執行期的相關資訊。我們建議您一律升級至最新支援的 Studio 執行期。

## 將您的筆記本升級至新的 Studio 執行期
<a name="upgrading-notebook"></a>

視您使用 Studio 的方式而定，升級執行期的步驟會有所不同。選取符合您使用案例的選項。

### 沒有外部相依性的 SQL 查詢或 Python 程式碼
<a name="notebook-no-dependencies"></a>

如果您使用的 SQL 或 Python 沒有任何外部相依性，請使用下列執行期升級程序。我們建議您升級至最新的 Runtime 版本。升級程序與您要升級的執行期版本相同，無延遲。

1. 使用最新的執行期建立新的 Studio 筆記本。

1. 將每個備註的程式碼從舊筆記本複製並貼到新筆記本。

1. 在新的筆記本中，調整程式碼，使其與先前版本變更的任何 Apache Flink 功能相容。
   + 執行新的筆記本。開啟筆記本並依備註依序執行，並測試它是否有效。
   + 對程式碼進行任何必要的變更。
   + 停止新的筆記本。

1. 如果您已將舊筆記本部署為應用程式：
   + 將新的筆記本部署為獨立的新應用程式。
   + 停止舊的應用程式。
   + 在沒有快照的情況下執行新應用程式。

1. 如果舊筆記本正在執行，請將其停止。視需要啟動新的筆記本以供互動式使用。

**在沒有外部相依性的情況下升級的處理流程**

![\[下圖代表在沒有外部相依性的情況下升級筆記本的建議工作流程。\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### 具有外部相依性的 SQL 查詢或 Python 程式碼
<a name="notebook-dependencies"></a>

如果您使用 SQL 或 Python，並使用連接器或自訂成品等外部相依性，例如在 Python 或 Java 中實作的使用者定義函數，請遵循此程序。我們建議您升級至最新的執行期。無論您要從哪個執行期版本升級，此程序都相同。

1. 使用最新的執行期建立新的 Studio 筆記本。

1. 將每個備註的程式碼從舊筆記本複製並貼到新筆記本。

1. 更新外部相依性和自訂成品。
   + 尋找與新執行時間的 Apache Flink 版本相容的新連接器。請參閱 Apache Flink 文件中的[資料表和 SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/)，以尋找 Flink 版本的正確連接器。
   + 更新使用者定義函數的程式碼，以符合 Apache Flink API 中的變更，以及使用者定義函數使用的任何 Python 或 JAR 相依性。重新封裝更新後的自訂成品。
   + 將這些新的連接器和成品新增至新的筆記本。

1. 在新的筆記本中，調整程式碼，使其與先前版本變更的任何 Apache Flink 功能相容。
   + 執行新的筆記本。開啟筆記本並依備註依序執行，並測試它是否有效。
   + 對程式碼進行任何必要的變更。
   + 停止新的筆記本。

1. 如果您已將舊筆記本部署為應用程式：
   + 將新的筆記本部署為獨立的新應用程式。
   + 停止舊的應用程式。
   + 在沒有快照的情況下執行新應用程式。

1. 如果舊筆記本正在執行，請將其停止。視需要啟動新的筆記本以供互動式使用。

**使用外部相依性進行升級的程序流程**

![\[下圖代表使用外部相依性升級筆記本的建議工作流程。\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# 使用 AWS Glue
<a name="how-zeppelin-glue"></a>

您的 Studio 筆記本會儲存並取得其資料來源和目的地的相關資訊 AWS Glue。建立 Studio 筆記本時，您可以指定包含連線資訊的 AWS Glue 資料庫。當您存取資料來源和目的地時，您可以指定資料庫中包含的 AWS Glue 資料表。您的 AWS Glue 資料表可讓您存取定義資料來源和目的地位置、結構描述和參數的 AWS Glue 連線。

Studio 筆記本使用資料表屬性來儲存應用程式特定的資料。如需詳細資訊，請參閱[資料表屬性](how-zeppelin-glue-properties.md)。

如需如何設定 AWS Glue 連線、資料庫和資料表以搭配 Studio 筆記本使用的範例，請參閱 [教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本](example-notebook.md)教學[建立 AWS Glue 資料庫](example-notebook.md#example-notebook-glue)課程中的 。

# 資料表屬性
<a name="how-zeppelin-glue-properties"></a>

除了資料欄位，您的 AWS Glue 資料表會使用資料表屬性，將其他資訊提供給 Studio 筆記本。Managed Service for Apache Flink 使用以下 AWS Glue 資料表屬性：
+ [定義 Apache Flink 時間值](#how-zeppelin-glue-timestamp)：這些屬性定義 Managed Service for Apache Flink 如何發出 Apache Flink 內部資料處理時間值。
+ [使用 Flink 連接器和格式屬性](#how-zeppelin-glue-connector)：這些屬性提供資料串流的相關資訊。

若要將 屬性新增至 AWS Glue 資料表，請執行下列動作：

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 從資料表清單中，選擇應用程式用於儲存其資料連線資訊的資料表。依序選擇**動作**和**編輯資料表詳細資訊**。

1. 在**資料表屬性**下，為**索引鍵**輸入 **managed-flink.proctime**，為**值**輸入 **user\$1action\$1time**。

## 定義 Apache Flink 時間值
<a name="how-zeppelin-glue-timestamp"></a>

Apache Flink 提供描述何時發生串流處理事件的時間值，例如[處理時間](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time)和[事件時間](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time)。若要將這些值包含在應用程式輸出中，請在 AWS Glue 資料表上定義屬性，指示 Managed Service for Apache Flink 執行時間將這些值發出到指定的欄位中。

您在資料表屬性中使用的索引鍵和值如下所示：


| Timestamp 類型 | 金鑰 | 值 | 
| --- |--- |--- |
| [ 處理時間](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | managed-flink.proctime | The column name that AWS Glue will use to expose the value. This column name does not correspond to an existing table column. | 
| [ 事件時間](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | managed-flink.rowtime | The column name that AWS Glue will use to expose the value. This column name corresponds to an existing table column. | 
| managed-flink.watermark.*column\$1name*.milliseconds | The watermark interval in milliseconds | 

## 使用 Flink 連接器和格式屬性
<a name="how-zeppelin-glue-connector"></a>

您可以使用 AWS Glue 資料表屬性向應用程式的 Flink 連接器提供資料來源的相關資訊。Managed Service for Apache Flink 用於連接器的一些屬性範例如下：


| 連接器類型 | 金鑰 | 值 | 
| --- |--- |--- |
| [ Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | 格式 | The format used to deserialize and serialize Kafka messages, e.g. json or csv. | 
| scan.startup.mode | The startup mode for the Kafka consumer, e.g. earliest-offset or timestamp. | 
| [ Kinesis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | 格式 | The format used to deserialize and serialize Kinesis data stream records, e.g. json or csv. | 
| aws.region | The AWS region where the stream is defined.  | 
| [ S3 (檔案系統)](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | format | The format used to deserialize and serialize files, e.g. json or csv. | 
| 路徑 | The Amazon S3 path, e.g. s3://mybucket/. | 

如需 Kinesis 和 Apache Kafka 以外的其他連接器的相關資訊，請參閱連接器的文件。

# Managed Service for Apache Flink 中 Studio 筆記本的範例和教學課程
<a name="how-zeppelin-examples"></a>

**Topics**
+ [教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本](example-notebook.md)
+ [教學課程：將 Studio 筆記本部署為具有持久狀態的 Managed Service for Apache Flink 應用程式](example-notebook-deploy.md)
+ [檢視 Studio 筆記本中分析資料的範例查詢](how-zeppelin-sql-examples.md)

# 教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本
<a name="example-notebook"></a>

下列教學課程示範如何建立 Studio 筆記本，從 Kinesis 資料串流或 Amazon MSK 叢集讀取資料。

**Topics**
+ [完成先決條件](#example-notebook-setup)
+ [建立 AWS Glue 資料庫](#example-notebook-glue)
+ [後續步驟：使用 Kinesis Data Streams 或 Amazon MSK 建立 Studio 筆記本](#examples-notebook-nextsteps)
+ [使用 Kinesis Data Streams 建立 Studio 筆記本](example-notebook-streams.md)
+ [使用 Amazon MSK 建立 Studio 筆記本](example-notebook-msk.md)
+ [清除您的應用程式和相依資源](example-notebook-cleanup.md)

## 完成先決條件
<a name="example-notebook-setup"></a>

請確定您的 AWS CLI 是第 2 版或更新版本。若要安裝最新的 AWS CLI，請參閱[安裝、更新和解除安裝第 2 AWS CLI 版](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)。

## 建立 AWS Glue 資料庫
<a name="example-notebook-glue"></a>

您的 Studio 筆記本使用 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 資料庫取得有關 Amazon MSK 資料來源的中繼資料。

**建立 AWS Glue 資料庫**

1. 在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 選擇**新增資料庫**。在**新增資料庫**視窗中，為**資料庫名稱**輸入 **default**。選擇**建立**。

## 後續步驟：使用 Kinesis Data Streams 或 Amazon MSK 建立 Studio 筆記本
<a name="examples-notebook-nextsteps"></a>

借助本教學課程，您可以建立使用 Kinesis Data Streams 或 Amazon MSK 的 Studio 筆記本：
+ [使用 Kinesis Data Streams 建立 Studio 筆記本](example-notebook-streams.md)：使用 Kinesis Data Streams，您可以快速建立使用 Kinesis 資料串流作為來源的應用程式。您只需要將 Kinesis 資料串流建立為相依資源。
+ [使用 Amazon MSK 建立 Studio 筆記本](example-notebook-msk.md)：使用 Amazon MSK，您可以建立使用 Amazon MSK 叢集做為來源的應用程式。您需要建立一個 Amazon VPC、一個 Amazon EC2 用戶端執行個體和一個 Amazon MSK 叢集作為相依資源。

# 使用 Kinesis Data Streams 建立 Studio 筆記本
<a name="example-notebook-streams"></a>

本教學課程說明如何建立使用 Kinesis 資料串流作為來源的 Studio 筆記本。

**Topics**
+ [完成先決條件](#example-notebook-streams-setup)
+ [建立 AWS Glue 資料表](#example-notebook-streams-glue)
+ [使用 Kinesis Data Streams 建立 Studio 筆記本](#example-notebook-streams-create)
+ [將資料傳送至 Kinesis 資料串流](#example-notebook-streams-send)
+ [測試 Studio 筆記本](#example-notebook-streams-test)

## 完成先決條件
<a name="example-notebook-streams-setup"></a>

建立 Studio 筆記本之前，請先建立 Kinesis 資料串流 (`ExampleInputStream`)。您的應用程式使用此串流作為應用程式來源。

您可以使用 Amazon Kinesis 主控台或以下 AWS CLI 命令來建立該串流。如需主控台指示，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。將該串流命名為 **ExampleInputStream**，並將**開啟的碎片數量**設定為 **1**。

若要使用 建立串流 (`ExampleInputStream`) AWS CLI，請使用下列 Amazon Kinesis `create-stream` AWS CLI 命令。

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## 建立 AWS Glue 資料表
<a name="example-notebook-streams-glue"></a>

您的 Studio 筆記本使用 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 資料庫取得有關 Kinesis Data Streams 資料來源的中繼資料。

**注意**  
您可以先手動建立資料庫，也可以讓 Managed Service for Apache Flink 在您建立筆記本時為您建立資料庫。同樣，您可以依照本節所述手動建立資料表，也可以在 Apache Zeppelin 的筆記本中，使用針對 Managed Service for Apache Flink 的建立資料表連接器程式碼，透過 DDL 陳述式建立資料表。然後，您可以簽入 AWS Glue ，以確保正確建立資料表。

**建立資料表**

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 如果您還沒有 AWS Glue 資料庫，請從左側導覽列中選擇**資料庫**。選擇**新增資料庫**。在**新增資料庫**視窗中，為**資料庫名稱**輸入 **default**。選擇 **Create** (建立)。

1. 在左側導覽列中，選擇**資料表**。在**資料表**頁面，選擇**新增資料表** > **手動新增資料表**。

1. 在**設定資料表**頁面，為**資料表名稱**輸入 **stock**。請務必選取先前建立的資料庫。選擇 **Next (下一步)**。

1. 在**新增資料存放區**頁面，選擇 **Kinesis**。對於**串流名稱**，輸入 **ExampleInputStream**。針對 **Kinesis 來源 URL**，請選擇輸入 **https://kinesis.us-east-1.amazonaws.com**。如果您複製並貼上 **Kinesis 來源 URL**，請務必刪除任何前置或尾端空格。選擇 **Next (下一步)**。

1. 在**分類**頁面，選擇 **JSON**。選擇 **Next (下一步)**。

1. 在**定義結構描述**頁面，選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-streams.html)

   選擇 **Next (下一步)**。

1. 在下一頁上，確認您的設定，然後選擇**完成**。

1. 從資料表清單中選取您新建立的資料表。

1. 選擇**編輯資料表**，然後新增索引鍵為 `managed-flink.proctime` 值為 `proctime` 的屬性。

1. 選擇**套用**。

## 使用 Kinesis Data Streams 建立 Studio 筆記本
<a name="example-notebook-streams-create"></a>

現在，您已建立應用程式使用的資源，接下來可以建立您的 Studio 筆記本。

**Topics**
+ [使用 建立 Studio 筆記本 AWS 管理主控台](#example-notebook-create-streams-console)
+ [使用 建立 Studio 筆記本 AWS CLI](#example-notebook-msk-create-api)

### 使用 建立 Studio 筆記本 AWS 管理主控台
<a name="example-notebook-create-streams-console"></a>

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio** 標籤。選擇**建立 Studio 筆記本**。
**注意**  
您也可以藉由選取輸入 Amazon MSK 叢集或 Kinesis 資料串流，然後選擇**即時處理資料**，從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本。

1. 在**建立 Studio 筆記本**頁面，提供下列資訊：
   + 為筆記本名稱輸入 **MyNotebook**。
   + 為 **AWS Glue 資料庫**選擇**預設值**。

   選擇**建立 Studio 筆記本**。

1. 在 **MyNotebook** 頁面，選擇**執行**。等待**狀態**顯示為**執行中**。筆記本執行時需支付費用。

### 使用 建立 Studio 筆記本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

若要使用 建立 Studio 筆記本 AWS CLI，請執行下列動作：

1. 驗證您的帳戶 ID。您需要此值來建立應用程式。

1. 建立角色 `arn:aws:iam::AccountID:role/ZeppelinRole`，並將下列許可新增至主控台自動建立的角色。

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 建立稱為 `create.json` 的檔案，其中具有以下內容。使用您的資訊取代預留位置的值。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 若要建立應用程式，請執行下列命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成後，您應該會看到類似如下的輸出，其中顯示新 Studio 筆記本的詳細資料：以下為輸出範例。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 若要執行應用程式，請執行下列命令：使用您的帳戶 ID 取代範例值。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 將資料傳送至 Kinesis 資料串流
<a name="example-notebook-streams-send"></a>

若要將測試資料傳送至 Kinesis 資料串流，請執行下列動作：

1. 開啟 [Kinesis 資料產生器](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)。

1. 選擇使用 **CloudFormation 建立 Cognito 使用者**。

1.  CloudFormation 主控台會開啟 Kinesis Data Generator 範本。選擇 **Next (下一步)**。

1. 在**指定堆疊詳細資訊**頁面，輸入 Cognito 使用者的使用者名稱和密碼。選擇 **Next (下一步)**。

1. 在**設定堆疊選項**頁面，選擇**下一步**。

1. 在**檢閱 Kinesis-Data-Generator-Cognito-User** 頁面中，選擇**我確認 AWS CloudFormation 可能會建立 IAM 資源。**核取方塊。選擇 **Create Stack** (建立堆疊)。

1. 等待 CloudFormation 堆疊完成建立。堆疊完成後，在 CloudFormation 主控台中開啟 **Kinesis-Data-Generator-Cognito-User** 堆疊，然後選擇**輸出**索引標籤。開啟針對 **KinesisDataGeneratorUrl** 輸出值所列出的 URL。

1. 在 **Amazon Kinesis 資料產生器**頁面，使用您在步驟 4 中建立的憑證登入。

1. 在下一頁面，提供下列值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-streams.html)

   為**記錄範本**貼上下列程式碼：

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. 選擇**傳送資料**。

1. 產生器會將資料傳送至 Kinesis 資料串流。

   完成下一節時，讓產生器保持執行狀態。

## 測試 Studio 筆記本
<a name="example-notebook-streams-test"></a>

在本節中，您可以使用 Studio 筆記本查詢 Kinesis 資料串流中的資料。

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio 筆記本**標籤。選擇 **MyNotebook**。

1. 在 **MyNotebook** 頁面，選擇**在 Apache Zeppelin 中開啟**。

   Apache Zeppelin 介面會在新標籤中開啟。

1. 在**歡迎來到 Zeppelin\$1** 頁面，選擇 **Zeppelin 筆記**。

1. 在 **Zeppelin 筆記**頁面，在新筆記中輸入以下查詢：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   選擇執行圖示。

   一小段時間後，筆記會顯示 Kinesis 資料串流中的資料。

若要為應用程式開啟 Apache Flink 儀表板以檢視操作層面，請選擇 **FLINK 作業**。如需 Flink 儀表板的詳細資訊，請參閱 [Managed Service for Apache Flink 開發人員指南](https://docs.aws.amazon.com/)中的 [Apache Flink 儀表板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

如需 Flink 串流 SQL 查詢的更多範例，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 使用 Amazon MSK 建立 Studio 筆記本
<a name="example-notebook-msk"></a>

本教學課程說明如何建立使用 Amazon MSK 叢集作為來源的 Studio 筆記本。

**Topics**
+ [設定 Amazon MSK 叢集](#example-notebook-msk-setup)
+ [將 NAT 閘道新增至 VPC](#example-notebook-msk-nat)
+ [建立 AWS Glue 連線和資料表](#example-notebook-msk-glue)
+ [使用 Amazon MSK 建立 Studio 筆記本](#example-notebook-msk-create)
+ [將資料傳送至 Amazon MSK 叢集](#example-notebook-msk-send)
+ [測試 Studio 筆記本](#example-notebook-msk-test)

## 設定 Amazon MSK 叢集
<a name="example-notebook-msk-setup"></a>

在本教學課程中，您需要一個允許純文字存取的 Amazon MSK 叢集。如果尚未設定 Amazon MSK 叢集，請依照 [Amazon MSK 使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教學課程來建立 Amazon VPC、Amazon MSK 叢集、主題和 Amazon EC2 用戶端執行個體。

跟隨教學課程學習時，請執行下列動作：
+ 在[步驟 3：建立 Amazon MSK 叢集](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)的步驟 4 中，將 `ClientBroker` 值從 `TLS` 變更為 **PLAINTEXT**。

## 將 NAT 閘道新增至 VPC
<a name="example-notebook-msk-nat"></a>

如果依照 [Amazon MSK 使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)教學課程建立 Amazon MSK 叢集，或者您現有的 Amazon VPC 還沒有適用於其私有子網路的 NAT 閘道，則必須將 NAT 閘道新增到 Amazon VPC。下圖顯示一般架構。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/images/vpc_05.png)


若要為您的 Amazon VPC 建立 NAT 閘道，請執行下列動作：

1. 前往 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 從左側導覽列選擇 **NAT 閘道**。

1. 在 **NAT 閘道**頁面，選擇**建立 NAT 閘道**。

1. 在**建立 NAT 閘道**頁面，提供下列值：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-msk.html)

   選擇**建立 NAT 閘道**。

1. 在導覽列中，選擇**路由表**。

1. 選擇**建立路由表**。

1. 在**建立路由表**頁面，提供以下資訊：
   + **名稱標籤**：**ZeppelinRouteTable**
   + **VPC****：選擇 VPC (例如AWS KafkaTutorialVPC)**。

   選擇**建立**。

1. 在路由表清單中，選擇 **ZeppelinRouteTable**。選擇**路由**標籤，然後選擇**編輯路由**。

1. 在**編輯路由**標籤中，選擇**新增路由**。

1. 在 **** 中，為**目標**輸入 **0.0.0.0/0**。為**目標**選擇 **NAT 閘道**、**ZeppelinGateway**。選擇**儲存路由**。選擇**關閉**。

1. 在「路由表」頁面，已選取 **ZeppelinRouteTable** 時，選擇**子網路關聯**標籤。選擇**編輯子網路關聯**。

1. 在**編輯子網路關聯**頁面，選擇 **AWS KafkaTutorialSubnet2** 和 **AWS KafkaTutorialSubnet3**。選擇 **Save** (儲存)。

## 建立 AWS Glue 連線和資料表
<a name="example-notebook-msk-glue"></a>

您的 Studio 筆記本使用 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 資料庫取得有關 Amazon MSK 資料來源的中繼資料。在本節中，您會建立 AWS Glue 連線，說明如何存取 Amazon MSK 叢集，以及說明如何將資料來源中的資料呈現給 Studio 筆記本等用戶端的 AWS Glue 資料表。

**建立連線**

1. 登入 AWS 管理主控台 ，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 如果您還沒有 AWS Glue 資料庫，請從左側導覽列中選擇**資料庫**。選擇**新增資料庫**。在**新增資料庫**視窗中，為**資料庫名稱**輸入 **default**。選擇 **Create** (建立)。

1. 從左側導覽列選擇**連線**。選擇**新增連線**。

1. 在**新增連線**視窗中，提供下列值：
   + 對於**連線名稱**，請輸入 **ZeppelinConnection**。
   + 對於**連線類型**，請選擇 **Kafka**。
   + 對於 **Kafka 啟動伺服器 URL**，請為叢集提供啟動代理程式字串。您可以從 MSK 主控台或輸入下列 CLI 命令來取得啟動代理程式：

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 取消核取**需要 SSL 連線**核取方塊。

   選擇**下一步**。

1. 在 **VPC** 頁面，提供下列值：
   + 針對 **VPC**，選擇 VPC 的名稱 （例如 ** AWS KafkaTutorialVPC**)。
   + 對於**子網路**，請選擇 **AWS KafkaTutorialSubnet2**。
   + 對於**安全群組**，請選擇所有可用的群組。

   選擇 **Next (下一步)**。

1. 在**連線屬性** / **連線存取權**頁面，選擇**完成**。

**建立資料表**
**注意**  
您可以依照下列步驟所述手動建立資料表，也可以在 Apache Zeppelin 的筆記本中，使用針對 Managed Service for Apache Flink 的建立資料表連接器程式碼，透過 DDL 陳述式建立資料表。然後，您可以簽入 AWS Glue ，以確保正確建立資料表。

1. 在左側導覽列中，選擇**資料表**。在**資料表**頁面，選擇**新增資料表** > **手動新增資料表**。

1. 在**設定資料表**頁面，為**資料表名稱**輸入 **stock**。請務必選取先前建立的資料庫。選擇 **Next (下一步)**。

1. 在**新增資料存放區**頁面，選擇 **Kafka**。對於**主題名稱**，請輸入您的主題名稱 (例如 **AWS KafkaTutorialTopic**)。針對**連線**，選擇 **ZeppelinConnection**。

1. 在**分類**頁面，選擇 **JSON**。選擇 **Next (下一步)**。

1. 在**定義結構描述**頁面，選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/example-notebook-msk.html)

   選擇 **Next (下一步)**。

1. 在下一頁上，確認您的設定，然後選擇**完成**。

1. 從資料表清單中選取您新建立的資料表。

1. 選擇**編輯資料表**並新增下列屬性：
   + 金鑰：`managed-flink.proctime`，值： `proctime`
   + 金鑰：`flink.properties.group.id`，值： `test-consumer-group`
   + 金鑰：`flink.properties.auto.offset.reset`，值： `latest`
   + 金鑰：`classification`，值： `json`

   如果沒有這些鍵/值對，Flink 筆記本會執行為錯誤。

1. 選擇**套用**。

## 使用 Amazon MSK 建立 Studio 筆記本
<a name="example-notebook-msk-create"></a>

現在，您已建立應用程式使用的資源，接下來可以建立您的 Studio 筆記本。

**Topics**
+ [使用 建立 Studio 筆記本 AWS 管理主控台](#example-notebook-create-msk-console)
+ [使用 建立 Studio 筆記本 AWS CLI](#example-notebook-msk-create-api)

**注意**  
您也可以選擇現有叢集，然後選擇**即時處理資料**，從 Amazon MSK 主控台建立 Studio 筆記本。

### 使用 建立 Studio 筆記本 AWS 管理主控台
<a name="example-notebook-create-msk-console"></a>

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio** 標籤。選擇**建立 Studio 筆記本**。
**注意**  
若要從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本，請選取您的輸入 Amazon MSK 叢集或 Kinesis 資料串流，然後選擇**即時處理資料**。

1. 在**建立 Studio 筆記本**頁面，提供下列資訊：
   + 為 **Studio 筆記本名稱**輸入 **MyNotebook**。
   + 為 **AWS Glue 資料庫**選擇**預設值**。

   選擇**建立 Studio 筆記本**。

1. 在 **MyNotebook** 頁面，選擇**組態**標籤。在**網路模式**區段中，選擇**編輯**。

1. 在**編輯 MyNotebook 聯網**頁面，選擇**以 Amazon MSK 叢集為基礎的 VPC 組態**。為 **Amazon MSK 叢集**選擇 Amazon MSK 叢集。選擇**儲存變更**。

1. 在 **MyNotebook** 頁面，選擇**執行**。等待**狀態**顯示為**執行中**。

### 使用 建立 Studio 筆記本 AWS CLI
<a name="example-notebook-msk-create-api"></a>

若要使用 建立 Studio 筆記本 AWS CLI，請執行下列動作：

1. 請務必備妥下列資訊：您需要這些值來建立應用程式。
   + 帳戶 ID。
   + 子網路 ID 以及包含 Amazon MSK 叢集的 Amazon VPC 的安全群組 ID。

1. 建立稱為 `create.json` 的檔案，其中具有以下內容。使用您的資訊取代預留位置的值。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 若要建立應用程式，請執行下列命令：

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 命令完成後，您應該會看到類似如下的輸出，其中顯示新 Studio 筆記本的詳細資料：

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 若要執行應用程式，請執行下列命令：使用您的帳戶 ID 取代範例值。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## 將資料傳送至 Amazon MSK 叢集
<a name="example-notebook-msk-send"></a>

在本節中，您會在 Amazon EC2 用戶端中執行 Python 指令碼，以將資料傳送到您的 Amazon MSK 資料來源。

1. 連線到 Amazon EC2 用戶端。

1. 執行以下命令來安裝 Python 版本 3、Pip 和 Kafka for Python 套件，並確認操作：

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 輸入下列命令，在用戶端機器 AWS CLI 上設定 ：

   ```
   aws configure
   ```

   提供帳戶憑證，並為 `region` 提供 **us-east-1**。

1. 建立稱為 `stock.py` 的檔案，其中具有以下內容。使用 Amazon MSK 叢集的啟動代理程式字串取代範例值，如果您的主題不是 **AWS KafkaTutorialTopic**，請更新主題名稱：

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 使用下列命令執行指令碼：

   ```
   $ python3 stock.py
   ```

1. 完成下一節時，讓指令碼保持執行狀態。

## 測試 Studio 筆記本
<a name="example-notebook-msk-test"></a>

在本節中，您可以使用 Studio 筆記本查詢 Amazon MSK 叢集中的資料。

1. 前往 [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) 開啟 Managed Service in the Apache Flink 主控台。

1. 在 **Managed Service for Apache Flink 應用程式**頁面，選擇 **Studio 筆記本**標籤。選擇 **MyNotebook**。

1. 在 **MyNotebook** 頁面，選擇**在 Apache Zeppelin 中開啟**。

   Apache Zeppelin 介面會在新標籤中開啟。

1. 在**歡迎來到 Zeppelin\$1** 頁面，選擇 **Zeppelin 新筆記**。

1. 在 **Zeppelin 筆記**頁面，在新筆記中輸入以下查詢：

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   選擇執行圖示。

   應用程式會顯示 Amazon MSK 叢集中的資料。

若要為應用程式開啟 Apache Flink 儀表板以檢視操作層面，請選擇 **FLINK 作業**。如需 Flink 儀表板的詳細資訊，請參閱 [Managed Service for Apache Flink 開發人員指南](https://docs.aws.amazon.com/)中的 [Apache Flink 儀表板](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)。

如需 Flink 串流 SQL 查詢的更多範例，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

# 清除您的應用程式和相依資源
<a name="example-notebook-cleanup"></a>

## 刪除 Studio 筆記本
<a name="example-notebook-cleanup-app"></a>

1. 開啟 Managed Service for Apache Flink 主控台。

1. 選擇 **MyNotebook**。

1. 依序選擇**動作**和**刪除**。

## 刪除您的 AWS Glue 資料庫和連線
<a name="example-notebook-cleanup-glue"></a>

1. 在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue 主控台。

1. 從左側導覽列選擇**資料庫**。核取**預設**旁邊的核取方塊以選取它。依序選擇**動作**和**刪除資料庫**。確認您的選擇。

1. 從左側導覽列選擇**連線**。選中 **ZeppelinConnection** 旁邊的核取方塊以選取它。依序選擇**動作**和**刪除連線**。確認您的選擇。

## 刪除 IAM 角色和政策
<a name="example-notebook-msk-cleanup-iam"></a>

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 從左側導覽選單選擇**角色**。

1. 使用搜尋列搜尋 **ZeppelinRole** 角色。

1. 選擇 **ZeppelinRole** 角色。選擇**刪除角色**。確認刪除。

## 刪除 CloudWatch 日誌群組
<a name="example-notebook-cleanup-cw"></a>

使用主控台建立應用程式時，主控台會為您建立 CloudWatch 日誌群組和日誌串流。如果您已使用 AWS CLI建立應用程式，則沒有日誌群組和串流。

1. 透過 [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) 開啟 CloudWatch 主控台。

1. 從左側導覽列選擇**日誌群組**。

1. 選擇 **/AWS/KinesisAnalytics/MyNotebook** 日誌群組。

1. 選擇**動作**、**刪除日誌群組**。確認刪除。

## 清除 Kinesis Data Streams 資源
<a name="example-notebook-cleanup-streams"></a>

若要刪除 Kinesis 串流，請開啟 Kinesis Data Streams 主控台，選取您的 Kinesis 串流，然後選擇**動作** > **刪除**。

## 清除 MSK 資源
<a name="example-notebook-cleanup-msk"></a>

如果您已在本教學課程中建立 Amazon MSK 叢集，請按照本節中的步驟進行操作。本節說明如何清理 Amazon EC2 用戶端執行個體、Amazon VPC 和 Amazon MSK 叢集。

### 刪除您的 Amazon MSK 叢集
<a name="example-notebook-msk-cleanup-msk"></a>

如果您已在本教學課程中建立 Amazon MSK 叢集，請按照以下步驟進行操作。

1. 開啟 Amazon MSK 主控台，網址為 [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)。

1. 選擇 **AWS KafkaTutorialCluster**。選擇 **刪除**。在出現的視窗中輸入 **delete**，然後確認選擇。

### 終止您的用戶端執行個體
<a name="example-notebook-msk-cleanup-client"></a>

如果您已在本教學課程中建立 Amazon EC2 用戶端執行個體，請按照以下步驟進行操作。

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 從左側導覽列中選擇**執行個體**。

1. 選擇 **ZeppelinClient** 旁邊的核取方塊以選取它。

1. 依序選擇**執行個體狀態**和**終止執行個體**。

### 刪除 Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

如果您已在本教學課程中建立 Amazon VPC，請按照以下步驟進行操作。

1. 前往 [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) 開啟 Amazon EC2 主控台。

1. 從左側導覽列選擇**網路介面**。

1. 在搜尋列中輸入 VPC ID，然後按 Enter 鍵。

1. 選取資料表標題中的核取方塊，以選取所有顯示的網路介面。

1. 選擇 **Actions** (動作)、**Detach** (分離)。在出現的視窗中，選擇**強制分離**下方的**啟用**。選擇**分離**，然後等待所有網路介面都到達**可用**狀態。

1. 選取資料表標題中的核取方塊，以再次選取所有顯示的網路介面。

1. 選擇 **動作**、**刪除**。確認動作。

1. 在 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 選取 **AWS KafkaTutorialVPC**。依序選擇**動作**和**刪除 VPC**。輸入 **delete** 並確認刪除。

# 教學課程：將 Studio 筆記本部署為具有持久狀態的 Managed Service for Apache Flink 應用程式
<a name="example-notebook-deploy"></a>

下列教學課程示範如何將 Studio 筆記本部署為具有持久狀態的 Managed Service for Apache Flink 應用程式。

**Topics**
+ [完成事前準備](#example-notebook-durable-setup)
+ [使用 部署具有持久狀態的應用程式 AWS 管理主控台](#example-notebook-deploy-console)
+ [使用 部署具有持久狀態的應用程式 AWS CLI](#example-notebook-deploy-cli)

## 完成事前準備
<a name="example-notebook-durable-setup"></a>

按照[教學課程：在 Managed Service for Apache Flink 中建立 Studio 筆記本](example-notebook.md)建立新的 Studio 筆記本，使用 Kinesis Data Streams 或 Amazon MSK。命名 Studio 筆記本 `ExampleTestDeploy`。

## 使用 部署具有持久狀態的應用程式 AWS 管理主控台
<a name="example-notebook-deploy-console"></a>

1. 在主控台中的**應用程式程式碼位置 - *選用***下，新增您希望將封裝程式碼存放到的 S3 儲存貯體位置。這可讓步驟直接從筆記本部署和執行應用程式。

1. 將必要的許可新增至應用程式角色，以啟用您要用來讀取和寫入 Amazon S3 儲存貯體以及啟動 Managed Service for Apache Flink 應用程式的角色：
   + AmazonS3FullAccess
   + Amazonmanaged-flinkFullAccess
   + 視情況存取您的來源、目的地和 VPC。如需詳細資訊，請參閱[檢閱 Studio 筆記本的 IAM 許可](how-zeppelin-iam.md)。

1. 請參閱以下範例程式碼：

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. 啟動此功能後，您將在筆記本中每條筆記的右上角看到一個新的下拉式選單，其中包含筆記本的名稱。您可以執行下列動作：
   + 在 AWS 管理主控台中檢視 Studio 筆記本設定。
   + 建立 Zeppelin 筆記，並將其匯出至 Amazon S3。此時，請為您的應用程式提供名稱，然後選擇**建置和匯出**。匯出完成後，您會收到通知。
   + 如有需要，您可以在 Amazon S3 中的可執行檔上檢視和執行任何其他測試。
   + 建置完成後，您將能夠將程式碼部署為具有持久狀態和自動調度資源的 Kinesis 串流應用程式。
   + 使用下拉式選單並選擇**將 Zeppelin 筆記部署為 Kinesis 串流應用程式**。檢閱應用程式名稱，然後選擇**透過 AWS 主控台部署**。
   + 這將引導您前往建立 Managed Service for Apache Flink 應用程式的 AWS 管理主控台 頁面。請注意，已預先填入應用程式名稱、平行處理層級、程式碼位置、預設 Glue DB、VPC (如果適用) 和 IAM 角色。驗證 IAM 角色是否具有來源和目的地的必要許可。快照預設啟用，以進行持久的應用程式狀態管理。
   + 選擇**建立應用程式**。
   + 您可以選擇**設定**並修改任何設定，然後選擇**執行**以啟動串流應用程式。

## 使用 部署具有持久狀態的應用程式 AWS CLI
<a name="example-notebook-deploy-cli"></a>

若要使用 部署應用程式 AWS CLI，您必須更新 AWS CLI 以使用 Beta 2 資訊隨附的服務模型。如需如何使用更新的服務模型之相關資訊，請參閱 [完成先決條件完成事前準備](example-notebook.md#example-notebook-setup)。

以下範例程式碼會建立 Studio 筆記本：

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

下列程式碼範例會啟動 Studio 筆記本：

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

下列程式碼會傳回應用程式的 Apache Zeppelin 筆記本頁面的 URL：

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# 檢視 Studio 筆記本中分析資料的範例查詢
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [使用 Amazon MSK/Apache Kafka 建立資料表](#how-zeppelin-examples-creating-tables)
+ [使用 Kinesis 建立資料表](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [查詢輪轉時段](#how-zeppelin-examples-tumbling)
+ [查詢滑動視窗](#how-zeppelin-examples-sliding)
+ [使用互動式 SQL](#how-zeppelin-examples-interactive-sql)
+ [使用 BlackHole SQL 連接器](#how-zeppelin-examples-blackhole-connector-sql)
+ [使用 Scala 產生範例資料](#notebook-example-data-generator)
+ [使用互動式 Scala](#notebook-example-interactive-scala)
+ [使用互動式 Python](#notebook-example-interactive-python)
+ [使用互動式 Python、SQL 和 Scala 的組合](#notebook-example-interactive-pythonsqlscala)
+ [使用跨帳戶 Kinesis 資料串流](#notebook-example-crossaccount-kds)

如需 Apache Flink SQL 查詢設定的相關資訊，請參閱[在 Zeppelin 筆記本上使用 Flink 進行互動式資料分析](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)。

若要在 Apache Flink 儀表板中檢視應用程式，請在應用程式的 **Zeppelin 筆記**頁面中選擇 **FLINK 作業**。

如需視窗查詢的詳細資訊，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[視窗](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)。

如需 Apache Flink 串流 SQL 查詢的更多範例，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

## 使用 Amazon MSK/Apache Kafka 建立資料表
<a name="how-zeppelin-examples-creating-tables"></a>

您可以將 Amazon MSK Flink 連接器與 Managed Service for Apache Flink Studio 搭配使用，以使用純文字、SSL 或 IAM 身分驗證來驗證您的連線。根據您的需求，使用特定屬性建立資料表。

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

您可以將這些與 [ Apache Kafka SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/)的其他屬性結合使用。

## 使用 Kinesis 建立資料表
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

下列範例示範如何使用 Kinesis 建立資料表：

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

如需可使用的其他屬性的詳細資訊，請參閱 [Amazon Kinesis Data Streams SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)。

## 查詢輪轉時段
<a name="how-zeppelin-examples-tumbling"></a>

下列 Flink 串流 SQL 查詢會從 `ZeppelinTopic` 資料表中選取每個五秒鐘輪轉時段中的最高價格：

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## 查詢滑動視窗
<a name="how-zeppelin-examples-sliding"></a>

下列 Apache Flink 串流 SQL 查詢會從 `ZeppelinTopic` 資料表中選取每個五秒滑動視窗中的最高價格：

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## 使用互動式 SQL
<a name="how-zeppelin-examples-interactive-sql"></a>

此範例會列印事件時間和處理時間的最大值，以及索引鍵-值資料表中的值的總和。請確定您擁有 [使用 Scala 產生範例資料](#notebook-example-data-generator) 執行中的範例資料產生指令碼。若要在您的 Studio 筆記本中嘗試其他 SQL 查詢 (例如篩選和聯結)，請參閱《Apache Flink 文件》中的[查詢](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)。

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## 使用 BlackHole SQL 連接器
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

BlackHole SQL 連接器不需要您建立 Kinesis 資料串流或 Amazon MSK 叢集來測試查詢。如需 BlackHole SQL 連接器的相關資訊，請參閱《Apache Flink 文件》中的 [BlackHole SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html)。在此範例中，預設目錄是記憶體內目錄。

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## 使用 Scala 產生範例資料
<a name="notebook-example-data-generator"></a>

此範例使用 Scala 生成範例資料。您可以使用此範例資料測試各種查詢。使用 create table 陳述式來建立索引鍵-值資料表。

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## 使用互動式 Scala
<a name="notebook-example-interactive-scala"></a>

這是 [使用互動式 SQL](#how-zeppelin-examples-interactive-sql) 的 Scala 翻譯。如需更多 Scala 範例，請參閱《Apache Flink 文件》中的[資料表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 使用互動式 Python
<a name="notebook-example-interactive-python"></a>

這是 [使用互動式 SQL](#how-zeppelin-examples-interactive-sql) 的 Python 翻譯。如需更多 Python 範例，請參閱《Apache Flink 文件》中的[資料表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)。

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 使用互動式 Python、SQL 和 Scala 的組合
<a name="notebook-example-interactive-pythonsqlscala"></a>

您可以在筆記本中使用 SQL、Python 和 Scala 的任意組合進行互動式分析。在您計劃部署為具有持久狀態的應用程式的 Studio 筆記本中，可以使用 SQL 和 Scala 的組合。此範例顯示略過的區段，以及在應用程式中部署為持久狀態的區段。

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## 使用跨帳戶 Kinesis 資料串流
<a name="notebook-example-crossaccount-kds"></a>

若要使用擁有您 Studio 筆記本之帳戶以外的帳戶中的 Kinesis 資料串流，請在執行 Studio 筆記本的帳戶中建立服務執行角色，在具有資料串流的帳戶中建立角色信任政策。在 Kinesis 連接器中的 create table DDL 陳述式中，使用 `aws.credentials.provider`、`aws.credentials.role.arn` 和 `aws.credentials.role.sessionName`，針對資料串流建立資料表。

對 Studio 筆記本帳戶使用下列服務執行角色。

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

對資料串流帳戶使用 `AmazonKinesisFullAccess` 政策和下列角色信任政策。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

為 create table 陳述式使用下面的段落。

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Managed Service for Apache Flink 的 Studio 筆記本疑難排解
<a name="how-zeppelin-troubleshooting"></a>

本節包含 Studio 筆記本的疑難排解資訊。

## 停止停滯的應用程式
<a name="how-zeppelin-troubleshooting-stopping"></a>

若要停止停滯在暫時狀態的應用程式，請在 `Force` 參數設定為 `true` 的情況下呼叫 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 動作。如需詳細資訊，請參閱 [Managed Service for Apache Flink 開發人員指南](https://docs.aws.amazon.com/managed-flink/latest/java/)中的[執行應用程式](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html)。

## 在無網際網路存取的 VPC 中，部署為具有持久狀態的應用程式
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

Managed Service for Apache Flink Studio 部署為應用程式功能不支援沒有網際網路存取的 VPC 應用程式。我們建議您在 Studio 中建置應用程式，然後使用 Managed Service for Apache Flink 手動建立 Flink 應用程式，並選取您在筆記本中建置的 zip 檔案。

下列步驟概述了這種方法：

1. 建置 Studio 應用程式並將其匯出到 Amazon S3。這必須是一個 zip 檔案。

1. 使用引用 Amazon S3 中 zip 檔案位置的程式碼路徑，手動建立 Managed Service for Apache Flink 應用程式。此外，您將需要使用以下 `env` 變數 (總共 2 個 `groupID`，3 個 `var`) 設定應用程式：

1. kinesis.analytics.flink.run.options

   1. python: source/note.py

   1. jarfile: lib/PythonApplicationDependencies.jar

1. managed.deploy\$1as\$1app.options

   1. DatabaseARN: *<glue database ARN (Amazon Resource Name)>*

1. 您可能需要為應用程式使用的服務授與使用 Managed Service for Apache Flink Studio 和 Managed Service for Apache Flink IAM 角色的許可。您可以為兩個應用程式使用相同的 IAM 角色。

## 部署為應用程式的大小和建置時間減少
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

適用於 Python 應用程式的 Studio 部署作為應用程式功能會封裝 Python 環境中的所有可用內容，因為我們無法確定您需要哪些程式庫。這可能導致超出需要的部署即應用程式大小。下列程序示範如何透過解除安裝相依性來減少部署即應用程式形式之 Python 應用程式的大小。

如果您正在建置具有 Studio 部署即應用程式功能的 Python 應用程式，不妨考慮從系統中移除預先安裝的 Python 套件，如果您的應用程式不依賴該套件的話。這不僅有助於減少最終成品大小，以避免違反應用程式大小的服務限制，還可以縮短建置具有部署即應用程式功能之應用程式的時間。

您可以執行以下命令以列出所有已安裝的 Python 套件及其各自的安裝大小，並有選擇地移除較大的套件。

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**注意**  
`apache-beam` 是 Flink Python 運作必需的套件。始終不能移除此套件及其相依性。

以下是在 Studio V2 中預先安裝的、可以考慮移除的 Python 套件之清單：

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**若要從 Zeppelin 筆記本中移除 Python 套件：**

1. 在移除之前，請檢查您的應用程式是否依賴於該套件或其任何消費套件。您可以使用 [pipdeptree](https://pypi.org/project/pipdeptree/) 識別套件的相依性。

1. 執行以下命令來移除套件：

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. 如果需要檢索錯誤移除的套件，請執行以下命令：

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example 範例：在使用部署即應用程式功能部署 Python 應用程式之前移除 `scipy` 套件。**  

1. 使用 `pipdeptree` 發現所有 `scipy` 使用者，並驗證是否可以安全移除 `scipy`。
   + 透過筆記本安裝該工具：

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + 藉由執行以下命令取得 `scipy` 的反向相依性樹：

     ```
     %flink.pyflink
     !pip -r -p scipy
     ```

     您應該會看到類似下列的輸出 (為求簡化已進行壓縮)：

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. 仔細檢查 `seaborn`、`statsmodels` 和 `plotnine` 在應用程式中的使用情況。如果應用程式不依賴 `scipy`、`seaborn`、`statemodels` 或 `plotnine` 中的任意一項，便可移除所有這些套件，或只移除應用程式不需要的套件。

1. 執行以下命令移除套件：

   ```
   !pip uninstall -y scipy plotnine seaborn statemodels
   ```

## 取消任務
<a name="how-notbook-canceling-jobs"></a>

本節說明如何取消無法從 Apache Zeppelin 取得的 Apache Flink 作業。若要取消此類作業，請前往 Apache Flink 儀表板，複製作業 ID，然後在下列其中一個範例中使用它。

取消單一作業：

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

取消所有執行中作業：

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

取消所有作業：

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## 重新啟動 Apache Flink 解譯器
<a name="how-notbook-restarting-interpreter"></a>

在 Studio 筆記本中重新啟動 Apache Flink 解譯器

1. 選擇熒幕右上角附近的**組態**。

1. 選擇**解譯器**。

1. 選擇**重新啟動**，然後按**確定**。

# 為 Managed Service for Apache Flink Studio 筆記本建立自訂 IAM 政策
<a name="how-zeppelin-appendix-iam"></a>

您通常會使用受管 IAM 政策來允許應用程式存取相依資源。如果需要更好地控制應用程式的許可，可以使用自訂 IAM 政策。本節包含自訂 IAM 政策的範例。

**注意**  
在下列政策範例中，使用應用程式的值取代預留位置文字。

**Topics**
+ [AWS Glue](#how-zeppelin-iam-glue)
+ [CloudWatch Logs](#how-zeppelin-iam-cw)
+ [Kinesis 串流](#how-zeppelin-iam-streams)
+ [Amazon MSK 叢集](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

下列範例政策會授予存取 AWS Glue 資料庫的許可。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```

------

## CloudWatch Logs
<a name="how-zeppelin-iam-cw"></a>

下列範例授與存取 CloudWatch 的許可。

```
{
      "Sid": "ListCloudwatchLogGroups",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<accountId>:log-group:*"
      ]
    },
    {
      "Sid": "ListCloudwatchLogStreams",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "<logGroupArn>:log-stream:*"
      ]
    },
    {
      "Sid": "PutCloudwatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "<logStreamArn>"
      ]
    }
```

**注意**  
如果使用主控台建立應用程式，則主控台會為應用程式角色新增必要的政策，以存取 CloudWatch Logs。

## Kinesis 串流
<a name="how-zeppelin-iam-streams"></a>

應用程式可以將 Kinesis 串流用於來源或目的地。應用程式需要讀取許可才能從來源串流讀取，需要寫入許可才能寫入目的地串流。

下列政策授與從用作來源的 Kinesis 串流讀取的許可：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```

------

下列政策授與向用作目的地的 Kinesis 串流寫入的許可：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```

------

如果應用程式存取加密的 Kinesis 串流，則必須授與額外的許可，以存取該串流及其加密金鑰。

下列政策授與存取加密來源的串流和及其加密金鑰的許可：

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
    ,
```

下列政策授與存取加密目的地的串流和及其加密金鑰的許可：

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Amazon MSK 叢集
<a name="how-zeppelin-iam-msk"></a>

若要授與 Amazon MSK 叢集的存取權，可以授與叢集 VPC 的存取權。如需存取 Amazon VPC 的政策範例，請參閱 [VPC 應用程式許可](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html)。