

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 建立並執行 Managed Service for Apache Flink 應用程式
<a name="get-started-exercise"></a>

在此步驟中，您會建立 Managed Service for Apache Flink 應用程式，並將 Kinesis 資料串流做為來源和接收器。

**Topics**
+ [建立相依資源](#get-started-exercise-0)
+ [設定您的本機開發環境](#get-started-exercise-2)
+ [下載並檢查 Apache Flink 串流 Java 程式碼](#get-started-exercise-5)
+ [將範例記錄寫入輸入串流](#get-started-exercise-5-4)
+ [在本機執行您的應用程式](#get-started-exercise-5-run)
+ [觀察 Kinesis 串流中的輸入和輸出資料](#get-started-exercise-input-output)
+ [停止應用程式在本機執行](#get-started-exercise-stop)
+ [編譯和封裝您的應用程式程式碼](#get-started-exercise-5-5)
+ [上傳應用程式碼 JAR 檔案](#get-started-exercise-6)
+ [建立和設定 Managed Service for Apache Flink 應用程式](#get-started-exercise-7)
+ [下一步驟](#get-started-exercise-next-step-4)

## 建立相依資源
<a name="get-started-exercise-0"></a>

在為本練習建立 Managed Service for Apache Flink 應用程式之前，先建立下列相依資源：
+ 兩個用於輸入和輸出的 Kinesis 資料串流
+ 存放應用程式程式碼的 Amazon S3 儲存貯體
**注意**  
本教學假設您正在 us-east-1 美國東部 （維吉尼亞北部） 區域中部署應用程式。如果您使用另一個區域，請相應地調整所有步驟。

### 建立兩個 Amazon Kinesis 資料串流
<a name="get-started-exercise-1"></a>

在為本練習建立 Managed Service for Apache Flink 應用程式之前，請先建立兩個 Kinesis 資料串流 (`ExampleInputStream` 和 `ExampleOutputStream`)。您的應用程式會將這些串流用於應用程式來源和目的地串流。

您可以使用 Amazon Kinesis 主控台或下列 AWS CLI 命令來建立這些串流。如需主控台指示，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。若要使用 建立串流 AWS CLI，請使用下列命令，調整至您用於應用程式的 區域。

**建立資料串流 (AWS CLI)**

1. 若要建立第一個串流 (`ExampleInputStream`)，請使用下列 Amazon Kinesis `create-stream` AWS CLI 命令：

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

1. 若要建立應用程式用來寫入輸出的第二個串流，請執行相同的命令，將串流名稱變更為 `ExampleOutputStream`：

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

### 為應用程式碼建立 Amazon S3 儲存貯體
<a name="get-started-exercise-1-5"></a>

您可以使用主控台建立 Amazon S3 儲存貯體。若要了解如何使用主控台建立 Amazon S3 儲存貯體，請參閱《[Amazon S3 使用者指南](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)》中的[建立儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。使用全域唯一名稱命名 Amazon S3 儲存貯體，例如透過附加您的登入名稱。

**注意**  
 請確定您在用於本教學課程的 區域中建立儲存貯體 (us-east-1)。

### 其他資源
<a name="get-started-exercise-1-6"></a>

當您建立應用程式時，Managed Service for Apache Flink 會在資源不存在時自動建立下列 Amazon CloudWatch 資源：
+ 名為 `/AWS/KinesisAnalytics-java/<my-application>` 的日誌群組。
+ 名為 `kinesis-analytics-log-stream` 的日誌串流。

## 設定您的本機開發環境
<a name="get-started-exercise-2"></a>

對於開發和偵錯，您可以直接從您選擇的 IDE 在機器上執行 Apache Flink 應用程式。任何 Apache Flink 相依性都會像一般 Java 相依性一樣使用 Apache Maven 來處理。

**注意**  
在您的開發機器上，您必須安裝 Java JDK 11、Maven 和 Git。我們建議您使用開發環境，例如 [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 或 [IntelliJ IDEA](https://www.jetbrains.com/idea/)。若要驗證您是否符合所有先決條件，請參閱 [滿足完成練習的先決條件](getting-started.md#setting-up-prerequisites)。**您不需要**在機器上安裝 Apache Flink 叢集。

### 驗證您的 AWS 工作階段
<a name="get-started-exercise-2-5"></a>

應用程式使用 Kinesis 資料串流來發佈資料。在本機執行時，您必須擁有有效的已 AWS 驗證工作階段，具有寫入 Kinesis 資料串流的許可。使用下列步驟來驗證您的工作階段：

1. 如果您沒有設定 AWS CLI 和具有有效登入資料的具名設定檔，請參閱 [設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 透過發佈下列測試記錄，確認您的 AWS CLI 已正確設定，且您的使用者具有寫入 Kinesis 資料串流的許可：

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. 如果您的 IDE 有要整合的外掛程式 AWS，您可以使用它將登入資料傳遞至在 IDE 中執行的應用程式。如需詳細資訊，請參閱 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/) 和 [AWS Toolkit for Eclipse](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)。

## 下載並檢查 Apache Flink 串流 Java 程式碼
<a name="get-started-exercise-5"></a>

此範例的 Java 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼，請執行下列動作：

1. 使用以下指令複製遠端儲存庫：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 導覽至 `amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted` 目錄。

### 檢閱應用程式元件
<a name="get-started-exercise-5-1"></a>

應用程式完全在 `com.amazonaws.services.msf.BasicStreamingJob`類別中實作。`main()` 方法定義處理串流資料和執行資料的資料流程。

**注意**  
為了獲得最佳化的開發人員體驗，應用程式設計為在 Amazon Managed Service for Apache Flink 和本機上執行，在 IDE 中進行開發時不會有任何程式碼變更。
+ 若要讀取執行時間組態，以便在 Amazon Managed Service for Apache Flink 和 IDE 中執行時運作，應用程式會自動偵測它是否在 IDE 中於本機獨立執行。在這種情況下，應用程式會以不同的方式載入執行期組態：

  1. 當應用程式偵測到其在您的 IDE 中以獨立模式執行時，請形成包含在專案**資源**資料夾中`application_properties.json`的檔案。檔案的內容如下。

  1. 當應用程式在 Amazon Managed Service for Apache Flink 中執行時，預設行為會從您在 Amazon Managed Service for Apache Flink 應用程式中定義的執行期屬性載入應用程式組態。請參閱 [建立和設定 Managed Service for Apache Flink 應用程式](#get-started-exercise-7)。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` 方法定義應用程式資料流程並執行它。
  + 初始化預設串流環境。在此範例中，我們會示範如何建立`StreamExecutionEnvironment`要與 DataSteam API 搭配使用的 ，以及`StreamTableEnvironment`要與 SQL 和資料表 API 搭配使用的 。兩個環境物件是相同執行時間環境的兩個不同參考，以使用不同的 APIs。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ```
  + 載入應用程式組態參數。這會自動從正確的位置載入它們，視應用程式執行的位置而定：

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + 應用程式使用 [Kinesis Consumer](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-consumer) 連接器定義來源，以從輸入串流讀取資料。輸入串流的組態在 `PropertyGroupId`= 中定義`InputStream0`。串流的名稱和區域分別位於名為 `stream.name`和 的屬性中`aws.region`。為了簡化，此來源會將記錄讀取為字串。

    ```
    private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
        String inputStreamName = inputProperties.getProperty("stream.name");
        return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
    }
    ...
    
    public static void main(String[] args) throws Exception { 
       ...
       SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
       DataStream<String> input = env.addSource(source, "Kinesis Source");  
       ...
    }
    ```
  + 應用程式接著會使用 [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-streams-sink) 連接器定義接收器，將資料傳送至輸出串流。輸出串流名稱和區域在 `PropertyGroupId`= 中定義`OutputStream0`，類似於輸入串流。接收器會直接連接到從來源`DataStream`取得資料的 內部。在實際的應用程式中，您會在來源和接收之間進行一些轉換。

    ```
    private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
        String outputStreamName = outputProperties.getProperty("stream.name");
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setStreamName(outputStreamName)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
    ...
    public static void main(String[] args) throws Exception { 
       ...
       Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
       input.sinkTo(sink);
       ...
    }
    ```
  + 最後，執行您剛定義的資料流程。這必須是`main()`方法的最後一個指示，在您定義所有運算子之後，資料流程需要：

    ```
    env.execute("Flink streaming Java API skeleton");
    ```

### 使用 pom.xml 檔案
<a name="get-started-exercise-5-2"></a>

pom.xml 檔案會定義應用程式所需的所有相依性，並設定 Maven Shade 外掛程式來建置包含 Flink 所需所有相依性的 fat-jar。
+ 有些相依性具有`provided`範圍。當應用程式在 Amazon Managed Service for Apache Flink 中執行時，這些相依性會自動可用。它們需要用來編譯應用程式，或在 IDE 中本機執行應用程式。如需詳細資訊，請參閱[在本機執行您的應用程式](#get-started-exercise-5-run)。請確定您使用的 Flink 版本與您在 Amazon Managed Service for Apache Flink 中使用的執行時間相同。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ 您必須使用預設範圍將額外的 Apache Flink 相依性新增至 pom，例如此應用程式使用的 [Kinesis 連接器](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/)。如需詳細資訊，請參閱[使用 Apache Flink 連接器](how-flink-connectors.md)。您也可以新增應用程式所需的任何其他 Java 相依性。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis</artifactId>
      <version>${aws.connector.version}</version>
  </dependency>
  ```
+ Maven Java 編譯器外掛程式可確保程式碼是根據 Apache Flink 目前支援的 JDK 版本 Java 11 編譯。
+ Maven Shade 外掛程式會封裝 fat-jar，不包括執行時間提供的一些程式庫。它也會指定兩個轉換器： `ServicesResourceTransformer`和 `ManifestResourceTransformer`。後者會設定 類別，其中包含啟動應用程式的 `main`方法。如果您重新命名主類別，請不要忘記更新此轉換器。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 將範例記錄寫入輸入串流
<a name="get-started-exercise-5-4"></a>

在本節中，您將傳送範例記錄到串流，供應用程式處理。您可以使用 Python 指令碼或 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) 產生範例資料。

### 使用 Python 指令碼產生範例資料
<a name="get-started-exercise-5-4-1"></a>

您可以使用 Python 指令碼將範例記錄傳送至串流。

**注意**  
若要執行此 Python 指令碼，您必須使用 Python 3.x 並安裝[AWS 適用於 Python (Boto) 的 SDK](https://aws.amazon.com/developer/language/python/) 程式庫。

**若要開始將測試資料傳送至 Kinesis 輸入串流：**

1. 從資料產生器 [ GitHub 儲存庫下載資料產生器](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator) `stock.py` Python 指令碼。

1. 執行 `stock.py` 指令碼：

   ```
   $ python stock.py
   ```

在您完成教學課程的其餘部分時，請讓指令碼保持執行。您現在可以執行 Apache Flink 應用程式。

### 使用 Kinesis Data Generator 產生範例資料
<a name="get-started-exercise-5-4-2"></a>

或者，若要使用 Python 指令碼，您可以使用[託管版本](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)中也提供的 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)，將隨機範例資料傳送至串流。Kinesis Data Generator 會在您的瀏覽器中執行，您不需要在機器上安裝任何項目。

**若要設定和執行 Kinesis Data Generator：**

1. 遵循 [Kinesis Data Generator 文件](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)中的指示來設定對工具的存取。您將執行設定使用者和密碼的 CloudFormation 範本。

1. 透過 CloudFormation 範本產生的 URL 存取 Kinesis Data Generator。CloudFormation 範本完成後，您可以在**輸出**索引標籤中找到 URL。

1. 設定資料產生器：
   + **區域：**選取您用於本教學課程的區域：us-east-1
   + **串流/交付串流：**選取應用程式將使用的輸入串流： `ExampleInputStream`
   + **每秒記錄數：**100
   + **記錄範本：**複製並貼上下列範本：

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. 測試範本：選擇**測試範本**，並確認產生的記錄與以下內容類似：

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. 啟動資料產生器：選擇**選取傳送資料**。

Kinesis Data Generator 現在正在將資料傳送至 `ExampleInputStream`。

## 在本機執行您的應用程式
<a name="get-started-exercise-5-run"></a>

您可以在 IDE 中的本機執行和偵錯 Flink 應用程式。

**注意**  
繼續之前，請確認輸入和輸出串流是否可用。請參閱 [建立兩個 Amazon Kinesis 資料串流](#get-started-exercise-1)。此外，請確認您具有從兩個串流讀取和寫入的許可。請參閱 [驗證您的 AWS 工作階段](#get-started-exercise-2-5)。  
設定本機開發環境需要 Java 11 JDK、Apache Maven 和 以及適用於 Java 開發的 IDE。確認您符合必要的先決條件。請參閱 [滿足完成練習的先決條件](getting-started.md#setting-up-prerequisites)。

### 將 Java 專案匯入 IDE
<a name="get-started-exercise-5-run-1"></a>

若要開始在 IDE 中使用應用程式，您必須將其匯入為 Java 專案。

您複製的儲存庫包含多個範例。每個範例都是個別的專案。在本教學課程中，將`./java/GettingStarted`子目錄中的內容匯入 IDE。

使用 Maven 將程式碼插入為現有的 Java 專案。

**注意**  
匯入新 Java 專案的確切程序取決於您使用的 IDE。

### 檢查本機應用程式組態
<a name="get-started-exercise-5-run-2"></a>

在本機執行時，應用程式會使用 下專案資源資料夾中 `application_properties.json` 檔案中的組態`./src/main/resources`。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### 設定 IDE 執行組態
<a name="get-started-exercise-5-run-3"></a>

您可以執行主要類別 ，直接從 IDE 執行和偵錯 Flink 應用程式`com.amazonaws.services.msf.BasicStreamingJob`，就像執行任何 Java 應用程式一樣。在執行應用程式之前，您必須設定執行組態。設定取決於您使用的 IDE。例如，請參閱 IntelliJ IDEA 文件中的[執行/偵錯組態](https://www.jetbrains.com/help/idea/run-debug-configuration.html)。特別是，您必須設定下列項目：

1. **將`provided`相依性新增至 classpath**。這是必要的，以確保在本機執行時，具有`provided`範圍的相依性會傳遞至應用程式。如果沒有此設定，應用程式會立即顯示`class not found`錯誤。

1. **傳遞 AWS 登入資料以存取應用程式的 Kinesis 串流**。最快的方法是使用 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)。在執行組態中使用此 IDE 外掛程式，您可以選取特定 AWS 設定檔。 AWS 身分驗證會使用此設定檔進行。您不需要直接傳遞 AWS 登入資料。

1. 確認 IDE 使用 **JDK 11 **執行應用程式。

### 在 IDE 中執行應用程式
<a name="get-started-exercise-5-run-4"></a>

設定 的執行組態後`BasicStreamingJob`，您可以像一般 Java 應用程式一樣執行或偵錯組態。

**注意**  
您無法`java -jar ...`直接從命令列使用 執行 Maven 產生的 fat-jar。此 jar 不包含獨立執行應用程式所需的 Flink 核心相依性。

當應用程式成功啟動時，它會記錄有關獨立迷你叢集和連接器初始化的一些資訊。這後面接著一些 INFO 和一些 WARN 日誌，Flink 通常會在應用程式啟動時發出。

```
13:43:31,405 INFO  com.amazonaws.services.msf.BasicStreamingJob                 [] - Loading application properties from 'flink-application-properties-dev.json'
13:43:31,549 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
13:43:31,677 INFO  org.apache.flink.runtime.minicluster.MiniCluster             [] - Starting Flink Mini Cluster
....
```

初始化完成後，應用程式不會發出任何進一步的日誌項目。**資料流動時，不會發出日誌。**

若要驗證應用程式是否正確處理資料，您可以檢查輸入和輸出 Kinesis 串流，如下節所述。

**注意**  
 不發出有關流動資料的日誌是 Flink 應用程式的正常行為。在每個記錄上發出日誌對於偵錯可能很方便，但在生產環境中執行時可能會增加大量的額外負荷。

## 觀察 Kinesis 串流中的輸入和輸出資料
<a name="get-started-exercise-input-output"></a>

您可以使用 Amazon Kinesis 主控台中的**資料檢視器**，觀察 （產生範例 Python) 或 Kinesis Data Generator （連結） 傳送至輸入串流的記錄。 Amazon Kinesis 

**若要觀察記錄**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 確認您執行本教學課程的區域相同，預設為 us-east-1 美國東部 （維吉尼亞北部）。如果區域不相符，請變更區域。

1. 選擇**資料串流**。

1. 選取您要觀察的串流，或 `ExampleInputStream` `ExampleOutputStream.`

1. 選擇**資料檢視器**標籤。

1. 選擇任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。

1. 選擇資料欄中的值，以 JSON 格式檢查記錄的內容。

## 停止應用程式在本機執行
<a name="get-started-exercise-stop"></a>

停止在 IDE 中執行的應用程式。IDE 通常提供「停止」選項。確切的位置和方法取決於您使用的 IDE。

## 編譯和封裝您的應用程式程式碼
<a name="get-started-exercise-5-5"></a>

在本節中，您會使用 Apache Maven 編譯 Java 程式碼，並將其封裝成 JAR 檔案。您可以使用 Maven 命令列工具或 IDE 來編譯和封裝程式碼。

**若要使用 Maven 命令列編譯和封裝：**

移至包含 Java GettingStarted 專案的目錄，並執行下列命令：

```
$ mvn package
```

**若要使用 IDE 編譯和封裝：**

`mvn package` 從 IDE Maven 整合執行 。

在這兩種情況下，都會建立下列 JAR 檔案：`target/amazon-msf-java-stream-app-1.0.jar`。

**注意**  
 從 IDE 執行「建置專案」可能不會建立 JAR 檔案。

## 上傳應用程式碼 JAR 檔案
<a name="get-started-exercise-6"></a>

在本節中，您將您在上一節中建立的 JAR 檔案上傳至在本教學課程開始時建立的 Amazon Simple Storage Service (Amazon S3) 儲存貯體。如果您尚未完成此步驟，請參閱 （連結）。

**上傳應用程式碼 JAR 檔案**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選擇您先前為應用程式碼建立的儲存貯體。

1. 選擇**上傳**。

1. 選擇 **Add files (新增檔案)**。

1. 導覽至上一個步驟中產生的 JAR 檔案：`target/amazon-msf-java-stream-app-1.0.jar`。

1. 選擇**上傳**，而不變更任何其他設定。

**警告**  
請務必在 中選取正確的 JAR 檔案`<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar`。  
`target` 目錄也包含您不需要上傳的其他 JAR 檔案。

## 建立和設定 Managed Service for Apache Flink 應用程式
<a name="get-started-exercise-7"></a>

您可以使用主控台或 AWS CLI建立和執行 Managed Service for Apache Flink 應用程式。在本教學課程中，您將使用 主控台。

**注意**  
當您使用 主控台建立應用程式時，系統會為您建立 AWS Identity and Access Management (IAM) 和 Amazon CloudWatch Logs 資源。當您使用 建立應用程式時 AWS CLI，您可以分別建立這些資源。

**Topics**
+ [建立應用程式](#get-started-exercise-7-console-create)
+ [編輯 IAM 政策](#get-started-exercise-7-console-iam)
+ [設定應用程式](#get-started-exercise-7-console-configure)
+ [執行應用程式](#get-started-exercise-7-console-run)
+ [觀察執行中應用程式的指標](#get-started-exercise-7-console-stop)
+ [觀察 Kinesis 串流中的輸出資料](#get-started-exercise-7-console-output)
+ [停止應用程式](#get-started-exercise-stop)

### 建立應用程式
<a name="get-started-exercise-7-console-create"></a>

**建立應用程式**

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 確認已選取正確的區域：us-east-1 美國東部 （維吉尼亞北部）

1. 開啟右側的選單，然後選擇 **Apache Flink 應用程式**，然後選擇**建立串流應用程式**。或者，在初始頁面的入門容器中選擇**建立串流應用程式**。

1. 在**建立串流應用程式**頁面上：
   + **選擇設定串流處理應用程式的方法：**選擇**從頭開始建立**。
   + **Apache Flink 組態、Application Flink 版本：**選擇 **Apache Flink 1.20**。

1. 設定您的應用程式
   + **應用程式名稱：**輸入 **MyApplication**。
   + **描述：**輸入 **My java test app**。
   + **存取應用程式資源：**選擇**`kinesis-analytics-MyApplication-us-east-1`使用必要政策建立/更新 IAM 角色**。

1. 設定**您的範本以進行應用程式設定**
   + **範本：**選擇**開發**。

1. 選擇頁面底部的**建立串流應用程式**。

**注意**  
使用主控台建立 Managed Service for Apache Flink 應用程式時，可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名：  
政策：`kinesis-analytics-service-MyApplication-us-east-1`
角色：`kinesisanalytics-MyApplication-us-east-1`
Amazon Managed Service for Apache Flink 先前稱為 Kinesis Data Analytics。自動建立的資源名稱會加上 字首，`kinesis-analytics-`以確保回溯相容性。

### 編輯 IAM 政策
<a name="get-started-exercise-7-console-iam"></a>

編輯 IAM 政策來新增存取 Kinesis 資料串流的許可。

**編輯政策**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 選擇**政策**。選擇主控台為您在上一節所建立的 **`kinesis-analytics-service-MyApplication-us-east-1`** 政策。

1. 選擇**編輯**，然後選擇 **JSON** 索引標籤。

1. 將下列政策範例的反白部分新增至政策。使用您的帳戶 ID 取代範例帳戶 ID (*012345678901*)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1.  選擇頁面底部的**下一步**，然後選擇**儲存變更**。

### 設定應用程式
<a name="get-started-exercise-7-console-configure"></a>

編輯應用程式組態以設定應用程式程式碼成品。

**編輯組態**

1. 在 **MyApplication** 頁面，選擇**設定**。

1. 在**應用程式碼位置**區段中：
   + 針對 **Amazon S3 儲存貯**體，選取您先前為應用程式碼建立的儲存貯體。選擇**瀏覽**並選取正確的儲存貯體，然後選取**選擇**。請勿按一下儲存貯體名稱。
   + 對於 **Amazon S3 物件的路徑**，請輸入 **amazon-msf-java-stream-app-1.0.jar**。

1. 針對**存取許可**，選擇**`kinesis-analytics-MyApplication-us-east-1`使用必要政策建立/更新 IAM 角色**。

1. 在**執行期屬性**區段中，新增下列屬性。

1. 選擇**新增項目**並新增下列每個參數：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/get-started-exercise.html)

1. 請勿修改任何其他區段。

1. 選擇**儲存變更**。

**注意**  
當您選擇啟用 Amazon CloudWatch 日誌時，Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示：  
日誌群組：`/aws/kinesis-analytics/MyApplication`
日誌串流：`kinesis-analytics-log-stream`

### 執行應用程式
<a name="get-started-exercise-7-console-run"></a>

應用程式現在已設定並準備好執行。

**執行應用程式**

1. 在 Amazon Managed Service for Apache Flink 的 主控台上，選擇**我的應用程式**，然後選擇**執行**。

1. 在下一頁的應用程式還原組態頁面上，選擇**使用最新的快照執行**，然後選擇**執行**。

   **應用程式詳細資訊**中的**狀態**會從 `Ready` 轉換為 `Starting` ，然後在應用程式啟動`Running`時轉換為 。

當應用程式處於 `Running` 狀態時，您現在可以開啟 Flink 儀表板。

**開啟 儀表板**

1. 選擇**開啟 Apache Flink 儀表板**。儀表板會在新頁面上開啟。

1. 在**執行中任務**清單中，選擇您可以看到的單一任務。
**注意**  
如果您設定執行期屬性或不正確地編輯 IAM 政策，應用程式狀態可能會變成 `Running`，但 Flink 儀表板會顯示任務正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的許可，這是常見的失敗案例。  
發生這種情況時，請檢查 Flink 儀表板中的**例外**狀況索引標籤，以查看問題的原因。

### 觀察執行中應用程式的指標
<a name="get-started-exercise-7-console-stop"></a>

在 **MyApplication** 頁面的 **Amazon CloudWatch 指標**區段中，您可以從執行中的應用程式查看一些基本指標。

**檢視指標**

1. 在**重新整理**按鈕旁，從下拉式清單中選取 **10 秒**。

1. 當應用程式執行正常時，您可以看到**運作時間**指標持續增加。

1. **fullrestarts** 指標應為零。如果增加，組態可能會發生問題。若要調查問題，請檢閱 Flink 儀表板上的**例外狀況**索引標籤。

1. 在運作狀態良好的應用程式中，**失敗檢查點指標的數量**應為零。
**注意**  
此儀表板會顯示一組固定的指標，精細程度為 5 分鐘。您可以使用 CloudWatch 儀表板中的任何指標來建立自訂應用程式儀表板。

### 觀察 Kinesis 串流中的輸出資料
<a name="get-started-exercise-7-console-output"></a>

請確定您仍在使用 Python 指令碼或 Kinesis Data Generator 將資料發佈至輸入。

您現在可以使用 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)：// 中的資料檢視器來觀察 Managed Service for Apache Flink 上執行之應用程式的輸出，與先前已執行的操作類似。

**檢視輸出**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 確認區域與您用來執行本教學課程的區域相同。根據預設，它是 us-east-1US East (N. Virginia)。視需要變更 區域。

1. 選擇**資料串流**。

1. 選取您要觀察的串流。在本教學課程中，使用 `ExampleOutputStream`。

1.  選擇**資料檢視器**標籤。

1. 選取任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。

1. 選取資料欄中的值，以 JSON 格式檢查記錄的內容。

### 停止應用程式
<a name="get-started-exercise-stop"></a>

若要停止應用程式，請前往名為 的 Managed Service for Apache Flink 應用程式的主控台頁面`MyApplication`。

**停止應用程式**

1. 從**動作**下拉式清單中，選擇**停止**。

1. **應用程式詳細資訊**中的**狀態**會從 轉換為 `Running` `Stopping`，然後在應用程式完全停止`Ready`時轉換為 。
**注意**  
別忘了也要停止從 Python 指令碼或 Kinesis Data Generator 將資料傳送至輸入串流。

## 下一步驟
<a name="get-started-exercise-next-step-4"></a>

[清除 AWS 資源](getting-started-cleanup.md)