為 Python 應用程序創建並運行阿帕奇 Flink 的託管服務 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

為 Python 應用程序創建並運行阿帕奇 Flink 的託管服務

在本節中,您將為 Python 應用程式建立 Apache Flink 應用程式的受管理服務,並將 Kinesis 串流作為來源和接收器。

建立相依資源

在為本練習建立 Managed Service for Apache Flink 之前,先建立下列相依資源:

  • 兩個 Kinesis 串流,用於輸入和輸出。

  • 用於存放應用程式程式碼的 Amazon S3 儲存貯體。

注意

本教學課程假設您正在 us-east-1 區域中部署應用程式。如果您使用其他區域,則必須相應地調整所有步驟。

建立兩個 Kinesis 串流

在為本練習建立適用於 Apache Flink 應用程式的受管服務之前,請在您將用來部署應用程式的相同區域中建立兩個 Kinesis 資料串流 (ExampleInputStreamExampleOutputStream) (在此範例中為 us-east-1)。您的應用程式會將這些串流用於應用程式來源和目的地串流。

您可以使用 Amazon Kinesis 主控台或以下 AWS CLI 命令來建立這些串流。如需主控台指示,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流

建立資料串流 (AWS CLI)
  1. 若要建立第一個串流 (ExampleInputStream),請使用下列 Amazon Kinesis create-stream AWS CLI 指令。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. 若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為 ExampleOutputStream

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

建立 Amazon S3 儲存貯體

您可以使用主控台建立 Amazon S3 儲存貯體。如需建立這些資源的相關指示,請參閱以下主題:

  • 《Amazon Simple Storage Service 使用者指南》中的如何建立 S3 儲存貯體為 Amazon S3 儲存貯體指定一個全域唯一的名稱,例如透過附加您的登入名稱。

    注意

    請務必在您用於本教學課程 (us-east-1) 的區域中建立 S3 儲存貯體。

其他資源

當您建立應用程式時,Apache Flink 的受管服務會建立下列 Amazon CloudWatch 資源 (如果這些資源尚未存在):

  • 名為 /AWS/KinesisAnalytics-java/<my-application> 的日誌群組。

  • 名為 kinesis-analytics-log-stream 的日誌串流。

設定您的本機開發環境

對於開發和調試,您可以在計算機上運行 Python Flink 應用程序。您可以使用python main.py或在您選擇IDE的 Python 中從命令行啟動應用程序。

注意

在你的開發機器上,你必須有 Python 3.10 或 3.11,Java 11,阿帕奇 Maven 和 Git 安裝。我們建議您使用IDE如PyCharm視覺工作室程式碼。若要確認您符合所有先決條件,請參閱繼續滿足完成練習的先決條件之前。

若要開發應用程式並在本機執行,您必須安裝 Flink Python 程式庫。

  1. 使用 VirtualEnv,康達或任何類似的 Python 工具創建一個獨立的 Python 環境。

  2. 在該環境中安裝 PyFlink 程式庫。使用您將在阿帕奇 Flink 的 Amazon 受管服務中使用的相同 Apache Flink 執行階段版本。目前,建議的執行階段為 1.19.1。

    $ pip install apache-flink==1.19.1
  3. 當您執行應用程式時,請確定環境處於作用中狀態。如果您在中執行應用程式IDE,請確定IDE使用環境做為執行階段。該過程取決於您IDE正在使用的。

    注意

    您只需要安裝程 PyFlink 式庫即可。您需要在您的機器上安裝 Apache Flink 叢集。

驗證您的 AWS 會話

應用程式會使用 Kinesis 資料串流來發佈資料。在本機執行時,您必須擁有具有寫入 Kinesis 資料串流之權限的有效 AWS 驗證工作階段。請使用下列步驟來驗證您的工作階段:

  1. 如果您沒有設定 AWS CLI 具有有效認證的具名設定檔,請參閱設置 AWS Command Line Interface (AWS CLI)

  2. 發佈下列測試記錄,確認您的設定正確,且您的使用者具有寫入 Kinesis 資料串流的權限: AWS CLI

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 如果您IDE有要整合的外掛程式 AWS,您可以使用該外掛程式將憑證傳遞至在IDE. 如需詳細資訊,請參閱AWS 適用於 PyCharm Visual Studio 程式碼的AWS 工具AWS組和 IntelliJ IDEA 的工具組

下載並檢查阿帕奇 Flink 流 Python 代碼

