

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 建立並執行 Managed Service for Apache Flink for Python 應用程式
<a name="gs-python-createapp"></a>

在本節中，您會建立適用於 Python 應用程式的 Managed Service for Apache Flink 應用程式，並以 Kinesis 串流做為來源和接收器。

**Topics**
+ [

## 建立相依資源
](#gs-python-resources)
+ [

## 設定您的本機開發環境
](#gs-python-set-up)
+ [

## 下載並檢查 Apache Flink 串流 Python 程式碼
](#gs-python-download)
+ [

## 管理 JAR 相依性
](#gs-python-jar-dependencies)
+ [

## 將範例記錄寫入輸入串流
](#gs-python-sample-records)
+ [

## 在本機執行您的應用程式
](#gs-python-run-locally)
+ [

## 觀察 Kinesis 串流中的輸入和輸出資料
](#gs-python-observe-input-output)
+ [

## 停止應用程式在本機執行
](#gs-python-stop)
+ [

## 封裝您的應用程式程式碼
](#gs-python-package-code)
+ [

## 將應用程式套件上傳至 Amazon S3 儲存貯體
](#gs-python-upload-bucket)
+ [

## 建立和設定 Managed Service for Apache Flink 應用程式
](#gs-python-7)
+ [

## 下一步驟
](#gs-python-next-step-4)

## 建立相依資源
<a name="gs-python-resources"></a>

在為本練習建立 Managed Service for Apache Flink 之前，先建立下列相依資源：
+ 兩個 Kinesis 串流，用於輸入和輸出。
+ 存放應用程式程式碼的 Amazon S3 儲存貯體。

**注意**  
本教學假設您正在 us-east-1 區域中部署應用程式。如果您使用另一個區域，則必須相應地調整所有步驟。

### 建立兩個 Kinesis 串流
<a name="gs-python-resources-streams"></a>

為此練習建立 Managed Service for Apache Flink 應用程式之前，請在將用於部署應用程式的相同區域中建立兩個 Kinesis 資料串流 (`ExampleInputStream` 和 `ExampleOutputStream`) （在此範例中為 us-east-1)。您的應用程式會將這些串流用於應用程式來源和目的地串流。

您可以使用 Amazon Kinesis 主控台或以下 AWS CLI 命令來建立這些串流。如需主控台指示，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。

**建立資料串流 (AWS CLI)**

1. 若要建立第一個串流 (`ExampleInputStream`)，請使用下列 Amazon Kinesis `create-stream` AWS CLI 命令。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1
   ```

1. 若要建立應用程式用來寫入輸出的第二個串流，請執行相同的命令，將串流名稱變更為 `ExampleOutputStream`。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1
   ```

### 建立 Amazon S3 儲存貯體
<a name="gs-python-resources-s3"></a>

您可以使用主控台建立 Amazon S3 儲存貯體。如需建立這些資源的相關指示，請參閱以下主題：
+ 《Amazon Simple Storage Service 使用者指南》中的[如何建立 S3 儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。**為 Amazon S3 儲存貯體提供全域唯一名稱，例如透過附加您的登入名稱。
**注意**  
請確定您在用於本教學課程的區域中建立 S3 儲存貯體 (us-east-1)。

### 其他資源
<a name="gs-python-resources-cw"></a>

建立應用程式時，Managed Service for Apache Flink 會建立下列 Amazon CloudWatch 資源 (如果尚不存在該資源)：
+ 名為 `/AWS/KinesisAnalytics-java/<my-application>` 的日誌群組。
+ 名為 `kinesis-analytics-log-stream` 的日誌串流。

## 設定您的本機開發環境
<a name="gs-python-set-up"></a>

對於開發和偵錯，您可以在機器上執行 Python Flink 應用程式。您可以在您選擇的 Python IDE 中使用 `python main.py`或 從命令列啟動應用程式。

**注意**  
在您的開發機器上，您必須安裝 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我們建議您使用 IDE，例如 [PyCharm](https://www.jetbrains.com/pycharm/) 或 [Visual Studio Code](https://code.visualstudio.com/)。若要驗證您是否符合所有先決條件，請先參閱 [滿足完成練習的先決條件](gs-python.md#gs-python-prerequisites) 再繼續。

### 安裝 PyFlink 程式庫
<a name="gs-python-install-pyflink"></a>

若要開發應用程式並在本機執行，您必須安裝 Flink Python 程式庫。

1. 使用 VirtualEnv、Conda 或任何類似的 Python 工具建立獨立的 Python 環境。

1. 在該環境中安裝 PyFlink 程式庫。使用您在 Amazon Managed Service for Apache Flink 中使用的相同 Apache Flink 執行時間版本。目前，建議的執行時間為 1.19.1。

   ```
   $ pip install apache-flink==1.19.1
   ```

1. 執行應用程式時，請確定環境處於作用中狀態。如果您在 IDE 中執行應用程式，請確定 IDE 使用環境做為執行時間。程序取決於您使用的 IDE。
**注意**  
您只需要安裝 PyFlink 程式庫。**您不需要**在機器上安裝 Apache Flink 叢集。

### 驗證您的 AWS 工作階段
<a name="gs-python-authenticate"></a>

應用程式使用 Kinesis 資料串流來發佈資料。在本機執行時，您必須擁有有效的已 AWS 驗證工作階段，具有寫入 Kinesis 資料串流的許可。使用下列步驟來驗證您的工作階段：

1. 如果您沒有設定 AWS CLI 和具有有效登入資料的具名設定檔，請參閱 [設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 透過發佈下列測試記錄，確認您的 AWS CLI 已正確設定，且您的使用者具有寫入 Kinesis 資料串流的許可：

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. 如果您的 IDE 有要整合的外掛程式 AWS，您可以使用它將登入資料傳遞至在 IDE 中執行的應用程式。如需詳細資訊，請參閱 [AWS Toolkit for PyCharm](https://aws.amazon.com/pycharm/)、 [AWS Toolkit for Visual Studio Code](https://aws.amazon.com/visualstudiocode/) 和 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)。

## 下載並檢查 Apache Flink 串流 Python 程式碼
<a name="gs-python-download"></a>

此範例的 Python 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼，請執行下列動作：

1. 使用以下指令複製遠端儲存庫：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 導覽至 `./python/GettingStarted` 目錄。

### 檢閱應用程式元件
<a name="gs-python-review"></a>

應用程式碼位於 。 `main.py`我們使用內嵌在 Python 中的 SQL 來定義應用程式的流程。

**注意**  
為了獲得最佳化的開發人員體驗，應用程式的設計可在 Amazon Managed Service for Apache Flink 和本機上執行，無需變更任何程式碼，即可在機器上進行開發。應用程式使用 環境變數`IS_LOCAL = true`來偵測它何時在本機執行。您必須在 shell `IS_LOCAL = true`或 IDE 的執行組態中設定環境變數。
+ 應用程式會設定執行環境並讀取執行時間組態。若要同時在 Amazon Managed Service for Apache Flink 和本機上運作，應用程式會檢查 `IS_LOCAL`變數。
  + 以下是應用程式在 Amazon Managed Service for Apache Flink 中執行時的預設行為：

    1. 載入與應用程式一起封裝的相依性。如需詳細資訊，請參閱 （連結）

    1. 從您在 Amazon Managed Service for Apache Flink 應用程式中定義的執行期屬性載入組態。如需詳細資訊，請參閱 （連結）
  + 當應用程式偵測到您在本機執行應用程式`IS_LOCAL = true`時：

    1. 從專案載入外部相依性。

    1. 從專案中包含`application_properties.json`的檔案載入組態。

       ```
       ...
       APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
       ...
       is_local = (
           True if os.environ.get("IS_LOCAL") else False
       )
       ...
       if is_local:
           APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"
           CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
           table_env.get_config().get_configuration().set_string(
               "pipeline.jars",
               "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar",
           )
       ```
+ 應用程式使用 [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) 定義具有`CREATE TABLE`陳述式的來源資料表。此資料表會從輸入 Kinesis 串流讀取資料。應用程式會從執行時間組態中取得串流的名稱、區域和初始位置。

  ```
  table_env.execute_sql(f"""
          CREATE TABLE prices (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{input_stream_name}',
                  'aws.region' = '{input_stream_region}',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """)
  ```
+ 在此範例中，應用程式也會使用 [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) 定義接收器資料表。此故事會將資料傳送至輸出 Kinesis 串流。

  ```
  table_env.execute_sql(f"""
              CREATE TABLE output (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3)
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{output_stream_name}',
                  'aws.region' = '{output_stream_region}',
                  'sink.partitioner-field-delimiter' = ';',
                  'sink.batch.max-size' = '100',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                )""")
  ```
+ 最後，應用程式會從來源資料表執行`INSERT INTO...`接收資料表的 SQL。在更複雜的應用程式中，您可能會有額外的步驟在寫入目的地之前轉換資料。

  ```
  table_result = table_env.execute_sql("""INSERT INTO output 
          SELECT ticker, price, event_time FROM prices""")
  ```
+ 您必須在`main()`函數結尾新增另一個步驟，才能在本機執行應用程式：

  ```
  if is_local:
      table_result.wait()
  ```

  如果沒有此陳述式，應用程式會在您於本機執行時立即終止。當您在 Amazon Managed Service for Apache Flink 中執行應用程式時，不得執行此陳述式。

## 管理 JAR 相依性
<a name="gs-python-jar-dependencies"></a>

PyFlink 應用程式通常需要一或多個連接器。本教學課程中的應用程式使用 [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/)。由於 Apache Flink 在 Java JVM 中執行，無論您是否在 Python 中實作應用程式，連接器都會以 JAR 檔案形式分佈。在 Amazon Managed Service for Apache Flink 上部署這些相依性時，您必須將這些相依性與應用程式一起封裝。

在此範例中，我們會示範如何使用 Apache Maven 來擷取相依性，並封裝應用程式以在 Managed Service for Apache Flink 上執行。

**注意**  
有擷取和封裝相依性的其他方法。此範例示範可正確搭配一或多個連接器使用的方法。它還可讓您在本機、用於開發以及在 Managed Service for Apache Flink 上執行應用程式，而無需變更程式碼。

### 使用 pom.xml 檔案
<a name="gs-python-jar-pom"></a>

Apache Maven 使用 `pom.xml` 檔案來控制相依性和應用程式封裝。

任何 JAR 相依性都會在 `<dependencies>...</dependencies>`區塊的 `pom.xml` 檔案中指定。

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>4.3.0-1.19</version>
        </dependency>
    </dependencies>
    ...
```

若要尋找要使用的正確連接器成品和版本，請參閱 [將 Apache Flink 連接器與 Managed Service for Apache Flink 搭配使用](how-flink-connectors.md)。請務必參考您正在使用的 Apache Flink 版本。在此範例中，我們使用 Kinesis 連接器。對於 Apache Flink 1.19，連接器版本為 `4.3.0-1.19`。

**注意**  
如果您使用的是 Apache Flink 1.19，則沒有為此版本特別發行的連接器版本。使用針對 1.18 發行的連接器。

### 下載和封裝相依性
<a name="gs-python-dependencies-download"></a>

使用 Maven 下載`pom.xml`檔案中定義的相依性，並為 Python Flink 應用程式封裝相依性。

1. 導覽至包含 Python 入門專案的目錄，稱為 `python/GettingStarted`。

1. 執行以下命令：

```
$ mvn package
```

Maven 會建立新的檔案，名為 `./target/pyflink-dependencies.jar`。當您在機器上進行本機開發時，Python 應用程式會尋找此檔案。

**注意**  
如果您忘記執行此命令，當您嘗試執行應用程式時，它會失敗並顯示錯誤： **找不到任何識別符為 "kinesis" 的工廠**。

## 將範例記錄寫入輸入串流
<a name="gs-python-sample-records"></a>

在本節中，您將傳送範例記錄到串流，供應用程式處理。您可以使用 Python 指令碼或 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) 產生範例資料。

### 使用 Python 指令碼產生範例資料
<a name="gs-python-sample-data"></a>

您可以使用 Python 指令碼將範例記錄傳送至串流。

**注意**  
若要執行此 Python 指令碼，您必須使用 Python 3.x 並安裝[AWS 適用於 Python (Boto) 的 SDK](https://aws.amazon.com/developer/language/python/) 程式庫。

**若要開始將測試資料傳送至 Kinesis 輸入串流：**

1. 從資料產生器 [ GitHub 儲存庫下載資料產生器](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator) `stock.py` Python 指令碼。

1. 執行 `stock.py` 指令碼：

   ```
   $ python stock.py
   ```

在您完成教學課程的其餘部分時，請讓指令碼保持執行。您現在可以執行 Apache Flink 應用程式。

### 使用 Kinesis Data Generator 產生範例資料
<a name="gs-python-sample-kinesis"></a>

或者，若要使用 Python 指令碼，您可以使用[託管版本](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)中也提供的 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)，將隨機範例資料傳送至串流。Kinesis Data Generator 會在您的瀏覽器中執行，您不需要在機器上安裝任何項目。

**若要設定和執行 Kinesis Data Generator：**

1. 遵循 [Kinesis Data Generator 文件](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)中的指示來設定對工具的存取。您將執行設定使用者和密碼的 CloudFormation 範本。

1. 透過 CloudFormation 範本產生的 URL 存取 Kinesis Data Generator。CloudFormation 範本完成後，您可以在**輸出**索引標籤中找到 URL。

1. 設定資料產生器：
   + **區域：**選取您用於本教學課程的區域：us-east-1
   + **串流/交付串流：**選取應用程式將使用的輸入串流： `ExampleInputStream`
   + **每秒記錄數：**100
   + **記錄範本：**複製並貼上下列範本：

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. 測試範本：選擇**測試範本**，並確認產生的記錄與以下內容類似：

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. 啟動資料產生器：選擇**選取傳送資料**。

Kinesis Data Generator 現在正在將資料傳送至 `ExampleInputStream`。

## 在本機執行您的應用程式
<a name="gs-python-run-locally"></a>

您可以在本機測試應用程式，使用 `python main.py`IDE 從命令列執行或從 IDE 執行。

若要在本機執行應用程式，您必須安裝正確的 PyFlink 程式庫版本，如上一節所述。如需詳細資訊，請參閱 （連結）

**注意**  
繼續之前，請確認輸入和輸出串流是否可用。請參閱 [建立兩個 Amazon Kinesis 資料串流](get-started-exercise.md#get-started-exercise-1)。此外，請確認您具有從兩個串流讀取和寫入的許可。請參閱 [驗證您的 AWS 工作階段](get-started-exercise.md#get-started-exercise-2-5)。

### 將 Python 專案匯入 IDE
<a name="gs-python-import"></a>

若要開始在 IDE 中使用應用程式，您必須將其匯入為 Python 專案。

您複製的儲存庫包含多個範例。每個範例都是個別的專案。在本教學課程中，將`./python/GettingStarted`子目錄中的內容匯入 IDE。

將程式碼匯入為現有的 Python 專案。

**注意**  
匯入新 Python 專案的確切程序取決於您使用的 IDE。

### 檢查本機應用程式組態
<a name="gs-python-check-configuration"></a>

在本機執行時，應用程式會使用 下專案資源資料夾中 `application_properties.json` 檔案中的組態`./src/main/resources`。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### 在本機執行您的 Python 應用程式
<a name="gs-python-run-locally"></a>

您可以從命令列做為一般 Python 指令碼，或從 IDE 在本機執行應用程式。

**從命令列執行您的應用程式**

1. 請確定安裝 Python Flink 程式庫的獨立 Python 環境目前處於作用中狀態，例如 Conda 或 VirtualEnv。

1. 請確定您`mvn package`至少執行一次。

1. 設定 `IS_LOCAL = true` 環境變數：

   ```
   $ export IS_LOCAL=true
   ```

1. 以一般 Python 指令碼執行應用程式。

   ```
   $python main.py
   ```

**從 IDE 中執行應用程式**

1. 將 IDE 設定為使用以下組態執行`main.py`指令碼：

   1. 使用安裝 PyFlink 程式庫的獨立 Python 環境，例如 Conda 或 VirtualEnv。

   1. 使用 AWS 登入資料來存取輸入和輸出 Kinesis 資料串流。

   1. 設定 `IS_LOCAL = true`。

1. 設定執行組態的確切程序取決於您的 IDE 和 。

1. 當您設定 IDE 後，請執行 Python 指令碼，並在應用程式執行時使用 IDE 提供的工具。

### 在本機檢查應用程式日誌
<a name="gs-python-run-IDE"></a>

在本機執行時，除了應用程式啟動時列印和顯示的幾行以外，應用程式不會在主控台中顯示任何日誌。PyFlink 會將日誌寫入安裝 Python Flink 程式庫的目錄中的檔案。應用程式會在日誌啟動時列印日誌的位置。您也可以執行下列命令來尋找日誌：

```
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
```

1. 列出記錄目錄中的檔案。您通常會找到單一`.log`檔案。

1. 在應用程式執行時自訂檔案：`tail -f <log-path>/<log-file>.log`。

## 觀察 Kinesis 串流中的輸入和輸出資料
<a name="gs-python-observe-input-output"></a>

您可以使用 Amazon Kinesis 主控台中的**資料檢視器**，觀察 （產生範例 Python) 或 Kinesis Data Generator （連結） 傳送至輸入串流的記錄。 Amazon Kinesis 

**若要觀察記錄：**  在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。  驗證您執行本教學課程的區域是否相同，預設為 us-east-1 美國東部 （維吉尼亞北部）。如果區域不相符，請變更區域。   選擇**資料串流**。   選取您要觀察的串流，或 `ExampleInputStream` `ExampleOutputStream.`   選擇**資料檢視器**標籤。   選擇任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。   選擇資料欄中的值，以 JSON 格式檢查記錄的內容。   

## 停止應用程式在本機執行
<a name="gs-python-stop"></a>

停止在 IDE 中執行的應用程式。IDE 通常提供「停止」選項。確切的位置和方法取決於 IDE。

## 封裝您的應用程式程式碼
<a name="gs-python-package-code"></a>

在本節中，您會使用 Apache Maven 將應用程式程式碼和所有必要的相依性封裝在 .zip 檔案中。

再次執行 Maven 套件命令：

```
$ mvn package
```

此命令會產生 檔案 `target/managed-flink-pyflink-getting-started-1.0.0.zip`。

## 將應用程式套件上傳至 Amazon S3 儲存貯體
<a name="gs-python-upload-bucket"></a>

在本節中，您將上一節中建立的 .zip 檔案上傳至在本教學課程開始時建立的 Amazon Simple Storage Service (Amazon S3) 儲存貯體。如果您尚未完成此步驟，請參閱 （連結）。

**上傳應用程式碼 JAR 檔案**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選擇您先前為應用程式碼建立的儲存貯體。

1. 選擇**上傳**。

1. 選擇 **Add files (新增檔案)**。

1. 導覽至上一個步驟中產生的 .zip 檔案：`target/managed-flink-pyflink-getting-started-1.0.0.zip`。

1. 選擇**上傳**，而不變更任何其他設定。

## 建立和設定 Managed Service for Apache Flink 應用程式
<a name="gs-python-7"></a>

您可以使用 主控台或 建立和設定 Managed Service for Apache Flink 應用程式 AWS CLI。在本教學課程中，我們將使用 主控台。

### 建立應用程式
<a name="gs-python-7-console-create"></a>

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 確認已選取正確的區域：美國東部 （維吉尼亞北部）us-east-1。

1. 開啟右側選單，然後選擇 **Apache Flink 應用程式**，然後選擇**建立串流應用程式**。或者，從初始頁面的**開始使用**區段中選擇**建立串流應用程式**。

1. 在**建立串流應用程式**頁面上：
   + 對於**選擇設定串流處理應用程式的方法**，請選擇**從頭開始建立**。
   + 針對 **Apache Flink 組態、Application Flink 版本**，選擇 **Apache Flink 1.19**。
   + 對於**應用程式組態**：
     + 在**應用程式名稱**中，輸入 **MyApplication**。
     + 對於 **Description (說明)**，輸入 **My Python test app**。
     + 在**存取應用程式資源**中，選擇**使用必要政策建立/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1**。
   + 對於**應用程式設定的範本**：
     + 針對**範本**，選擇**開發**。
   + 選擇**建立串流應用程式**。

**注意**  
使用主控台建立 Managed Service for Apache Flink 應用程式時，可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名：  
政策：`kinesis-analytics-service-MyApplication-us-west-2`
角色：`kinesisanalytics-MyApplication-us-west-2`
Amazon Managed Service for Apache Flink 先前稱為 *Kinesis Data Analytics*。自動產生的資源名稱會加上 字首，`kinesis-analytics`以確保回溯相容性。

### 編輯 IAM 政策
<a name="gs-python-7-console-iam"></a>

編輯 IAM 政策以新增 Amazon S3 儲存貯體存取許可。

**編輯 IAM 政策以新增 S3 儲存貯體許可**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 選擇**政策**。選擇主控台為您在上一節所建立的 **`kinesis-analytics-service-MyApplication-us-east-1`** 政策。

1. 選擇**編輯**，然後選擇 **JSON** 索引標籤。

1. 將下列政策範例的反白部分新增至政策。使用您的帳戶 ID 取代範例帳戶 ID (*012345678901*)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1. 選擇**下一步**，然後選擇**儲存變更**。

### 設定應用程式
<a name="gs-python-7-console-configure"></a>

編輯應用程式組態以設定應用程式程式碼成品。

**設定應用程式**

1. 在 **MyApplication** 頁面，選擇**設定**。

1. 在**應用程式碼位置**區段中：
   + 針對 **Amazon S3 儲存貯**體，選取您先前為應用程式碼建立的儲存貯體。選擇**瀏覽**並選擇正確的儲存貯體，然後選擇**選擇**。請勿選取儲存貯體名稱。
   + 對於 **Amazon S3 物件的路徑**，請輸入 **managed-flink-pyflink-getting-started-1.0.0.zip**。

1. 針對**存取許可**，選擇**`kinesis-analytics-MyApplication-us-east-1`使用必要政策建立/更新 IAM 角色**。

1. 移至**執行期屬性**，並保留所有其他設定的預設值。

1. 選擇**新增項目**並新增下列每個參數：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/gs-python-createapp.html)

1. 請勿修改任何其他區段，然後選擇**儲存變更**。

**注意**  
當您選擇啟用 Amazon CloudWatch 日誌時，Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示：  
日誌群組：`/aws/kinesis-analytics/MyApplication`
日誌串流：`kinesis-analytics-log-stream`

### 執行應用程式
<a name="gs-python-7-console-run"></a>

應用程式現在已設定並準備好執行。

**執行應用程式**

1. 在 Amazon Managed Service for Apache Flink 的 主控台上，選擇**我的應用程式**，然後選擇**執行**。

1. 在下一頁的應用程式還原組態頁面上，選擇**使用最新的快照執行**，然後選擇**執行**。

   **應用程式詳細資訊**中的**狀態**會從 `Ready` 轉換為 `Starting` ，然後在應用程式啟動`Running`時轉換為 。

當應用程式處於 `Running` 狀態時，您現在可以開啟 Flink 儀表板。

**開啟 儀表板**

1. 選擇**開啟 Apache Flink 儀表板**。儀表板會在新頁面上開啟。

1. 在**執行中任務**清單中，選擇您可以看到的單一任務。
**注意**  
如果您未正確設定執行期屬性或編輯 IAM 政策，應用程式狀態可能會變成 `Running`，但 Flink 儀表板會顯示任務正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的許可，這是常見的失敗案例。  
發生這種情況時，請檢查 Flink 儀表板中的**例外**狀況索引標籤，以查看問題的原因。

### 觀察執行中應用程式的指標
<a name="gs-python-observe-metrics"></a>

在 **MyApplication** 頁面的 **Amazon CloudWatch 指標**區段中，您可以查看執行中應用程式的一些基本指標。

**檢視指標**

1. 在**重新整理**按鈕旁，從下拉式清單中選取 **10 秒**。

1. 當應用程式執行正常時，您可以看到**運作時間**指標持續增加。

1. **Fullrestarts** 指標應為零。如果增加，組態可能會發生問題。若要調查問題，請檢閱 Flink 儀表板上的**例外狀況**索引標籤。

1. 在運作狀態良好的應用程式中，**失敗檢查點指標的數量**應為零。
**注意**  
此儀表板會顯示一組固定的指標，精細程度為 5 分鐘。您可以使用 CloudWatch 儀表板中的任何指標來建立自訂應用程式儀表板。

### 觀察 Kinesis 串流中的輸出資料
<a name="gs-python-observe-output"></a>

請確定您仍在使用 Python 指令碼或 Kinesis Data Generator 將資料發佈至輸入。

您現在可以使用 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)：// 中的資料檢視器來觀察 Managed Service for Apache Flink 上執行之應用程式的輸出，與先前已執行的操作類似。

**檢視輸出**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 確認區域與您用來執行本教學課程的區域相同。根據預設，它是 us-east-1US East (N. Virginia)。視需要變更 區域。

1. 選擇**資料串流**。

1. 選取您要觀察的串流。在本教學課程中，使用 `ExampleOutputStream`。

1.  選擇**資料檢視器**標籤。

1. 選取任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。

1. 選取資料欄中的值，以 JSON 格式檢查記錄的內容。

### 停止應用程式
<a name="gs-python-7-console-stop"></a>

若要停止應用程式，請前往名為 的 Managed Service for Apache Flink 應用程式的主控台頁面`MyApplication`。

**停止應用程式**

1. 從**動作**下拉式清單中，選擇**停止**。

1. **應用程式詳細資訊**中的**狀態**會從 轉換為 `Running` `Stopping`，然後在應用程式完全停止`Ready`時轉換為 。
**注意**  
別忘了也要停止從 Python 指令碼或 Kinesis Data Generator 將資料傳送至輸入串流。

## 下一步驟
<a name="gs-python-next-step-4"></a>

[清除 AWS 資源](gs-python-cleanup.md)