

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Amazon Managed Service for Apache Flink for Python 入門
<a name="gs-python"></a>

本節將為您介紹使用 Python 和資料表 API 的 Managed Service for Apache Flink 的基本概念。它描述了建立和測試應用程式的可用選項。此外，它還提供了相關指示，以協助您安裝完成本指南教學課程以及建立您的第一個應用程式所需要的工具。

**Topics**
+ [檢閱 Managed Service for Apache Flink 應用程式的元件](#gs-python-table-components)
+ [滿足先決條件](#gs-python-prerequisites)
+ [建立並執行 Managed Service for Apache Flink for Python 應用程式](gs-python-createapp.md)
+ [清除 AWS 資源](gs-python-cleanup.md)

## 檢閱 Managed Service for Apache Flink 應用程式的元件
<a name="gs-python-table-components"></a>

**注意**  
Amazon Managed Service for Apache Flink 支援所有 [Apache Flink APIs](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/overview/#flinks-apis)。根據您選擇的 API，應用程式的結構略有不同。在 Python 中開發 Apache Flink 應用程式時，一種常用的方法是使用內嵌在 Python 程式碼中的 SQL 定義應用程式流程。這是我們在下列 Gettgin Started 教學課程中遵循的方法。

為了處理資料，您的 Managed Service for Apache Flink 應用程式會使用 Python 指令碼來定義處理輸入並使用 Apache Flink 執行期產生輸出的資料流程。

典型的 Managed Service for Apache Flink 應用程式具有下列元件：
+ **執行期屬性：**您可以使用*執行期屬性*來設定應用程式，無需重新編譯應用程式的程式碼。
+ **來源：**應用程式會耗用來自一或多個*來源*的資料。來源使用[連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)從外部系統讀取資料，例如 Kinesis 資料串流或 Amazon MSK 主題。您也可以使用特殊連接器從應用程式內產生資料。當您使用 SQL 時，應用程式會將來源定義為*來源資料表*。
+ **轉換：**應用程式使用可篩選、擴充或彙總資料的一或多個*轉換*來處理資料。當您使用 SQL 時，應用程式會將轉換定義為 SQL 查詢。
+ **接收器：**應用程式會透過*接收器*將資料傳送至外部來源。接收器使用[連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)將資料傳送至外部系統，例如 Kinesis 資料串流、Amazon MSK 主題、Amazon S3 儲存貯體或關聯式資料庫。您也可以使用特殊連接器來列印輸出以供開發之用。當您使用 SQL 時，應用程式會將接收器定義為插入結果的*接收器資料表*。如需詳細資訊，請參閱[在 Managed Service for Apache Flink 中使用接收器寫入資料](how-sinks.md)。

您的 Python 應用程式也可能需要外部相依性，例如其他 Python 程式庫或應用程式使用的任何 Flink 連接器。當您封裝應用程式時，必須包含應用程式所需的每個相依性。本教學課程示範如何包含連接器相依性，以及如何封裝應用程式以在 Amazon Managed Service for Apache Flink 上部署。

## 滿足先決條件
<a name="gs-python-prerequisites"></a>

若要完成本教學課程，您必須具備下列項目：
+ **Python 3.11**，最好使用 [VirtualEnv (venv)](https://docs.python.org/3.11/library/venv.html)、[Conda](https://docs.conda.io/en/latest/) 或 [Miniconda](https://docs.anaconda.com/miniconda/) 等獨立環境。
+  [Git 用戶端](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) - 如果您尚未安裝 Git 用戶端。
+ [Java 開發套件 (JDK) 第 11 版](https://www.oracle.com/java/technologies/downloads/#java11) - 安裝 Java JDK 11，並將 `JAVA_HOME`環境變數設定為指向您的安裝位置。如果您沒有 JDK 11，您可以使用 [Amazon Corretto](https://docs.aws.amazon.com/corretto)或我們選擇的任何標準 JDK。
  + 若要確認您已正確安裝 JDK，請執行下列命令。如果您使用的是 Amazon Corretto 11 以外的 JDK，輸出將會不同。請確定版本為 11.x。

    ```
    $ java --version
    
    openjdk 11.0.23 2024-04-16 LTS
    OpenJDK Runtime Environment Corretto-11.0.23.9.1 (build 11.0.23+9-LTS)
    OpenJDK 64-Bit Server VM Corretto-11.0.23.9.1 (build 11.0.23+9-LTS, mixed mode)
    ```
+ [Apache Maven](https://maven.apache.org/) - 如果您尚未安裝 Apache Maven。如需詳細資訊，請參閱[安裝 Apache Maven](https://maven.apache.org/install.html)。
  + 若要測試您的 Apache Maven 安裝，請使用下列命令：

    ```
    $ mvn -version
    ```

**注意**  
雖然您的應用程式是以 Python 撰寫，但 Apache Flink 在 Java 虛擬機器 (JVM) 中執行。它將 Kinesis 連接器等大多數相依性分發為 JAR 檔案。若要管理這些相依性，並將應用程式封裝在 ZIP 檔案中，請使用 [Apache Maven](https://maven.apache.org/)。本教學課程說明如何執行此操作。

**警告**  
我們建議您使用 Python 3.11 進行本機開發。這是 Amazon Managed Service for Apache Flink 搭配 Flink 執行期 1.19 使用的相同 Python 版本。  
在 Python 3.12 上安裝 Python Flink 程式庫 1.19 可能會失敗。  
如果您的機器預設安裝了另一個 Python 版本，我們建議您使用 Python 3.11 建立 VirtualEnv 等獨立環境。

**適用於本機開發的 IDE**

我們建議您使用 [PyCharm](https://www.jetbrains.com/pycharm/) 或 [Visual Studio Code](https://code.visualstudio.com/) 等開發環境來開發和編譯您的應用程式。

然後，完成 的前兩個步驟[開始使用 Amazon Managed Service for Apache Flink (DataStream API)](getting-started.md)：
+ [設定 AWS 帳戶並建立管理員使用者](setting-up.md)
+ [設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)

若要開始使用，請參閱[建立應用程式](gs-python-createapp.md)。

# 建立並執行 Managed Service for Apache Flink for Python 應用程式
<a name="gs-python-createapp"></a>

在本節中，您會建立適用於 Python 應用程式的 Managed Service for Apache Flink 應用程式，並以 Kinesis 串流做為來源和接收器。

**Topics**
+ [建立相依資源](#gs-python-resources)
+ [設定您的本機開發環境](#gs-python-set-up)
+ [下載並檢查 Apache Flink 串流 Python 程式碼](#gs-python-download)
+ [管理 JAR 相依性](#gs-python-jar-dependencies)
+ [將範例記錄寫入輸入串流](#gs-python-sample-records)
+ [在本機執行您的應用程式](#gs-python-run-locally)
+ [觀察 Kinesis 串流中的輸入和輸出資料](#gs-python-observe-input-output)
+ [停止應用程式在本機執行](#gs-python-stop)
+ [封裝您的應用程式程式碼](#gs-python-package-code)
+ [將應用程式套件上傳至 Amazon S3 儲存貯體](#gs-python-upload-bucket)
+ [建立和設定 Managed Service for Apache Flink 應用程式](#gs-python-7)
+ [下一步驟](#gs-python-next-step-4)

## 建立相依資源
<a name="gs-python-resources"></a>

在為本練習建立 Managed Service for Apache Flink 之前，先建立下列相依資源：
+ 兩個 Kinesis 串流，用於輸入和輸出。
+ 存放應用程式程式碼的 Amazon S3 儲存貯體。

**注意**  
本教學假設您正在 us-east-1 區域中部署應用程式。如果您使用另一個區域，則必須相應地調整所有步驟。

### 建立兩個 Kinesis 串流
<a name="gs-python-resources-streams"></a>

為此練習建立 Managed Service for Apache Flink 應用程式之前，請在將用於部署應用程式的相同區域中建立兩個 Kinesis 資料串流 (`ExampleInputStream` 和 `ExampleOutputStream`) （在此範例中為 us-east-1)。您的應用程式會將這些串流用於應用程式來源和目的地串流。

您可以使用 Amazon Kinesis 主控台或以下 AWS CLI 命令來建立這些串流。如需主控台指示，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[建立和更新資料串流](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)**。

**建立資料串流 (AWS CLI)**

1. 若要建立第一個串流 (`ExampleInputStream`)，請使用下列 Amazon Kinesis `create-stream` AWS CLI 命令。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1
   ```

1. 若要建立應用程式用來寫入輸出的第二個串流，請執行相同的命令，將串流名稱變更為 `ExampleOutputStream`。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1
   ```

### 建立 Amazon S3 儲存貯體
<a name="gs-python-resources-s3"></a>

您可以使用主控台建立 Amazon S3 儲存貯體。如需建立這些資源的相關指示，請參閱以下主題：
+ 《Amazon Simple Storage Service 使用者指南》中的[如何建立 S3 儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。**為 Amazon S3 儲存貯體提供全域唯一名稱，例如透過附加您的登入名稱。
**注意**  
請確定您在用於本教學課程的區域中建立 S3 儲存貯體 (us-east-1)。

### 其他資源
<a name="gs-python-resources-cw"></a>

建立應用程式時，Managed Service for Apache Flink 會建立下列 Amazon CloudWatch 資源 (如果尚不存在該資源)：
+ 名為 `/AWS/KinesisAnalytics-java/<my-application>` 的日誌群組。
+ 名為 `kinesis-analytics-log-stream` 的日誌串流。

## 設定您的本機開發環境
<a name="gs-python-set-up"></a>

對於開發和偵錯，您可以在機器上執行 Python Flink 應用程式。您可以在您選擇的 Python IDE 中使用 `python main.py`或 從命令列啟動應用程式。

**注意**  
在您的開發機器上，您必須安裝 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我們建議您使用 IDE，例如 [PyCharm](https://www.jetbrains.com/pycharm/) 或 [Visual Studio Code](https://code.visualstudio.com/)。若要驗證您是否符合所有先決條件，請先參閱 [滿足完成練習的先決條件](gs-python.md#gs-python-prerequisites) 再繼續。

### 安裝 PyFlink 程式庫
<a name="gs-python-install-pyflink"></a>

若要開發應用程式並在本機執行，您必須安裝 Flink Python 程式庫。

1. 使用 VirtualEnv、Conda 或任何類似的 Python 工具建立獨立的 Python 環境。

1. 在該環境中安裝 PyFlink 程式庫。使用您在 Amazon Managed Service for Apache Flink 中使用的相同 Apache Flink 執行時間版本。目前，建議的執行時間為 1.19.1。

   ```
   $ pip install apache-flink==1.19.1
   ```

1. 執行應用程式時，請確定環境處於作用中狀態。如果您在 IDE 中執行應用程式，請確定 IDE 使用環境做為執行時間。程序取決於您使用的 IDE。
**注意**  
您只需要安裝 PyFlink 程式庫。**您不需要**在機器上安裝 Apache Flink 叢集。

### 驗證您的 AWS 工作階段
<a name="gs-python-authenticate"></a>

應用程式使用 Kinesis 資料串流來發佈資料。在本機執行時，您必須擁有有效的已 AWS 驗證工作階段，具有寫入 Kinesis 資料串流的許可。使用下列步驟來驗證您的工作階段：

1. 如果您沒有設定 AWS CLI 和具有有效登入資料的具名設定檔，請參閱 [設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 透過發佈下列測試記錄，確認您的 AWS CLI 已正確設定，且您的使用者具有寫入 Kinesis 資料串流的許可：

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. 如果您的 IDE 有要整合的外掛程式 AWS，您可以使用它將登入資料傳遞至在 IDE 中執行的應用程式。如需詳細資訊，請參閱 [AWS Toolkit for PyCharm](https://aws.amazon.com/pycharm/)、 [AWS Toolkit for Visual Studio Code](https://aws.amazon.com/visualstudiocode/) 和 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)。

## 下載並檢查 Apache Flink 串流 Python 程式碼
<a name="gs-python-download"></a>

此範例的 Python 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼，請執行下列動作：

1. 使用以下指令複製遠端儲存庫：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 導覽至 `./python/GettingStarted` 目錄。

### 檢閱應用程式元件
<a name="gs-python-review"></a>

應用程式碼位於 。 `main.py`我們使用內嵌在 Python 中的 SQL 來定義應用程式的流程。

**注意**  
為了獲得最佳化的開發人員體驗，應用程式的設計可在 Amazon Managed Service for Apache Flink 和本機上執行，無需變更任何程式碼，即可在機器上進行開發。應用程式使用 環境變數`IS_LOCAL = true`來偵測它何時在本機執行。您必須在 shell `IS_LOCAL = true`或 IDE 的執行組態中設定環境變數。
+ 應用程式會設定執行環境並讀取執行時間組態。若要同時在 Amazon Managed Service for Apache Flink 和本機上運作，應用程式會檢查 `IS_LOCAL`變數。
  + 以下是應用程式在 Amazon Managed Service for Apache Flink 中執行時的預設行為：

    1. 載入與應用程式一起封裝的相依性。如需詳細資訊，請參閱 （連結）

    1. 從您在 Amazon Managed Service for Apache Flink 應用程式中定義的執行期屬性載入組態。如需詳細資訊，請參閱 （連結）
  + 當應用程式偵測到您在本機執行應用程式`IS_LOCAL = true`時：

    1. 從專案載入外部相依性。

    1. 從專案中包含`application_properties.json`的檔案載入組態。

       ```
       ...
       APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
       ...
       is_local = (
           True if os.environ.get("IS_LOCAL") else False
       )
       ...
       if is_local:
           APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"
           CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
           table_env.get_config().get_configuration().set_string(
               "pipeline.jars",
               "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar",
           )
       ```
+ 應用程式使用 [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) 定義具有`CREATE TABLE`陳述式的來源資料表。此資料表會從輸入 Kinesis 串流讀取資料。應用程式會從執行時間組態中取得串流的名稱、區域和初始位置。

  ```
  table_env.execute_sql(f"""
          CREATE TABLE prices (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3),
                  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{input_stream_name}',
                  'aws.region' = '{input_stream_region}',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                ) """)
  ```
+ 在此範例中，應用程式也會使用 [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/) 定義接收器資料表。此故事會將資料傳送至輸出 Kinesis 串流。

  ```
  table_env.execute_sql(f"""
              CREATE TABLE output (
                  ticker VARCHAR(6),
                  price DOUBLE,
                  event_time TIMESTAMP(3)
                )
                PARTITIONED BY (ticker)
                WITH (
                  'connector' = 'kinesis',
                  'stream' = '{output_stream_name}',
                  'aws.region' = '{output_stream_region}',
                  'sink.partitioner-field-delimiter' = ';',
                  'sink.batch.max-size' = '100',
                  'format' = 'json',
                  'json.timestamp-format.standard' = 'ISO-8601'
                )""")
  ```
+ 最後，應用程式會從來源資料表執行`INSERT INTO...`接收資料表的 SQL。在更複雜的應用程式中，您可能會有額外的步驟在寫入目的地之前轉換資料。

  ```
  table_result = table_env.execute_sql("""INSERT INTO output 
          SELECT ticker, price, event_time FROM prices""")
  ```
+ 您必須在`main()`函數結尾新增另一個步驟，才能在本機執行應用程式：

  ```
  if is_local:
      table_result.wait()
  ```

  如果沒有此陳述式，應用程式會在您於本機執行時立即終止。當您在 Amazon Managed Service for Apache Flink 中執行應用程式時，不得執行此陳述式。

## 管理 JAR 相依性
<a name="gs-python-jar-dependencies"></a>

PyFlink 應用程式通常需要一或多個連接器。本教學課程中的應用程式使用 [Kinesis Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kinesis/)。由於 Apache Flink 在 Java JVM 中執行，無論您是否在 Python 中實作應用程式，連接器都會以 JAR 檔案形式分佈。在 Amazon Managed Service for Apache Flink 上部署這些相依性時，您必須將這些相依性與應用程式一起封裝。

在此範例中，我們會示範如何使用 Apache Maven 來擷取相依性，並封裝應用程式以在 Managed Service for Apache Flink 上執行。

**注意**  
有擷取和封裝相依性的其他方法。此範例示範可正確搭配一或多個連接器使用的方法。它還可讓您在本機、用於開發以及在 Managed Service for Apache Flink 上執行應用程式，而無需變更程式碼。

### 使用 pom.xml 檔案
<a name="gs-python-jar-pom"></a>

Apache Maven 使用 `pom.xml` 檔案來控制相依性和應用程式封裝。

任何 JAR 相依性都會在 `<dependencies>...</dependencies>`區塊的 `pom.xml` 檔案中指定。

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>4.3.0-1.19</version>
        </dependency>
    </dependencies>
    ...
```

若要尋找要使用的正確連接器成品和版本，請參閱 [將 Apache Flink 連接器與 Managed Service for Apache Flink 搭配使用](how-flink-connectors.md)。請務必參考您正在使用的 Apache Flink 版本。在此範例中，我們使用 Kinesis 連接器。對於 Apache Flink 1.19，連接器版本為 `4.3.0-1.19`。

**注意**  
如果您使用的是 Apache Flink 1.19，則沒有為此版本特別發行的連接器版本。使用針對 1.18 發行的連接器。

### 下載和封裝相依性
<a name="gs-python-dependencies-download"></a>

使用 Maven 下載`pom.xml`檔案中定義的相依性，並為 Python Flink 應用程式封裝相依性。

1. 導覽至包含 Python 入門專案的目錄，稱為 `python/GettingStarted`。

1. 執行以下命令：

```
$ mvn package
```

Maven 會建立新的檔案，名為 `./target/pyflink-dependencies.jar`。當您在機器上進行本機開發時，Python 應用程式會尋找此檔案。

**注意**  
如果您忘記執行此命令，當您嘗試執行應用程式時，它會失敗並顯示錯誤： **找不到任何識別符為 "kinesis" 的工廠**。

## 將範例記錄寫入輸入串流
<a name="gs-python-sample-records"></a>

在本節中，您將傳送範例記錄到串流，供應用程式處理。您可以使用 Python 指令碼或 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) 產生範例資料。

### 使用 Python 指令碼產生範例資料
<a name="gs-python-sample-data"></a>

您可以使用 Python 指令碼將範例記錄傳送至串流。

**注意**  
若要執行此 Python 指令碼，您必須使用 Python 3.x 並安裝[AWS 適用於 Python (Boto) 的 SDK](https://aws.amazon.com/developer/language/python/) 程式庫。

**若要開始將測試資料傳送至 Kinesis 輸入串流：**

1. 從資料產生器 [ GitHub 儲存庫下載資料產生器](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator) `stock.py` Python 指令碼。

1. 執行 `stock.py` 指令碼：

   ```
   $ python stock.py
   ```

在您完成教學課程的其餘部分時，請讓指令碼保持執行。您現在可以執行 Apache Flink 應用程式。

### 使用 Kinesis Data Generator 產生範例資料
<a name="gs-python-sample-kinesis"></a>

或者，若要使用 Python 指令碼，您可以使用[託管版本](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html)中也提供的 [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator)，將隨機範例資料傳送至串流。Kinesis Data Generator 會在您的瀏覽器中執行，您不需要在機器上安裝任何項目。

**若要設定和執行 Kinesis Data Generator：**

1. 遵循 [Kinesis Data Generator 文件](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)中的指示來設定對工具的存取。您將執行設定使用者和密碼的 CloudFormation 範本。

1. 透過 CloudFormation 範本產生的 URL 存取 Kinesis Data Generator。CloudFormation 範本完成後，您可以在**輸出**索引標籤中找到 URL。

1. 設定資料產生器：
   + **區域：**選取您用於本教學課程的區域：us-east-1
   + **串流/交付串流：**選取應用程式將使用的輸入串流： `ExampleInputStream`
   + **每秒記錄數：**100
   + **記錄範本：**複製並貼上下列範本：

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. 測試範本：選擇**測試範本**，並確認產生的記錄與以下內容類似：

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. 啟動資料產生器：選擇**選取傳送資料**。

Kinesis Data Generator 現在正在將資料傳送至 `ExampleInputStream`。

## 在本機執行您的應用程式
<a name="gs-python-run-locally"></a>

您可以在本機測試應用程式，使用 `python main.py`IDE 從命令列執行或從 IDE 執行。

若要在本機執行應用程式，您必須安裝正確的 PyFlink 程式庫版本，如上一節所述。如需詳細資訊，請參閱 （連結）

**注意**  
繼續之前，請確認輸入和輸出串流是否可用。請參閱 [建立兩個 Amazon Kinesis 資料串流](get-started-exercise.md#get-started-exercise-1)。此外，請確認您具有從兩個串流讀取和寫入的許可。請參閱 [驗證您的 AWS 工作階段](get-started-exercise.md#get-started-exercise-2-5)。

### 將 Python 專案匯入 IDE
<a name="gs-python-import"></a>

若要開始在 IDE 中使用應用程式，您必須將其匯入為 Python 專案。

您複製的儲存庫包含多個範例。每個範例都是個別的專案。在本教學課程中，將`./python/GettingStarted`子目錄中的內容匯入 IDE。

將程式碼匯入為現有的 Python 專案。

**注意**  
匯入新 Python 專案的確切程序取決於您使用的 IDE。

### 檢查本機應用程式組態
<a name="gs-python-check-configuration"></a>

在本機執行時，應用程式會使用 下專案資源資料夾中 `application_properties.json` 檔案中的組態`./src/main/resources`。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### 在本機執行您的 Python 應用程式
<a name="gs-python-run-locally"></a>

您可以從命令列做為一般 Python 指令碼，或從 IDE 在本機執行應用程式。

**從命令列執行您的應用程式**

1. 請確定安裝 Python Flink 程式庫的獨立 Python 環境目前處於作用中狀態，例如 Conda 或 VirtualEnv。

1. 請確定您`mvn package`至少執行一次。

1. 設定 `IS_LOCAL = true` 環境變數：

   ```
   $ export IS_LOCAL=true
   ```

1. 以一般 Python 指令碼執行應用程式。

   ```
   $python main.py
   ```

**從 IDE 中執行應用程式**

1. 將 IDE 設定為使用以下組態執行`main.py`指令碼：

   1. 使用安裝 PyFlink 程式庫的獨立 Python 環境，例如 Conda 或 VirtualEnv。

   1. 使用 AWS 登入資料來存取輸入和輸出 Kinesis 資料串流。

   1. 設定 `IS_LOCAL = true`。

1. 設定執行組態的確切程序取決於您的 IDE 和 。

1. 當您設定 IDE 後，請執行 Python 指令碼，並在應用程式執行時使用 IDE 提供的工具。

### 在本機檢查應用程式日誌
<a name="gs-python-run-IDE"></a>

在本機執行時，除了應用程式啟動時列印和顯示的幾行以外，應用程式不會在主控台中顯示任何日誌。PyFlink 會將日誌寫入安裝 Python Flink 程式庫的目錄中的檔案。應用程式會在日誌啟動時列印日誌的位置。您也可以執行下列命令來尋找日誌：

```
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
```

1. 列出記錄目錄中的檔案。您通常會找到單一`.log`檔案。

1. 在應用程式執行時自訂檔案：`tail -f <log-path>/<log-file>.log`。

## 觀察 Kinesis 串流中的輸入和輸出資料
<a name="gs-python-observe-input-output"></a>

您可以使用 Amazon Kinesis 主控台中的**資料檢視器**，觀察 （產生範例 Python) 或 Kinesis Data Generator （連結） 傳送至輸入串流的記錄。 Amazon Kinesis 

**若要觀察記錄：**  在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。  驗證您執行本教學課程的區域是否相同，預設為 us-east-1 美國東部 （維吉尼亞北部）。如果區域不相符，請變更區域。   選擇**資料串流**。   選取您要觀察的串流，或 `ExampleInputStream` `ExampleOutputStream.`   選擇**資料檢視器**標籤。   選擇任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。   選擇資料欄中的值，以 JSON 格式檢查記錄的內容。   

## 停止應用程式在本機執行
<a name="gs-python-stop"></a>

停止在 IDE 中執行的應用程式。IDE 通常提供「停止」選項。確切的位置和方法取決於 IDE。

## 封裝您的應用程式程式碼
<a name="gs-python-package-code"></a>

在本節中，您會使用 Apache Maven 將應用程式程式碼和所有必要的相依性封裝在 .zip 檔案中。

再次執行 Maven 套件命令：

```
$ mvn package
```

此命令會產生 檔案 `target/managed-flink-pyflink-getting-started-1.0.0.zip`。

## 將應用程式套件上傳至 Amazon S3 儲存貯體
<a name="gs-python-upload-bucket"></a>

在本節中，您將上一節中建立的 .zip 檔案上傳至在本教學課程開始時建立的 Amazon Simple Storage Service (Amazon S3) 儲存貯體。如果您尚未完成此步驟，請參閱 （連結）。

**上傳應用程式碼 JAR 檔案**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選擇您先前為應用程式碼建立的儲存貯體。

1. 選擇**上傳**。

1. 選擇 **Add files (新增檔案)**。

1. 導覽至上一個步驟中產生的 .zip 檔案：`target/managed-flink-pyflink-getting-started-1.0.0.zip`。

1. 選擇**上傳**，而不變更任何其他設定。

## 建立和設定 Managed Service for Apache Flink 應用程式
<a name="gs-python-7"></a>

您可以使用 主控台或 建立和設定 Managed Service for Apache Flink 應用程式 AWS CLI。在本教學課程中，我們將使用 主控台。

### 建立應用程式
<a name="gs-python-7-console-create"></a>

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 確認已選取正確的區域：美國東部 （維吉尼亞北部）us-east-1。

1. 開啟右側選單，然後選擇 **Apache Flink 應用程式**，然後選擇**建立串流應用程式**。或者，從初始頁面的**開始使用**區段中選擇**建立串流應用程式**。

1. 在**建立串流應用程式**頁面上：
   + 對於**選擇設定串流處理應用程式的方法**，請選擇**從頭開始建立**。
   + 針對 **Apache Flink 組態、Application Flink 版本**，選擇 **Apache Flink 1.19**。
   + 對於**應用程式組態**：
     + 在**應用程式名稱**中，輸入 **MyApplication**。
     + 對於 **Description (說明)**，輸入 **My Python test app**。
     + 在**存取應用程式資源**中，選擇**使用必要政策建立/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1**。
   + 對於**應用程式設定的範本**：
     + 針對**範本**，選擇**開發**。
   + 選擇**建立串流應用程式**。

**注意**  
使用主控台建立 Managed Service for Apache Flink 應用程式時，可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名：  
政策：`kinesis-analytics-service-MyApplication-us-west-2`
角色：`kinesisanalytics-MyApplication-us-west-2`
Amazon Managed Service for Apache Flink 先前稱為 *Kinesis Data Analytics*。自動產生的資源名稱會加上 字首，`kinesis-analytics`以確保回溯相容性。

### 編輯 IAM 政策
<a name="gs-python-7-console-iam"></a>

編輯 IAM 政策以新增 Amazon S3 儲存貯體存取許可。

**編輯 IAM 政策以新增 S3 儲存貯體許可**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 選擇**政策**。選擇主控台為您在上一節所建立的 **`kinesis-analytics-service-MyApplication-us-east-1`** 政策。

1. 選擇**編輯**，然後選擇 **JSON** 索引標籤。

1. 將下列政策範例的反白部分新增至政策。使用您的帳戶 ID 取代範例帳戶 ID (*012345678901*)。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1. 選擇**下一步**，然後選擇**儲存變更**。

### 設定應用程式
<a name="gs-python-7-console-configure"></a>

編輯應用程式組態以設定應用程式程式碼成品。

**設定應用程式**

1. 在 **MyApplication** 頁面，選擇**設定**。

1. 在**應用程式碼位置**區段中：
   + 針對 **Amazon S3 儲存貯**體，選取您先前為應用程式碼建立的儲存貯體。選擇**瀏覽**並選擇正確的儲存貯體，然後選擇**選擇**。請勿選取儲存貯體名稱。
   + 對於 **Amazon S3 物件的路徑**，請輸入 **managed-flink-pyflink-getting-started-1.0.0.zip**。

1. 針對**存取許可**，選擇**`kinesis-analytics-MyApplication-us-east-1`使用必要政策建立/更新 IAM 角色**。

1. 移至**執行期屬性**，並保留所有其他設定的預設值。

1. 選擇**新增項目**並新增下列每個參數：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/gs-python-createapp.html)

1. 請勿修改任何其他區段，然後選擇**儲存變更**。

**注意**  
當您選擇啟用 Amazon CloudWatch 日誌時，Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示：  
日誌群組：`/aws/kinesis-analytics/MyApplication`
日誌串流：`kinesis-analytics-log-stream`

### 執行應用程式
<a name="gs-python-7-console-run"></a>

應用程式現在已設定並準備好執行。

**執行應用程式**

1. 在 Amazon Managed Service for Apache Flink 的 主控台上，選擇**我的應用程式**，然後選擇**執行**。

1. 在下一頁的應用程式還原組態頁面上，選擇**使用最新的快照執行**，然後選擇**執行**。

   **應用程式詳細資訊**中的**狀態**會從 `Ready` 轉換為 `Starting` ，然後在應用程式啟動`Running`時轉換為 。

當應用程式處於 `Running` 狀態時，您現在可以開啟 Flink 儀表板。

**開啟 儀表板**

1. 選擇**開啟 Apache Flink 儀表板**。儀表板會在新頁面上開啟。

1. 在**執行中任務**清單中，選擇您可以看到的單一任務。
**注意**  
如果您未正確設定執行期屬性或編輯 IAM 政策，應用程式狀態可能會變成 `Running`，但 Flink 儀表板會顯示任務正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的許可，這是常見的失敗案例。  
發生這種情況時，請檢查 Flink 儀表板中的**例外**狀況索引標籤，以查看問題的原因。

### 觀察執行中應用程式的指標
<a name="gs-python-observe-metrics"></a>

在 **MyApplication** 頁面的 **Amazon CloudWatch 指標**區段中，您可以查看執行中應用程式的一些基本指標。

**檢視指標**

1. 在**重新整理**按鈕旁，從下拉式清單中選取 **10 秒**。

1. 當應用程式執行正常時，您可以看到**運作時間**指標持續增加。

1. **Fullrestarts** 指標應為零。如果增加，組態可能會發生問題。若要調查問題，請檢閱 Flink 儀表板上的**例外狀況**索引標籤。

1. 在運作狀態良好的應用程式中，**失敗檢查點指標的數量**應為零。
**注意**  
此儀表板會顯示一組固定的指標，精細程度為 5 分鐘。您可以使用 CloudWatch 儀表板中的任何指標來建立自訂應用程式儀表板。

### 觀察 Kinesis 串流中的輸出資料
<a name="gs-python-observe-output"></a>

請確定您仍在使用 Python 指令碼或 Kinesis Data Generator 將資料發佈至輸入。

您現在可以使用 [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)：// 中的資料檢視器來觀察 Managed Service for Apache Flink 上執行之應用程式的輸出，與先前已執行的操作類似。

**檢視輸出**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 確認區域與您用來執行本教學課程的區域相同。根據預設，它是 us-east-1US East (N. Virginia)。視需要變更 區域。

1. 選擇**資料串流**。

1. 選取您要觀察的串流。在本教學課程中，使用 `ExampleOutputStream`。

1.  選擇**資料檢視器**標籤。

1. 選取任何**碎片**，保持**最新**為**開始位置**，然後選擇**取得記錄**。您可能會看到「找不到此請求的記錄」錯誤。若是如此，請選擇**重試取得記錄**。發佈至串流顯示的最新記錄。

1. 選取資料欄中的值，以 JSON 格式檢查記錄的內容。

### 停止應用程式
<a name="gs-python-7-console-stop"></a>

若要停止應用程式，請前往名為 的 Managed Service for Apache Flink 應用程式的主控台頁面`MyApplication`。

**停止應用程式**

1. 從**動作**下拉式清單中，選擇**停止**。

1. **應用程式詳細資訊**中的**狀態**會從 轉換為 `Running` `Stopping`，然後在應用程式完全停止`Ready`時轉換為 。
**注意**  
別忘了也要停止從 Python 指令碼或 Kinesis Data Generator 將資料傳送至輸入串流。

## 下一步驟
<a name="gs-python-next-step-4"></a>

[清除 AWS 資源](gs-python-cleanup.md)

# 清除 AWS 資源
<a name="gs-python-cleanup"></a>

本節包含在入門 (Python) 教學課程中建立 AWS 的資源清除程序。

**Topics**
+ [刪除 Managed Service for Apache Flink 應用程式](#gs-python-cleanup-app)
+ [刪除您的 Kinesis 資料串流](#gs-python-cleanup-msk)
+ [刪除您的 Amazon S3 物件和儲存貯體](#gs-python-cleanup-s3)
+ [刪除您的 IAM 資源](#gs-python-cleanup-iam)
+ [刪除您的 CloudWatch 資源](#gs-python-cleanup-cw)

## 刪除 Managed Service for Apache Flink 應用程式
<a name="gs-python-cleanup-app"></a>

請使用下列程序刪除應用程式。

**刪除應用程式**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 在 Managed Service for Apache Flink 面板中，選擇 **MyApplication**。

1. 從**動作**下拉式清單中，選擇**刪除**，然後確認刪除。

## 刪除您的 Kinesis 資料串流
<a name="gs-python-cleanup-msk"></a>

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 選擇**資料串流**。

1. 選取您建立的兩個串流，`ExampleInputStream`以及 `ExampleOutputStream`。

1. 從**動作**下拉式清單中，選擇**刪除**，然後確認刪除。

## 刪除您的 Amazon S3 物件和儲存貯體
<a name="gs-python-cleanup-s3"></a>

使用下列程序來刪除 S3 物件和儲存貯體。

**從 S3 儲存貯體刪除物件**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選取您為應用程式成品建立的 S3 儲存貯體。

1. 選取您上傳的應用程式成品，名為 `amazon-msf-java-stream-app-1.0.jar`。

1. 選擇**刪除**並確認刪除。

**刪除 S3 儲存貯體**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選取您為成品建立的儲存貯體。

1. 選擇**刪除**並確認刪除。
**注意**  
S3 儲存貯體必須為空白，才能將其刪除。

## 刪除您的 IAM 資源
<a name="gs-python-cleanup-iam"></a>

使用下列程序刪除 IAM 資源。

**刪除 IAM 資源**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 在導覽列中，選擇**政策**。

1. 在篩選器控制項中，輸入 **kinesis**。

1. 選擇 **kinesis-analytics-service-MyApplication-us-east-1** 政策。

1. 選擇**政策動作**，然後選擇**刪除**。

1. 在導覽列中，選擇**角色**。

1. 選擇 **kinesis-analytics-MyApplication-us-east-1** 角色。

1. 選擇**刪除角色**，然後確認刪除。

## 刪除您的 CloudWatch 資源
<a name="gs-python-cleanup-cw"></a>

使用下列程序刪除 CloudWatch 資源。

**刪除 CloudWatch 資源**

1. 透過 [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) 開啟 CloudWatch 主控台。

1. 在導覽列中，選擇**日誌**。

1. 選擇 **/aws/kinesis-analytics/MyApplication** 日誌群組。

1. 選擇**刪除日誌群組**，然後確認刪除。