此範例的 Python 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:

  1. 使用以下指令複製遠端儲存庫:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. 導覽至 ./python/GettingStarted 目錄。

檢閱應用程式元

應用程式程式碼位於中main.py。我們使用SQL嵌入在 Python 中來定義應用程序的流程。

注意

為了獲得最佳化的開發人員體驗,應用程式的設計目的是在適用於 Apache Flink 的 Amazon 受管服務和本機上進行開發時,不需要變更任何程式碼即可執行。應用程式會使用環境變數IS_LOCAL = true來偵測何時在本機執行。您必須在 shell 上IS_LOCAL = true或IDE. 的運行配置中設置環境變量

  • 應用程式會設定執行環境並讀取執行階段組態。若要同時在適用於 Apache Flink 和本機的 Amazon 受管服務上運作,應用程式會檢查IS_LOCAL變數。

    • 以下是應用程式在適用於 Apache Flink 的 Amazon 受管服務中執行時的預設行為:

      1. 載入與應用程式封裝的相依性。有關更多信息,請參閱(鏈接)

      2. 從您在 Apache Flink 應用程式的 Amazon 受管服務中定義的執行階段屬性載入組態。有關更多信息,請參閱(鏈接)

    • 當應用程式偵測到您在本機執行應用程式IS_LOCAL = true時:

      1. 從專案載入外部相依性。

      2. 從專案中包含的application_properties.json檔案載入規劃。

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • 應用程式會使用 Kinesis 連接器來定義含CREATE TABLE陳述式的來源資料表。此表格會從輸入 Kinesis 串流讀取資料。該應用程序從運行時配置獲取流的名稱,區域和初始位置。

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • 在此範例中,應用程式也會使用 Kinesis 連接器來定義接收表。這個故事將數據發送到輸出 Kinesis 流。

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • 最後,應用程式會從來源資料表中執行接收器表格。SQL INSERT INTO...在更複雜的應用程式中,在寫入接收器之前,您可能還有其他步驟來轉換資料。

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • 您必須在函數結尾新增另一個步驟,才main()能在本機執行應用程式:

    if is_local: table_result.wait()

    如果沒有這個陳述式,應用程式會在您在本機執行時立即終止。當您在 Apache Flink 的 Amazon 受管服務中執行應用程式時,不得執行此陳述式。

管理JAR相依性

一個 PyFlink 應用程序通常需要一個或多個連接器。本教學課程中的應用程式使用 Kinesis 連接器。由於 Apache Flink 在 Java 中執行JVM,因此無論您是否在 Python 中實作應用程式,連接器都會以JAR檔案的形式散佈。在適用於 Apache Flink 的 Amazon 受管服務上部署應用程式時,必須將這些相依性與應用程式封裝。

在這個例子中,我們展示了如何使用 Apache 的 Maven 獲取依賴關係和打包應用程序在 Apache Flink 託管服務上運行。

注意

還有其他方法來獲取和打包依賴關係。此範例示範可正確搭配一個或多個連接器運作的方法。它也可讓您在本機、開發和 Apache Flink 受管理服務上執行應用程式,而無需變更程式碼。

使用 pom.xml 檔案

阿帕奇 Maven 使用該pom.xml文件來控制依賴關係和應用程序打包。

任何JAR從屬關係都在<dependencies>...</dependencies>塊中的pom.xml文件中指定。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

若要尋找要使用的正確加工品和連接器版本,請參閱搭配 Apache Flink 的受管理服務使用 Apache Flink 連接器。請確定您參考的是您正在使用的 Apache Flink 版本。在此範例中,我們使用 Kinesis 連接器。對於阿帕奇 Flink 1.19,連接器版本為。4.3.0-1.19

注意

如果您使用的是 Apache Flink 1.19,則沒有專門針對此版本發行的連接器版本。使用針對 1.18 版發行的連接器。

下載和封裝相依性

使用 Maven 下載pom.xml檔案中定義的相依性,並將它們封裝給 Python Flink 應用程式。

  1. 瀏覽至包含名為 Python 入門專案的目錄python/GettingStarted

  2. 執行以下命令:

$ mvn package

Maven 創建一個名為的新文件./target/pyflink-dependencies.jar。當您在電腦上進行本機開發時,Python 應用程式會尋找此檔案。

注意

