

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 開始使用 Amazon Managed Service for Apache Flink (DataStream API)
<a name="getting-started"></a>

本節向您介紹 Managed Service for Apache Flink 的基本概念，以及使用 DataStream API 在 Java 中實作應用程式。它描述了建立和測試應用程式的可用選項。此外，它還提供了相關指示，以協助您安裝完成本指南教學課程以及建立您的第一個應用程式所需要的工具。

**Topics**
+ [

## 檢閱 Managed Service for Apache Flink 應用程式的元件
](#getting-started-components)
+ [

## 滿足完成練習的先決條件
](#setting-up-prerequisites)
+ [

# 設定 AWS 帳戶並建立管理員使用者
](setting-up.md)
+ [

# 設定 AWS Command Line Interface (AWS CLI)
](setup-awscli.md)
+ [

# 建立並執行 Managed Service for Apache Flink 應用程式
](get-started-exercise.md)
+ [

# 清除 AWS 資源
](getting-started-cleanup.md)
+ [

# 探索其他資源
](getting-started-next-steps.md)

## 檢閱 Managed Service for Apache Flink 應用程式的元件
<a name="getting-started-components"></a>

**注意**  
Amazon Managed Service for Apache Flink 支援所有 Apache Flink APIs並可能支援所有 JVM 語言。如需詳細資訊，請參閱 [Flink APIs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/overview/#flinks-apis)。  
根據您選擇的 API，應用程式和實作的結構略有不同。本入門教學課程涵蓋在 Java 中使用 DataStream API 的應用程式實作。

為了處理資料，您的 Managed Service for Apache Flink 應用程式使用 Java 應用程式來處理輸入，並使用 Apache Flink 執行時間產生輸出。

典型的 Managed Service for Apache Flink 應用程式具有下列元件：
+ **執行期屬性：**您可以使用*執行期屬性*將組態參數傳遞至應用程式，以變更這些參數，而無需修改和重新發佈程式碼。
+ **來源：**應用程式會耗用來自一或多個*來源*的資料。來源使用[連接器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/overview/)從外部系統讀取資料，例如 Kinesis 資料串流或 Kafka 儲存貯體。如需詳細資訊，請參閱[新增串流資料來源](how-sources.md)。
+ **運算子：**應用程式會使用一或多個*運算子*來處理資料。運算子可以轉換、富集或彙總資料。如需詳細資訊，請參閱[運算子](how-operators.md)。
+ **接收器：**應用程式會透過*接收器*將資料傳送至外部來源。接收器使用[連接器](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/overview/) v 將資料傳送至 Kinesis 資料串流、Kafka 主題、Amazon S3 或關聯式資料庫。您也可以使用特殊連接器來列印輸出，僅用於開發目的。如需詳細資訊，請參閱[使用接收器寫入資料](how-sinks.md)。

您的應用程式需要一些*外部相依性*，例如應用程式使用的 Flink 連接器，或可能的 Java 程式庫。若要在 Amazon Managed Service for Apache Flink 中執行，應用程式必須與相依性一起封裝在 *fat-jar* 中，並上傳至 Amazon S3 儲存貯體。然後建立 Managed Service for Apache Flink 應用程式。您可以傳遞程式碼套件的位置，以及任何其他執行時間組態參數。

本教學課程示範如何使用 Apache Maven 封裝應用程式，以及如何在您選擇的 IDE 中於本機執行應用程式。

## 滿足完成練習的先決條件
<a name="setting-up-prerequisites"></a>

若要完成本指南中的步驟，您必須執行下列各項：
+ [Git 用戶端](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)。如果您尚未安裝 Git 用戶端，請安裝 。
+ [ Java 開發套件 (JDK) 第 11 版](https://www.oracle.com/java/technologies/downloads/#java11)。安裝 Java JDK 11 並設定 `JAVA_HOME`環境變數以指向您的 JDK 安裝位置。如果您沒有 JDK 11，您可以使用 [ Amazon Coretto 11](https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/what-is-corretto-11.html) 或您選擇的任何其他標準 JDK。
  + 若要驗證是否已正確安裝 JDK，請執行下列命令。如果您使用的是 Amazon Corretto 以外的 JDK，則輸出會有所不同。請確定版本為 11.x。

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/)。如果您尚未安裝 Apache Maven。若要了解如何安裝，請參閱[安裝 Apache Maven](https://maven.apache.org/install.html)。
  + 若要測試您的 Apache Maven 安裝，輸入以下資訊：

  ```
  $ mvn -version
  ```
+ 適用於本機開發的 IDE。我們建議您使用 [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 或 [IntelliJ IDEA](https://www.jetbrains.com/idea/) 等開發環境來開發和編譯應用程式。
  + 若要測試您的 Apache Maven 安裝，輸入以下資訊：

  ```
  $ mvn -version
  ```

開始執行，請移至 [設定 AWS 帳戶並建立管理員使用者](setting-up.md)。

# 設定 AWS 帳戶並建立管理員使用者
<a name="setting-up"></a>

第一次使用 Managed Service for Apache Flink 之前，請先完成以下任務：

## 註冊 AWS 帳戶
<a name="sign-up-for-aws"></a>

如果您沒有 AWS 帳戶，請完成下列步驟來建立一個。

**註冊 AWS 帳戶**

1. 開啟 [https://portal.aws.amazon.com/billing/signup](https://portal.aws.amazon.com/billing/signup)。

1. 請遵循線上指示進行。

   部分註冊程序需接收來電或簡訊，並在電話鍵盤輸入驗證碼。

   當您註冊 時 AWS 帳戶，*AWS 帳戶根使用者*會建立 。根使用者有權存取該帳戶中的所有 AWS 服務 和資源。作為安全最佳實務，請將管理存取權指派給使用者，並且僅使用根使用者來執行[需要根使用者存取權的任務](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks)。

AWS 會在註冊程序完成後傳送確認電子郵件給您。您可以隨時登錄 [https://aws.amazon.com/](https://aws.amazon.com/) 並選擇**我的帳戶**，以檢視您目前的帳戶活動並管理帳戶。

## 建立具有管理存取權的使用者
<a name="create-an-admin"></a>

註冊 後 AWS 帳戶，請保護 AWS 帳戶根使用者、啟用 AWS IAM Identity Center和建立 管理使用者，以免將根使用者用於日常任務。

**保護您的 AWS 帳戶根使用者**

1.  選擇**根使用者**並輸入 AWS 帳戶 您的電子郵件地址，以帳戶擁有者[AWS 管理主控台](https://console.aws.amazon.com/)身分登入 。在下一頁中，輸入您的密碼。

   如需使用根使用者登入的說明，請參閱 *AWS 登入 使用者指南*中的[以根使用者身分登入](https://docs.aws.amazon.com/signin/latest/userguide/console-sign-in-tutorials.html#introduction-to-root-user-sign-in-tutorial)。

1. 若要在您的根使用者帳戶上啟用多重要素驗證 (MFA)。

   如需說明，請參閱《*IAM 使用者指南*》中的[為您的 AWS 帳戶 根使用者 （主控台） 啟用虛擬 MFA 裝置](https://docs.aws.amazon.com/IAM/latest/UserGuide/enable-virt-mfa-for-root.html)。

**建立具有管理存取權的使用者**

1. 啟用 IAM Identity Center。

   如需指示，請參閱《AWS IAM Identity Center 使用者指南》**中的[啟用 AWS IAM Identity Center](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-set-up-for-idc.html)。

1. 在 IAM Identity Center 中，將管理存取權授予使用者。

   如需使用 IAM Identity Center 目錄 做為身分來源的教學課程，請參閱*AWS IAM Identity Center 《 使用者指南*》中的[使用預設值設定使用者存取 IAM Identity Center 目錄](https://docs.aws.amazon.com//singlesignon/latest/userguide/quick-start-default-idc.html)。

**以具有管理存取權的使用者身分登入**
+ 若要使用您的 IAM Identity Center 使用者簽署，請使用建立 IAM Identity Center 使用者時傳送至您電子郵件地址的簽署 URL。

  如需使用 IAM Identity Center 使用者登入的說明，請參閱*AWS 登入 《 使用者指南*》中的[登入 AWS 存取入口網站](https://docs.aws.amazon.com/signin/latest/userguide/iam-id-center-sign-in-tutorial.html)。

**指派存取權給其他使用者**

1. 在 IAM Identity Center 中，建立一個許可集來遵循套用最低權限的最佳實務。

   如需指示，請參閱《AWS IAM Identity Center 使用者指南》**中的[建立許可集](https://docs.aws.amazon.com//singlesignon/latest/userguide/get-started-create-a-permission-set.html)。

1. 將使用者指派至群組，然後對該群組指派單一登入存取權。

   如需指示，請參閱《AWS IAM Identity Center 使用者指南》**中的[新增群組](https://docs.aws.amazon.com//singlesignon/latest/userguide/addgroups.html)。

## 授與程式設計存取權
<a name="setting-up-access"></a>

如果使用者想要與 AWS 外部互動，則需要程式設計存取 AWS 管理主控台。授予程式設計存取權的方式取決於正在存取的使用者類型 AWS。

若要授予使用者程式設計存取權，請選擇下列其中一個選項。


****  

| 哪個使用者需要程式設計存取權？ | 到 | 根據 | 
| --- | --- | --- | 
| IAM | （建議） 使用主控台登入資料做為臨時登入資料，以簽署對 AWS CLI、 AWS SDKs程式設計請求。 AWS APIs |  請依照您要使用的介面所提供的指示操作。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/setting-up.html)  | 
|  人力資源身分 (IAM Identity Center 中管理的使用者)  | 使用暫時登入資料簽署對 AWS CLI、 AWS SDKs程式設計請求。 AWS APIs |  請依照您要使用的介面所提供的指示操作。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/setting-up.html)  | 
| IAM | 使用暫時登入資料簽署對 AWS CLI、 AWS SDKs程式設計請求。 AWS APIs | 遵循《IAM 使用者指南》中[將臨時登入資料與 AWS 資源](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html)搭配使用的指示。 | 
| IAM | (不建議使用)使用長期憑證簽署對 AWS CLI、 AWS SDKs程式設計請求。 AWS APIs |  請依照您要使用的介面所提供的指示操作。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/setting-up.html)  | 

## 後續步驟
<a name="setting-up-next-step-2"></a>

[設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)

# 設定 AWS Command Line Interface (AWS CLI)
<a name="setup-awscli"></a>

在此步驟中，您會下載並設定 AWS CLI 以搭配 Managed Service for Apache Flink 使用。

**注意**  
本指南的入門練習均假設您使用帳戶中的管理員登入資料 (`adminuser`) 來執行操作。

**注意**  
如果您已 AWS CLI 安裝 ，您可能需要升級 以取得最新功能。如需詳細資訊，請參閱《AWS Command Line Interface 使用者指南》**中的[安裝 AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html)。若要檢查 的版本 AWS CLI，請執行下列命令：  

```
aws --version
```
本教學課程中的練習需要以下 AWS CLI 版本 或更新版本：  

```
aws-cli/1.16.63
```

**若要設定 AWS CLI**

1. 下載和設定 AWS CLI。如需相關指示，請參閱《AWS Command Line Interface 使用者指南》**中的下列主題：
   + [安裝 AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [設定 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. 在 AWS CLI `config` 檔案中為管理員使用者新增具名設定檔。當您執行 AWS CLI 命令時，使用此設定檔。如需具名描述檔的詳細資訊，請參閱《AWS Command Line Interface 使用者指南》**中的[具名描述檔](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html)。

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   如需可用 AWS 區域的清單，請參閱《》中的[區域和端點](https://docs.aws.amazon.com/general/latest/gr/rande.html)*Amazon Web Services 一般參考*。
**注意**  
本教學課程中的範例程式碼和命令使用 us-east-1 美國東部 （維吉尼亞北部） 區域。若要使用其他區域，請將本教學課程的程式碼和指令中的「區域」變更為您要使用的區域。

1. 在命令提示字元中輸入下列 help 命令，以驗證設定：

   ```
   aws help
   ```

設定 AWS 帳戶和 之後 AWS CLI，您可以嘗試下一個練習，在其中設定範例應用程式並測試end-to-end設定。

## 下一步驟
<a name="setup-awscli-next-step-3"></a>

[建立並執行 Managed Service for Apache Flink 應用程式](get-started-exercise.md)

# 建立並執行 Managed Service for Apache Flink 應用程式
<a name="get-started-exercise"></a>

在此步驟中，您會建立 Managed Service for Apache Flink 應用程式，並將 Kinesis 資料串流做為來源和接收器。

**Topics**
+ [

## 建立相依資源
](#get-started-exercise-0)
+ [

## 設定您的本機開發環境
](#get-started-exercise-2)
+ [

## 下載並檢查 Apache Flink 串流 Java 程式碼
](#get-started-exercise-5)
+ [

## 將範例記錄寫入輸入串流
](#get-started-exercise-5-4)
+ [

## 在本機執行您的應用程式
](#get-started-exercise-5-run)
+ [

## 觀察 Kinesis 串流中的輸入和輸出資料
](#get-started-exercise-input-output)
+ [

## 停止應用程式在本機執行
](#get-started-exercise-stop)
+ [

## 編譯和封裝您的應用程式程式碼
](#get-started-exercise-5-5)
+ [

## 上傳應用程式碼 JAR 檔案
](#get-started-exercise-6)
+ [

## 建立和設定 Managed Service for Apache Flink 應用程式
](#get-started-exercise-7)
+ [

## 下一步驟
](#get-started-exercise-next-step-4)

## 建立相依資源
<a name="get-started-exercise-0"></a>

在為本練習建立 Managed Service for Apache Flink 應用程式之前，先建立下列相依資源：
+ 兩個用於輸入和輸出的 Kinesis 資料串流
+ 存放應用程式程式碼的 Amazon S3 儲存貯體
**注意**  
本教學假設您正在 us-east-1 美國東部 （維吉尼亞北部） 區域中部署應用程式。如果您使用另一個區域，請相應地調整所有步驟。

### 建立兩個 Amazon Kinesis 資料串流
<a name="get-started-exercise-1"></a>

在為本練習建立 Managed Service for Apache Flink 應用程式之前，請先建立兩個 Kinesis 資料串流 (`ExampleInputStream` 和 `ExampleOutputStream`)。您的應用程式會將這些串流用於應用程式來源和目的地串流。

您可以使用 Amazon Kinesis 主控台或下列 AWS CLI 命令來建立這些串流。如需主控台指示，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。若要使用 建立串流 AWS CLI，請使用下列命令，調整至您用於應用程式的 區域。

**建立資料串流 (AWS CLI)**

1. 若要建立第一個串流 (`ExampleInputStream`)，請使用下列 Amazon Kinesis `create-stream` AWS CLI 命令：

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

1. 若要建立應用程式用來寫入輸出的第二個串流，請執行相同的命令，將串流名稱變更為 `ExampleOutputStream`：

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

### 為應用程式碼建立 Amazon S3 儲存貯體
<a name="get-started-exercise-1-5"></a>

您可以使用主控台建立 Amazon S3 儲存貯體。若要了解如何使用主控台建立 Amazon S3 儲存貯體，請參閱《[Amazon S3 使用者指南](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)》中的[建立儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。使用全域唯一名稱命名 Amazon S3 儲存貯體，例如透過附加您的登入名稱。

**注意**  
 請確定您在用於本教學課程的 區域中建立儲存貯體 (us-east-1)。

### 其他資源
<a name="get-started-exercise-1-6"></a>

當您建立應用程式時，Managed Service for Apache Flink 會在資源不存在時自動建立下列 Amazon CloudWatch 資源：
+ 名為 `/AWS/KinesisAnalytics-java/<my-application>` 的日誌群組。
+ 名為 `kinesis-analytics-log-stream` 的日誌串流。

## 設定您的本機開發環境
<a name="get-started-exercise-2"></a>

對於開發和偵錯，您可以直接從您選擇的 IDE 在機器上執行 Apache Flink 應用程式。任何 Apache Flink 相依性都會像一般 Java 相依性一樣使用 Apache Maven 來處理。

**注意**  
在您的開發機器上，您必須安裝 Java JDK 11、Maven 和 Git。我們建議您使用開發環境，例如 [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 或 [IntelliJ IDEA](https://www.jetbrains.com/idea/)。若要驗證您是否符合所有先決條件，請參閱 [滿足完成練習的先決條件](getting-started.md#setting-up-prerequisites)。**您不需要**在機器上安裝 Apache Flink 叢集。

### 驗證您的 AWS 工作階段
<a name="get-started-exercise-2-5"></a>

應用程式使用 Kinesis 資料串流來發佈資料。在本機執行時，您必須擁有有效的已 AWS 驗證工作階段，具有寫入 Kinesis 資料串流的許可。使用下列步驟來驗證您的工作階段：

1. 如果您沒有設定 AWS CLI 和具有有效登入資料的具名設定檔，請參閱 [設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 透過發佈下列測試記錄，確認您的 AWS CLI 已正確設定，且您的使用者具有寫入 Kinesis 資料串流的許可：

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. 如果您的 IDE 有要整合的外掛程式 AWS，您可以使用它將登入資料傳遞至在 IDE 中執行的應用程式。如需詳細資訊，請參閱 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/) 和 [AWS Toolkit for Eclipse](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)。

## 下載並檢查 Apache Flink 串流 Java 程式碼
<a name="get-started-exercise-5"></a>

此範例的 Java 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼，請執行下列動作：

1. 使用以下指令複製遠端儲存庫：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 導覽至 `amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted` 目錄。

### 檢閱應用程式元件
<a name="get-started-exercise-5-1"></a>

應用程式完全在 `com.amazonaws.services.msf.BasicStreamingJob`類別中實作。`main()` 方法定義處理串流資料和執行資料的資料流程。

**注意**  
為了獲得最佳化的開發人員體驗，應用程式設計為在 Amazon Managed Service for Apache Flink 和本機上執行，在 IDE 中進行開發時不會有任何程式碼變更。
+ 若要讀取執行時間組態，以便在 Amazon Managed Service for Apache Flink 和 IDE 中執行時運作，應用程式會自動偵測它是否在 IDE 中於本機獨立執行。在這種情況下，應用程式會以不同的方式載入執行期組態：

  1. 當應用程式偵測到其在您的 IDE 中以獨立模式執行時，請形成包含在專案**資源**資料夾中`application_properties.json`的檔案。檔案的內容如下。

  1. 當應用程式在 Amazon Managed Service for Apache Flink 中執行時，預設行為會從您在 Amazon Managed Service for Apache Flink 應用程式中定義的執行期屬性載入應用程式組態。請參閱 [建立和設定 Managed Service for Apache Flink 應用程式](#get-started-exercise-7)。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` 方法定義應用程式資料流程並執行它。
  + 初始化預設串流環境。在此範例中，我們會示範如何建立`StreamExecutionEnvironment`要與 DataSteam API 搭配使用的 ，以及`StreamTableEnvironment`要與 SQL 和資料表 API 搭配使用的 。兩個環境物件是相同執行時間環境的兩個不同參考，以使用不同的 APIs。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ```
  + 載入應用程式組態參數。這會自動從正確的位置載入它們，視應用程式執行的位置而定：

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + 應用程式使用 [Kinesis Consumer](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-consumer) 連接器定義來源，以從輸入串流讀取資料。輸入串流的組態在 `PropertyGroupId`= 中定義`InputStream0`。串流的名稱和區域分別位於名為 `stream.name`和 的屬性中`aws.region`。為了簡化，此來源會將記錄讀取為字串。

    ```
    private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
        String inputStreamName = inputProperties.getProperty("stream.name");
        return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
    }
    ...
    
    public static void main(String[] args) throws Exception { 
       ...
       SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
       DataStream<String> input = env.addSource(source, "Kinesis Source");  
       ...
    }
    ```
  + 應用程式接著會使用 [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-streams-sink) 連接器定義接收器，將資料傳送至輸出串流。輸出串流名稱和區域在 `PropertyGroupId`= 中定義`OutputStream0`，類似於輸入串流。接收器會直接連接到從來源`DataStream`取得資料的 內部。在實際的應用程式中，您會在來源和接收之間進行一些轉換。

    ```
    private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
        String outputStreamName = outputProperties.getProperty("stream.name");
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setStreamName(outputStreamName)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
    ...
    public static void main(String[] args) throws Exception { 
       ...
       Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
       input.sinkTo(sink);
       ...
    }
    ```
  + 最後，執行您剛定義的資料流程。這必須是`main()`方法的最後一個指示，在您定義所有運算子之後，資料流程需要：

    ```
    env.execute("Flink streaming Java API skeleton");
    ```

### 使用 pom.xml 檔案
<a name="get-started-exercise-5-2"></a>

pom.xml 檔案會定義應用程式所需的所有相依性，並設定 Maven Shade 外掛程式來建置包含 Flink 所需所有相依性的 fat-jar。
+ 有些相依性具有`provided`範圍。當應用程式在 Amazon Managed Service for Apache Flink 中執行時，這些相依性會自動可用。它們需要用來編譯應用程式，或在 IDE 中本機執行應用程式。如需詳細資訊，請參閱[在本機執行您的應用程式](#get-started-exercise-5-run)。請確定您使用的 Flink 版本與您在 Amazon Managed Service for Apache Flink 中使用的執行時間相同。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ 您必須使用預設範圍將額外的 Apache Flink 相依性新增至 pom，例如此應用程式使用的 [Kinesis 連接器](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)。如需詳細資訊，請參閱[使用 Apache Flink 連接器](how-flink-connectors.md)。您也可以新增應用程式所需的任何其他 Java 相依性。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis</artifactId>
      <version>${aws.connector.version}</version>
  </dependency>
  ```
+ Maven Java 編譯器外掛程式可確保程式碼是根據 Apache Flink 目前支援的 JDK 版本 Java 11 編譯。
+ Maven Shade 外掛程式會封裝 fat-jar，不包括執行時間提供的一些程式庫。它也會指定兩個轉換器： `ServicesResourceTransformer`和 `ManifestResourceTransformer`。後者會設定 類別，其中包含啟動應用程式的 `main`方法。如果您重新命名主類別，請不要忘記更新此轉換器。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 將範例記錄寫入輸入串流
<a name="get-started-exercise-5-4"></a>

在本節中，您將傳送範例記錄到串流，供應用程式處理。您可以使用 Python 指令碼或 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) 產生範例資料。

### 使用 Python 指令碼產生範例資料
<a name="get-started-exercise-5-4-1"></a>

您可以使用 Python 指令碼將範例記錄傳送至串流。

**注意**  
若要執行此 Python 指令碼，您必須使用 Python 3.x 並安裝[AWS 適用於 Python (Boto) 的 SDK](https://aws.amazon.com/developer/language/python/) 程式庫。

**若要開始將測試資料傳送至 Kinesis 輸入串流：**

1. 從資料產生器 [ GitHub 儲存庫下載資料產生器](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator) `stock.py` Python 指令碼。

1. 執行 `stock.py` 指令碼：

   ```
   $ python stock.py
   ```

在您完成教學課程的其餘部分時，請讓指令碼保持執行。您現在可以執行 Apache Flink 應用程式。

### 使用 Kinesis Data Generator 產生範例資料
<a name="get-started-exercise-5-4-2"></a>

或者，若要使用 Python 指令碼，您可以使用[託管版本](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)中也提供的 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)，將隨機範例資料傳送至串流。Kinesis Data Generator 會在您的瀏覽器中執行，您不需要在機器上安裝任何項目。

**若要設定和執行 Kinesis Data Generator：**

1. 遵循 [Kinesis Data Generator 文件](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)中的指示來設定對工具的存取。您將執行設定使用者和密碼的 CloudFormation 範本。

1. 透過 CloudFormation 範本產生的 URL 存取 Kinesis Data Generator。CloudFormation 範本完成後，您可以在**輸出**索引標籤中找到 URL。

1. 設定資料產生器：
   + **區域：**選取您用於本教學課程的區域：us-east-1
   + **串流/交付串流：**選取應用程式將使用的輸入串流： `ExampleInputStream`
   + **每秒記錄數：**100
   + **記錄範本：**複製並貼上下列範本：

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. 測試範本：選擇**測試範本**，並確認產生的記錄與以下內容類似：

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. 啟動資料產生器：選擇**選取傳送資料**。

Kinesis Data Generator 現在正在將資料傳送至 `ExampleInputStream`。

## 在本機執行您的應用程式
<a name="get-started-exercise-5-run"></a>

您可以在 IDE 中的本機執行和偵錯 Flink 應用程式。

**注意**  
繼續之前，請確認輸入和輸出串流是否可用。請參閱 [建立兩個 Amazon Kinesis 資料串流](#get-started-exercise-1)。此外，請確認您具有從兩個串流讀取和寫入的許可。請參閱 [驗證您的 AWS 工作階段](#get-started-exercise-2-5)。  
設定本機開發環境需要 Java 11 JDK、Apache Maven 和 以及適用於 Java 開發的 IDE。確認您符合必要的先決條件。請參閱 [滿足完成練習的先決條件](getting-started.md#setting-up-prerequisites)。

### 將 Java 專案匯入 IDE
<a name="get-started-exercise-5-run-1"></a>

若要開始在 IDE 中使用應用程式，您必須將其匯入為 Java 專案。

您複製的儲存庫包含多個範例。每個範例都是個別的專案。在本教學課程中，將`./java/GettingStarted`子目錄中的內容匯入 IDE。

使用 Maven 將程式碼插入為現有的 Java 專案。

**注意**  
匯入新 Java 專案的確切程序取決於您使用的 IDE。

### 檢查本機應用程式組態
<a name="get-started-exercise-5-run-2"></a>

在本機執行時，應用程式會使用 下專案資源資料夾中 `application_properties.json` 檔案中的組態`./src/main/resources`。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### 設定 IDE 執行組態
<a name="get-started-exercise-5-run-3"></a>

您可以執行主要類別 ，直接從 IDE 執行和偵錯 Flink 應用程式`com.amazonaws.services.msf.BasicStreamingJob`，就像執行任何 Java 應用程式一樣。在執行應用程式之前，您必須設定執行組態。設定取決於您使用的 IDE。例如，請參閱 IntelliJ IDEA 文件中的[執行/偵錯組態](https://www.jetbrains.com/help/idea/run-debug-configuration.html)。特別是，您必須設定下列項目：

1. **將`provided`相依性新增至 classpath**。這是必要的，以確保在本機執行時，具有`provided`範圍的相依性會傳遞至應用程式。如果沒有此設定，應用程式會立即顯示`class not found`錯誤。

1. **傳遞 AWS 登入資料以存取應用程式的 Kinesis 串流**。最快的方法是使用 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)。在執行組態中使用此 IDE 外掛程式，您可以選取特定 AWS 設定檔。 AWS 身分驗證會使用此設定檔進行。您不需要直接傳遞 AWS 登入資料。

1. 確認 IDE 使用 **JDK 11 **執行應用程式。

### 在 IDE 中執行應用程式
<a name="get-started-exercise-5-run-4"></a>

設定 的執行組態後`BasicStreamingJob`，您可以像一般 Java 應用程式一樣執行或偵錯組態。

**注意**  
您無法`java -jar ...`直接從命令列使用 執行 Maven 產生的 fat-jar。此 jar 不包含獨立執行應用程式所需的 Flink 核心相依性。

當應用程式成功啟動時，它會記錄有關獨立迷你叢集和連接器初始化的一些資訊。這後面接著一些 INFO 和一些 WARN 日誌，Flink 通常會在應用程式啟動時發出。

```
13:43:31,405 INFO  com.amazonaws.services.msf.BasicStreamingJob                 [] - Loading application properties from 'flink-application-properties-dev.json'
13:43:31,549 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
13:43:31,677 INFO  org.apache.flink.runtime.minicluster.MiniCluster             [] - Starting Flink Mini Cluster
....
```

初始化完成後，應用程式不會發出任何進一步的日誌項目。**資料流動時，不會發出日誌。**

若要驗證應用程式是否正確處理資料，您可以檢查輸入和輸出 Kinesis 串流，如下節所述。

**注意**  
 不發出有關流動資料的日誌是 Flink 應用程式的正常行為。在每個記錄上發出日誌對於偵錯可能很方便，但在生產環境中執行時可能會增加大量的額外負荷。

## 觀察 Kinesis 串流中的輸入和輸出資料
<a name="get-started-exercise-input-output"></a>

您可以使用 Amazon Kinesis 主控台中的**資料檢視器**，觀察 （產生範例 Python) 或 Kinesis Data Generator （連結） 傳送至輸入串流的記錄。 Amazon Kinesis 

**若要觀察記錄**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 確認您執行本教學課程的區域相同，預設為 us-east-1 美國東部 （維吉尼亞北部）。如果區域不相符，請變更區域。

1. 選擇**資料串流**。

1. 選取您要觀察的串流，或 `ExampleInputStream` `ExampleOutputStream.`

1. 選擇**資料檢視器**標籤。

1. 選擇任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。

1. 選擇資料欄中的值，以 JSON 格式檢查記錄的內容。

## 停止應用程式在本機執行
<a name="get-started-exercise-stop"></a>

停止在 IDE 中執行的應用程式。IDE 通常提供「停止」選項。確切的位置和方法取決於您使用的 IDE。

## 編譯和封裝您的應用程式程式碼
<a name="get-started-exercise-5-5"></a>

在本節中，您會使用 Apache Maven 編譯 Java 程式碼，並將其封裝成 JAR 檔案。您可以使用 Maven 命令列工具或 IDE 來編譯和封裝程式碼。

**若要使用 Maven 命令列編譯和封裝：**

移至包含 Java GettingStarted 專案的目錄，並執行下列命令：

```
$ mvn package
```

**若要使用 IDE 編譯和封裝：**

`mvn package` 從 IDE Maven 整合執行 。

在這兩種情況下，都會建立下列 JAR 檔案：`target/amazon-msf-java-stream-app-1.0.jar`。

**注意**  
 從 IDE 執行「建置專案」可能不會建立 JAR 檔案。

## 上傳應用程式碼 JAR 檔案
<a name="get-started-exercise-6"></a>

在本節中，您將您在上一節中建立的 JAR 檔案上傳至在本教學課程開始時建立的 Amazon Simple Storage Service (Amazon S3) 儲存貯體。如果您尚未完成此步驟，請參閱 （連結）。

**上傳應用程式碼 JAR 檔案**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選擇您先前為應用程式碼建立的儲存貯體。

1. 選擇**上傳**。

1. 選擇 **Add files (新增檔案)**。

1. 導覽至上一個步驟中產生的 JAR 檔案：`target/amazon-msf-java-stream-app-1.0.jar`。

1. 選擇**上傳**，而不變更任何其他設定。

**警告**  
請務必在 中選取正確的 JAR 檔案`<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar`。  
`target` 目錄也包含您不需要上傳的其他 JAR 檔案。

## 建立和設定 Managed Service for Apache Flink 應用程式
<a name="get-started-exercise-7"></a>

您可以使用主控台或 AWS CLI建立和執行 Managed Service for Apache Flink 應用程式。在本教學課程中，您將使用 主控台。

**注意**  
當您使用 主控台建立應用程式時，系統會為您建立 AWS Identity and Access Management (IAM) 和 Amazon CloudWatch Logs 資源。當您使用 建立應用程式時 AWS CLI，您可以分別建立這些資源。

**Topics**
+ [

### 建立應用程式
](#get-started-exercise-7-console-create)
+ [

### 編輯 IAM 政策
](#get-started-exercise-7-console-iam)
+ [

### 設定應用程式
](#get-started-exercise-7-console-configure)
+ [

### 執行應用程式
](#get-started-exercise-7-console-run)
+ [

### 觀察執行中應用程式的指標
](#get-started-exercise-7-console-stop)
+ [

### 觀察 Kinesis 串流中的輸出資料
](#get-started-exercise-7-console-output)
+ [

### 停止應用程式
](#get-started-exercise-stop)

### 建立應用程式
<a name="get-started-exercise-7-console-create"></a>

**建立應用程式**

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 確認已選取正確的區域：us-east-1 美國東部 （維吉尼亞北部）

1. 開啟右側的選單，然後選擇 **Apache Flink 應用程式**，然後選擇**建立串流應用程式**。或者，在初始頁面的入門容器中選擇**建立串流應用程式**。

1. 在**建立串流應用程式**頁面上：
   + **選擇設定串流處理應用程式的方法：**選擇**從頭開始建立**。
   + **Apache Flink 組態、Application Flink 版本：**選擇 **Apache Flink 1.20**。

1. 設定您的應用程式
   + **應用程式名稱：**輸入 **MyApplication**。
   + **描述：**輸入 **My java test app**。
   + **存取應用程式資源：**選擇**`kinesis-analytics-MyApplication-us-east-1`使用必要政策建立/更新 IAM 角色**。

1. 設定**您的範本以進行應用程式設定**
   + **範本：**選擇**開發**。

1. 選擇頁面底部的**建立串流應用程式**。

**注意**  
使用主控台建立 Managed Service for Apache Flink 應用程式時，可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名：  
政策：`kinesis-analytics-service-MyApplication-us-east-1`
角色：`kinesisanalytics-MyApplication-us-east-1`
Amazon Managed Service for Apache Flink 先前稱為 Kinesis Data Analytics。自動建立的資源名稱會加上 字首，`kinesis-analytics-`以確保回溯相容性。

### 編輯 IAM 政策
<a name="get-started-exercise-7-console-iam"></a>

編輯 IAM 政策來新增存取 Kinesis 資料串流的許可。

**編輯政策**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 選擇**政策**。選擇主控台為您在上一節所建立的 **`kinesis-analytics-service-MyApplication-us-east-1`** 政策。

1. 選擇**編輯**，然後選擇 **JSON** 索引標籤。

1. 將下列政策範例的反白部分新增至政策。使用您的帳戶 ID 取代範例帳戶 ID (*012345678901*)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1.  選擇頁面底部的**下一步**，然後選擇**儲存變更**。

### 設定應用程式
<a name="get-started-exercise-7-console-configure"></a>

編輯應用程式組態以設定應用程式程式碼成品。

**編輯組態**

1. 在 **MyApplication** 頁面，選擇**設定**。

1. 在**應用程式碼位置**區段中：
   + 針對 **Amazon S3 儲存貯**體，選取您先前為應用程式碼建立的儲存貯體。選擇**瀏覽**並選取正確的儲存貯體，然後選取**選擇**。請勿按一下儲存貯體名稱。
   + 對於 **Amazon S3 物件的路徑**，請輸入 **amazon-msf-java-stream-app-1.0.jar**。

1. 針對**存取許可**，選擇**`kinesis-analytics-MyApplication-us-east-1`使用必要政策建立/更新 IAM 角色**。

1. 在**執行期屬性**區段中，新增下列屬性。

1. 選擇**新增項目**並新增下列每個參數：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/get-started-exercise.html)

1. 請勿修改任何其他區段。

1. 選擇**儲存變更**。

**注意**  
當您選擇啟用 Amazon CloudWatch 日誌時，Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示：  
日誌群組：`/aws/kinesis-analytics/MyApplication`
日誌串流：`kinesis-analytics-log-stream`

### 執行應用程式
<a name="get-started-exercise-7-console-run"></a>

應用程式現在已設定並準備好執行。

**執行應用程式**

1. 在 Amazon Managed Service for Apache Flink 的 主控台上，選擇**我的應用程式**，然後選擇**執行**。

1. 在下一頁的應用程式還原組態頁面上，選擇**使用最新的快照執行**，然後選擇**執行**。

   **應用程式詳細資訊**中的**狀態**會從 `Ready` 轉換為 `Starting` ，然後在應用程式啟動`Running`時轉換為 。

當應用程式處於 `Running` 狀態時，您現在可以開啟 Flink 儀表板。

**開啟 儀表板**

1. 選擇**開啟 Apache Flink 儀表板**。儀表板會在新頁面上開啟。

1. 在**執行中任務**清單中，選擇您可以看到的單一任務。
**注意**  
如果您設定執行期屬性或不正確地編輯 IAM 政策，應用程式狀態可能會變成 `Running`，但 Flink 儀表板會顯示任務正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的許可，這是常見的失敗案例。  
發生這種情況時，請檢查 Flink 儀表板中的**例外**狀況索引標籤，以查看問題的原因。

### 觀察執行中應用程式的指標
<a name="get-started-exercise-7-console-stop"></a>

在 **MyApplication** 頁面的 **Amazon CloudWatch 指標**區段中，您可以從執行中的應用程式查看一些基本指標。

**檢視指標**

1. 在**重新整理**按鈕旁，從下拉式清單中選取 **10 秒**。

1. 當應用程式執行正常時，您可以看到**運作時間**指標持續增加。

1. **fullrestarts** 指標應為零。如果增加，組態可能會發生問題。若要調查問題，請檢閱 Flink 儀表板上的**例外狀況**索引標籤。

1. 在運作狀態良好的應用程式中，**失敗檢查點指標的數量**應為零。
**注意**  
此儀表板會顯示一組固定的指標，精細程度為 5 分鐘。您可以使用 CloudWatch 儀表板中的任何指標來建立自訂應用程式儀表板。

### 觀察 Kinesis 串流中的輸出資料
<a name="get-started-exercise-7-console-output"></a>

請確定您仍在使用 Python 指令碼或 Kinesis Data Generator 將資料發佈至輸入。

您現在可以使用 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)：// 中的資料檢視器來觀察 Managed Service for Apache Flink 上執行之應用程式的輸出，與先前已執行的操作類似。

**檢視輸出**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 確認區域與您用來執行本教學課程的區域相同。根據預設，它是 us-east-1US East (N. Virginia)。視需要變更 區域。

1. 選擇**資料串流**。

1. 選取您要觀察的串流。在本教學課程中，使用 `ExampleOutputStream`。

1.  選擇**資料檢視器**標籤。

1. 選取任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。

1. 選取資料欄中的值，以 JSON 格式檢查記錄的內容。

### 停止應用程式
<a name="get-started-exercise-stop"></a>

若要停止應用程式，請前往名為 的 Managed Service for Apache Flink 應用程式的主控台頁面`MyApplication`。

**停止應用程式**

1. 從**動作**下拉式清單中，選擇**停止**。

1. **應用程式詳細資訊**中的**狀態**會從 轉換為 `Running` `Stopping`，然後在應用程式完全停止`Ready`時轉換為 。
**注意**  
別忘了也要停止從 Python 指令碼或 Kinesis Data Generator 將資料傳送至輸入串流。

## 下一步驟
<a name="get-started-exercise-next-step-4"></a>

[清除 AWS 資源](getting-started-cleanup.md)

# 清除 AWS 資源
<a name="getting-started-cleanup"></a>

本節包含清除本入門 (DataStream API) 教學課程中建立 AWS 之資源的程序。

**Topics**
+ [

## 刪除 Managed Service for Apache Flink 應用程式
](#getting-started-cleanup-app)
+ [

## 刪除您的 Kinesis 資料串流
](#getting-started-cleanup-stream)
+ [

## 刪除您的 Amazon S3 物件和儲存貯體
](#getting-started-cleanup-s3)
+ [

## 刪除您的 IAM 資源
](#getting-started-cleanup-iam)
+ [

## 刪除您的 CloudWatch 資源
](#getting-started-cleanup-cw)
+ [

## 探索 Apache Flink 的其他資源
](#getting-started-cleanup-next-step-5)

## 刪除 Managed Service for Apache Flink 應用程式
<a name="getting-started-cleanup-app"></a>

請使用下列程序刪除應用程式。

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 在 Managed Service for Apache Flink 面板中，選擇 **MyApplication**。

1. 從**動作**下拉式清單中，選擇**刪除**，然後確認刪除。

## 刪除您的 Kinesis 資料串流
<a name="getting-started-cleanup-stream"></a>

1. 登入 AWS 管理主控台，然後開啟位於 https：//https://console.aws.amazon.com/flink 的 Amazon MSF 主控台。

1. 選擇**資料串流**。

1. 選取您建立的兩個串流，`ExampleInputStream`以及 `ExampleOutputStream`。

1. 從**動作**下拉式清單中，選擇**刪除**，然後確認刪除。

## 刪除您的 Amazon S3 物件和儲存貯體
<a name="getting-started-cleanup-s3"></a>

使用下列程序刪除 Amazon S3 物件和儲存貯體。

**從 S3 儲存貯體刪除物件**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選取您為應用程式成品建立的 S3 儲存貯體。

1. 選取您上傳的應用程式成品，名為 `amazon-msf-java-stream-app-1.0.jar`。

1. 選擇**刪除**並確認刪除。

**刪除 S3 儲存貯體**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選取您為成品建立的儲存貯體。

1. 選擇**刪除**並確認刪除。
**注意**  
S3 儲存貯體必須為空白，才能將其刪除。

## 刪除您的 IAM 資源
<a name="getting-started-cleanup-iam"></a>

**刪除 IAM 資源**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 在導覽列中，選擇**政策**。

1. 在篩選器控制項中，輸入 **kinesis**。

1. 選擇 **kinesis-analytics-service-MyApplication-us-east-1** 政策。

1. 選擇**政策動作**，然後選擇**刪除**。

1. 在導覽列中，選擇**角色**。

1. 選擇 **kinesis-analytics-MyApplication-us-east-1** 角色。

1. 選擇**刪除角色**，然後確認刪除。

## 刪除您的 CloudWatch 資源
<a name="getting-started-cleanup-cw"></a>

1. 在以下網址開啟 CloudWatch 主控台：[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。

1. 在導覽列中，選擇**日誌**。

1. 選擇 **/aws/kinesis-analytics/MyApplication** 日誌群組。

1. 選擇**刪除日誌群組**，然後確認刪除。

## 探索 Apache Flink 的其他資源
<a name="getting-started-cleanup-next-step-5"></a>

[探索其他資源](getting-started-next-steps.md)

# 探索其他資源
<a name="getting-started-next-steps"></a>

現在您已建立並執行 Managed Service for Apache Flink 應用程式，請參閱下列資源，取得更進階的 Managed Service for Apache Flink 解決方案。
+ **[Amazon Managed Service for Apache Flink 研討會](https://catalog.workshops.aws/managed-flink)：**在本研討會中，您將建置end-to-end串流架構，以近乎即時的方式擷取、分析和視覺化串流資料。您著手改善紐約市一家計程車公司的運營。您可以近乎即時地分析紐約市計程車車隊的遙測資料，以最佳化其車隊運作。
+ **[建立和使用 Managed Service for Apache Flink 應用程式的範例](examples-collapsibles.md)：**本開發人員指南的這一節提供了在 Managed Service for Apache Flink 中建立和使用應用程式的範例。其中包含範例程式碼和逐步指示，可協助您建立 Managed Service for Apache Flink 應用程式並測試結果。
+ **[學習 Flink：動手訓練：](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/learn-flink/overview/)**官方介紹性 Apache Flink 訓練課程，可協助您開始撰寫可擴展的串流 ETL、分析和事件驅動型應用程式。