如果您忘記運行此命令,當您嘗試運行應用程序時,它將失敗並顯示錯誤:找不到標識符「kinesis 的任何工廠

將樣本記錄寫入輸入流

在本節中,您會將範例記錄傳送至串流以供應用程式處理。您有兩個選項可以使用 Python 指令碼或 Kinesis 資料產生器來產生範例資料

使用 Python 腳本生成示例數據

您可以使用 Python 腳本將示例記錄發送到流。

注意

要運行這個 Python 腳本,你必須使用 Python 3.x 和有 Python(博托)庫安裝。AWS SDK

若要開始將測試資料傳送至 Kinesis 輸入串流:

  1. 從數據生成器 GitHub 存儲庫下載數據生成器 stock.py Python 腳本。

  2. 執行 stock.py 指令碼:

    $ python stock.py

當您完成教學課程的其餘部分時,請保持指令碼的執行狀態。您現在可以執行您的 Apache Flink 應用程式。

使用 Kinesis 資料產生器產生範例資料

或者,若要使用 Python 指令碼,您也可以使用託管版本提供的 Kinesis 資料產生器,將隨機範例資料傳送至串流。Kinesis 資料產生器會在您的瀏覽器中執行,而且您不需要在電腦上安裝任何項目。

若要設定和執行 Kinesis 資料產生器:

  1. 請遵循 Kinesis 資料產生器文件中的指示來設定工具的存取權。您將執行設定使用者和密碼的 AWS CloudFormation 範本。

  2. 透過 CloudFormation範本產生的存取 Kinesis 資料URL產生器。 CloudFormation 模板完成後,您可以URL在「輸出」選項卡中找到。

  3. 設定資料產生器:

    • 區域:選取您在此教學課程中使用的區域:us-east-1

    • 流/交付串流:選取應用程式將使用的輸入串流:ExampleInputStream

    • 每秒記錄數:100

    • 記錄範本:複製並貼上下列範本:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 測試範本:選擇「測試範本」,然後確認產生的記錄與下列內容類似:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 啟動資料產生器:選擇 「選取傳送資料」。

Kinesis 資料產生器現在正在將資料傳送至. ExampleInputStream

在本機執行應用程式

您可以在本地測試應用程序,從命令行運行python main.py或從IDE.

若要在本機執行應用程式,您必須安裝正確版本的程式 PyFlink庫,如上一節所述。有關更多信息,請參閱(鏈接)

注意

在繼續之前,請確認輸入和輸出串流是否可用。請參閱 建立兩個 Amazon Kinesis 資料串流。另外,請確認您具有從兩個串流讀取和寫入的權限。請參閱 驗證您的 AWS 會話

將 Python 項目導入到您的 IDE

若要開始在您的應用程式上工作IDE,您必須將它匯入為 Python 專案。

您複製的儲存庫包含多個範例。每個例子都是一個單獨的項目。在本教學課程中,請將./python/GettingStarted子目錄中的內容匯入您的IDE.

將程式碼匯入為現有的 Python 專案。

注意

導入新 Python 項目的確切過程取決於IDE您使用的項目。

檢查本地應用程序配置

在本機執行時,應用程式會使用下專application_properties.json案資源資料夾中檔案中的組態./src/main/resources。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

在本機執行 Python 應用程式

您可以在本機執行應用程式,無論是從命令列做為一般 Python 指令碼,或從IDE.

若要從命令列執行應用程式
  1. 請確定獨立 Python 環境 (例如 Conda) 或您安裝 Python Flink 程式庫的 VirtualEnv 位置目前處於作用中狀態。

  2. 確保您至mvn package少跑了一次。

  3. 設定 IS_LOCAL = true 環境變數:

    $ export IS_LOCAL=true
  4. 運行應用程序作為一個常規的 Python 腳本。

    $python main.py
若要從中執行應用程式 IDE
  1. 配置您IDE使用以下配置運行main.py腳本:

    1. 使用獨立的 Python 環境,例如 Conda 或您安裝程式 PyFlink 庫的 VirtualEnv 位置。

    2. 使用 AWS 認證來存取輸入和輸出 Kinesis 資料串流。

    3. 設定 IS_LOCAL = true

  2. 設置運行配置的確切過程取決於您的IDE和變化。

  3. 設置完畢後IDE,請運行 Python 腳本並使用應用程序運行IDE時提供的工具。

在本機檢查應用程式

在本地運行時,除了應用程序啟動時打印和顯示的幾行之外,應用程序不會在控制台中顯示任何日誌。 PyFlink 將記錄寫入安裝 Python Flink 程式庫的目錄中的檔案中。應用程式會在啟動時列印記錄檔的位置。您也可以執行下列命令來尋找記錄檔:

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. 列出記錄目錄中的檔案。您通常會找到一個.log文件。

  2. 在應用程序運行時尾部文件:tail -f <log-path>/<log-file>.log

觀察 Kinesis 串流中的輸入和輸出資料

您可以使用 Amazon Kinesis Kinesis 主控台中的資料檢視器,觀察 (產生範例 Python) 或 Kinesis 資料產生 (連結) 傳送至輸入串流的記錄。

要觀察記錄:

停止應用程式在本機執行

停止在您的IDE. 通IDE常會提供「停止」選項。確切的位置和方法取決於IDE.

Package 您的應用程式碼

在本節中,您可以使用 Apache Maven 將應用程式程式碼和所有必要的相依性封裝在 .zip 檔案中。

再次運行 Maven 包命令:

$ mvn package

這個命令會產生檔案target/managed-flink-pyflink-getting-started-1.0.0.zip

將應用程式套件上傳到 Amazon S3 儲存貯體

在本節中,您將在上一節中建立的 .zip 檔案上傳到您在本教學開始時建立的 Amazon 簡單儲存服務 (Amazon S3) 儲存貯體。如果您尚未完成此步驟,請參閱(鏈接)。

上傳應用程式程式碼JAR檔案
  1. 在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/

  2. 選擇您先前為應用程式程式碼建立的值區。

  3. 選擇上傳

  4. 選擇 Add files (新增檔案)

  5. 導覽至在上一個步驟中產生的 .zip 檔案:target/managed-flink-pyflink-getting-started-1.0.0.zip

  6. 選擇「上傳」而不變更任何其他設定。

建立並設定 Apache Flink 應用程式的受管理服務

您可以使用主控台或. AWS CLI在本教程中,我們將使用控制台。

建立應用程式

  1. 在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務

  2. 確認已選取正確的區域:美國東部 (維吉尼亞北部) us-east-1。

  3. 開啟右側功能表並選擇 Apache Flink 應用程式,然後選擇建立串流應用程式。或者,從初始頁面的 [開始使用] 區段中選擇 [建立串流應用程式]。

  4. 在 [建立串流應用程式] 頁面上:

    • 對於選擇設定串流處理應用程式的方法,請選擇從頭開始建立

    • 對於阿帕奇 Flink 配置,應用程序 Flink 版本,請選擇阿帕奇 F link 1.19。

    • 對於應用程序配置

      • 應用程式名稱中,輸入 MyApplication

      • 對於 Description (說明),輸入 My Python test app

      • 在 [存取應用程式資源] 中,選擇 [建立/更新IAM角色運動分析 MyApplication--us- east-1 具有必要原則]。

    • 對於應用程式設定的範本

      • 針對範本,選擇開發

    • 選擇建立串流應用程式

注意

當您使用主控台為 Apache Flink 應用程式建立受管理服務時,您可以選擇為應用程式建立IAM角色和原則。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:

  • 政策:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesisanalytics-MyApplication-us-west-2

Amazon 阿帕奇 Flink 託管服務以前被稱為 Kinesis Data Analytics。kinesis-analytics為了回溯相容,自動產生的資源名稱會加上前置詞。

編輯IAM策略

編輯政IAM策以新增存取 Amazon S3 儲存貯體的許可。

編輯IAM政策以新增 S3 儲存貯體許可
  1. 在開啟IAM主控台https://console.aws.amazon.com/iam/

  2. 選擇政策。選擇主控台為您在上一節所建立的 kinesis-analytics-service-MyApplication-us-east-1 政策。

  3. 選擇 [編輯],然後選擇索JSON引標籤。

  4. 將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (012345678901) 使用您的帳戶 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 選擇下一步,然後選擇儲存變更

設定應用程式

編輯應用程式組態以設定應用程式程式碼人工因素。

設定應用程式
  1. MyApplication頁面上,選擇設定

  2. 應用程式程式碼位置區段中:

    • 對於 Amazon S3 儲存貯體,請選取先前為應用程式程式碼建立的儲存貯體。選擇「瀏覽」並選取正確的值區,然後選擇「選」。請勿選取值區名稱。

    • 對於 Amazon S3 物件的路徑,請輸入 managed-flink-pyflink-getting-started-1.0.0.zip

  3. 對於 [存取權限],選擇 [建立/更新kinesis-analytics-MyApplication-us-east-1具有必要策略的IAM角色]

  4. 移至「執行階段」屬性,並保留所有其他設定的預設值。

  5. 選擇「新增項目」,然後新增下列每個參數:

    群組 ID 金鑰
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. 請勿修改任何其他區段,然後選擇 [儲存變更]。

注意

當您選擇啟用 Amazon 日誌 CloudWatch 記錄時,Apache Flink 的受管服務會為您建立日誌群組和日誌串流。這些資源的名稱如下所示:

  • 日誌群組:/aws/kinesis-analytics/MyApplication

  • 日誌串流:kinesis-analytics-log-stream

執行應用程式

該應用程序現在已配置並準備好運行。

執行應用程式
  1. 在適用於 Apache Flink 的 Amazon 受管服務的主控台上,選擇我的應用程式,然後選擇執行

  2. 在下一頁的 [應用程式還原組態] 頁面上,選擇 [以最新快照執行],然後選擇 [執行]。

    應用程式中的狀態」 詳細資訊會從Ready應用程式啟動到再轉換為應用程式的Running時間。Starting

當應用程式處於狀Running態時,您現在可以開啟 Flink 儀表板。

開啟 儀表板
  1. 選擇「開啟阿帕奇 Flink 儀表板」。儀表板會在新頁面上開啟。

  2. 在「執行工作」清單中,選擇您可以看到的單一工作。

    注意

    如果您設定執行階段屬性或編輯不正確的IAM原則,應用程式狀態可能會變成Running,但 Flink 儀表板會顯示工作正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的權限,這是常見的失敗案例。

    發生這種情況時,請檢查 Flink 儀表板中的「例外」標籤以查看問題的原因。

觀察正在運行的應用程序的指標

MyApplication頁面的 Amazon CloudWatch 指標區段中,您可以看到執行中應用程式中的一些基本指標。

若要檢視測量結果
  1. 在「重新整理」按鈕旁邊,從下拉式清單中選取 10 秒

  2. 當應用程式執行且健康狀態良好時,您可以看到正常執行時指標不斷增加。

  3. 完整重新啟動指標應為零。如果正在增加,則配置可能有問題。若要調查問題,請檢閱 Flink 儀表板上的「例外」標籤。

  4. 在運作狀況良好的應用程式中,失敗的檢查點數量應為零。

    注意

    此儀表板會顯示一組固定的指標,精細度為 5 分鐘。您可以使用儀表板中的任何指標來建立自訂應用程式 CloudWatch 儀表板。

觀察 Kinesis 串流中的輸出資料

確定您仍在使用 Python 指令碼或 Kinesis 資料產生器將資料發佈至輸入。

您現在可以使用中的「資料檢視器」,觀察在 Apache Flink 受管理服務上執行之應用程式的輸出 https://console.aws.amazon.com/kinesis/,與您之前已執行的操作類似。

若要檢視輸出
  1. https://console.aws.amazon.com/Kinesis 處開啟室壁運動主控台。

  2. 確認區域與您用來執行此自學課程的區域相同。依預設,它是美國東部-1 美國東部 (維吉尼亞北部)。如有必要,請變更「區域」。

  3. 選擇「資料串流」。

  4. 選取您要觀察的串流。在本教學課程中,使用 ExampleOutputStream

  5. 選擇「資料檢視器」標籤。

  6. 選取任何 [碎片],將 [最新] 保留為起始位置,然後選擇 [取得記錄]。您可能會看到「找不到此要求的記錄」錯誤訊息。如果是這樣,請選擇重試取得記錄。發佈至串流的最新記錄會顯示。

  7. 在「資料」欄中選取值,以JSON格式檢查記錄的內容。

停止應用程式

若要停止應用程式,請移至名為的 Apache Flink 應用程式的受管理服務的主控台頁面。MyApplication

停止應用程式
  1. 從「動作」下拉式清單中,選擇「停止」。

  2. 應用程式中的狀態」 詳細資訊Ready會從應Running用程式完全停止轉換為Stopping,然後再轉換為應用程式的

    注意

    別忘了也停止將資料從 Python 指令碼或 Kinesis 資料產生器傳送至輸入串流。

下一步驟

清理 AWS 資源