Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
為 Python 應用程序創建並運行阿帕奇 Flink 的託管服務
在本節中,您將為 Python 應用程式建立 Apache Flink 應用程式的受管理服務,並將 Kinesis 串流作為來源和接收器。
本節包含下列步驟:
建立相依資源
在為本練習建立 Managed Service for Apache Flink 之前,先建立下列相依資源:
-
兩個 Kinesis 串流,用於輸入和輸出。
-
用於存放應用程式程式碼的 Amazon S3 儲存貯體。
注意
本教學課程假設您正在 us-east-1 區域中部署應用程式。如果您使用其他區域,則必須相應地調整所有步驟。
建立兩個 Kinesis 串流
在為本練習建立適用於 Apache Flink 應用程式的受管服務之前,請在您將用來部署應用程式的相同區域中建立兩個 Kinesis 資料串流 (ExampleInputStream
和ExampleOutputStream
) (在此範例中為 us-east-1)。您的應用程式會將這些串流用於應用程式來源和目的地串流。
您可以使用 Amazon Kinesis 主控台或以下 AWS CLI 命令來建立這些串流。如需主控台指示,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。
建立資料串流 (AWS CLI)
-
若要建立第一個串流 (
ExampleInputStream
),請使用下列 Amazon Kinesiscreate-stream
AWS CLI 指令。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
-
若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為
ExampleOutputStream
。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
建立 Amazon S3 儲存貯體
您可以使用主控台建立 Amazon S3 儲存貯體。如需建立這些資源的相關指示,請參閱以下主題:
-
《Amazon Simple Storage Service 使用者指南》中的如何建立 S3 儲存貯體。為 Amazon S3 儲存貯體指定一個全域唯一的名稱,例如透過附加您的登入名稱。
注意
請務必在您用於本教學課程 (us-east-1) 的區域中建立 S3 儲存貯體。
其他資源
當您建立應用程式時,Apache Flink 的受管服務會建立下列 Amazon CloudWatch 資源 (如果這些資源尚未存在):
-
名為
/AWS/KinesisAnalytics-java/<my-application>
的日誌群組。 -
名為
kinesis-analytics-log-stream
的日誌串流。
設定您的本機開發環境
對於開發和調試,您可以在計算機上運行 Python Flink 應用程序。您可以使用python
main.py
或在您選擇IDE的 Python 中從命令行啟動應用程序。
注意
在你的開發機器上,你必須有 Python 3.10 或 3.11,Java 11,阿帕奇 Maven 和 Git 安裝。我們建議您使用IDE如PyCharm
安裝程 PyFlink 式庫
若要開發應用程式並在本機執行,您必須安裝 Flink Python 程式庫。
-
使用 VirtualEnv,康達或任何類似的 Python 工具創建一個獨立的 Python 環境。
-
在該環境中安裝 PyFlink 程式庫。使用您將在阿帕奇 Flink 的 Amazon 受管服務中使用的相同 Apache Flink 執行階段版本。目前,建議的執行階段為 1.19.1。
$ pip install apache-flink==1.19.1
-
當您執行應用程式時,請確定環境處於作用中狀態。如果您在中執行應用程式IDE,請確定IDE使用環境做為執行階段。該過程取決於您IDE正在使用的。
注意
您只需要安裝程 PyFlink 式庫即可。您不需要在您的機器上安裝 Apache Flink 叢集。
驗證您的 AWS 會話
應用程式會使用 Kinesis 資料串流來發佈資料。在本機執行時,您必須擁有具有寫入 Kinesis 資料串流之權限的有效 AWS 驗證工作階段。請使用下列步驟來驗證您的工作階段:
-
如果您沒有設定 AWS CLI 具有有效認證的具名設定檔,請參閱設置 AWS Command Line Interface (AWS CLI)。
-
發佈下列測試記錄,確認您的設定正確,且您的使用者具有寫入 Kinesis 資料串流的權限: AWS CLI
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
如果您IDE有要整合的外掛程式 AWS,您可以使用該外掛程式將憑證傳遞至在IDE. 如需詳細資訊,請參閱AWS 適用於 PyCharm
Visual Studio 程式碼的AWS 工具AWS 組和 IntelliJ IDEA 的工具組 。
下載並檢查阿帕奇 Flink 流 Python 代碼
此範例的 Python 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:
-
使用以下指令複製遠端儲存庫:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
導覽至
./python/GettingStarted
目錄。
檢閱應用程式元
應用程式程式碼位於中main.py
。我們使用SQL嵌入在 Python 中來定義應用程序的流程。
注意
為了獲得最佳化的開發人員體驗,應用程式的設計目的是在適用於 Apache Flink 的 Amazon 受管服務和本機上進行開發時,不需要變更任何程式碼即可執行。應用程式會使用環境變數IS_LOCAL =
true
來偵測何時在本機執行。您必須在 shell 上IS_LOCAL = true
或IDE. 的運行配置中設置環境變量
-
應用程式會設定執行環境並讀取執行階段組態。若要同時在適用於 Apache Flink 和本機的 Amazon 受管服務上運作,應用程式會檢查
IS_LOCAL
變數。-
以下是應用程式在適用於 Apache Flink 的 Amazon 受管服務中執行時的預設行為:
-
載入與應用程式封裝的相依性。有關更多信息,請參閱(鏈接)
-
從您在 Apache Flink 應用程式的 Amazon 受管服務中定義的執行階段屬性載入組態。有關更多信息,請參閱(鏈接)
-
-
當應用程式偵測到您在本機執行應用程式
IS_LOCAL = true
時:-
從專案載入外部相依性。
-
從專案中包含的
application_properties.json
檔案載入規劃。... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
應用程式會使用 Kinesis 連接器
來定義含 CREATE TABLE
陳述式的來源資料表。此表格會從輸入 Kinesis 串流讀取資料。該應用程序從運行時配置獲取流的名稱,區域和初始位置。table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
-
在此範例中,應用程式也會使用 Kinesis 連接器來定義接
收表。這個故事將數據發送到輸出 Kinesis 流。 table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
-
最後,應用程式會從來源資料表中執行接收器表格。SQL
INSERT INTO...
在更複雜的應用程式中,在寫入接收器之前,您可能還有其他步驟來轉換資料。table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
-
您必須在函數結尾新增另一個步驟,才
main()
能在本機執行應用程式:if is_local: table_result.wait()
如果沒有這個陳述式,應用程式會在您在本機執行時立即終止。當您在 Apache Flink 的 Amazon 受管服務中執行應用程式時,不得執行此陳述式。
管理JAR相依性
一個 PyFlink 應用程序通常需要一個或多個連接器。本教學課程中的應用程式使用 Kinesis 連接器
在這個例子中,我們展示了如何使用 Apache 的 Maven 獲取依賴關係和打包應用程序在 Apache Flink 託管服務上運行。
注意
還有其他方法來獲取和打包依賴關係。此範例示範可正確搭配一個或多個連接器運作的方法。它也可讓您在本機、開發和 Apache Flink 受管理服務上執行應用程式,而無需變更程式碼。
使用 pom.xml 檔案
阿帕奇 Maven 使用該pom.xml
文件來控制依賴關係和應用程序打包。
任何JAR從屬關係都在<dependencies>...</dependencies>
塊中的pom.xml
文件中指定。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
若要尋找要使用的正確加工品和連接器版本,請參閱搭配 Apache Flink 的受管理服務使用 Apache Flink 連接器。請確定您參考的是您正在使用的 Apache Flink 版本。在此範例中,我們使用 Kinesis 連接器。對於阿帕奇 Flink 1.19,連接器版本為。4.3.0-1.19
注意
如果您使用的是 Apache Flink 1.19,則沒有專門針對此版本發行的連接器版本。使用針對 1.18 版發行的連接器。
下載和封裝相依性
使用 Maven 下載pom.xml
檔案中定義的相依性,並將它們封裝給 Python Flink 應用程式。
-
瀏覽至包含名為 Python 入門專案的目錄
python/GettingStarted
。 -
執行以下命令:
$ mvn package
Maven 創建一個名為的新文件./target/pyflink-dependencies.jar
。當您在電腦上進行本機開發時,Python 應用程式會尋找此檔案。
注意
如果您忘記運行此命令,當您嘗試運行應用程序時,它將失敗並顯示錯誤:找不到標識符「kinesis 的任何工廠。
將樣本記錄寫入輸入流
在本節中,您會將範例記錄傳送至串流以供應用程式處理。您有兩個選項可以使用 Python 指令碼或 Kinesis 資料產生器來產生範例資料
使用 Python 腳本生成示例數據
您可以使用 Python 腳本將示例記錄發送到流。
注意
要運行這個 Python 腳本,你必須使用 Python 3.x 和有 Python(博托)
若要開始將測試資料傳送至 Kinesis 輸入串流:
-
從數據生成器 GitHub 存儲庫下載數據生成器
stock.py
Python 腳本。 -
執行
stock.py
指令碼:$ python stock.py
當您完成教學課程的其餘部分時,請保持指令碼的執行狀態。您現在可以執行您的 Apache Flink 應用程式。
使用 Kinesis 資料產生器產生範例資料
或者,若要使用 Python 指令碼,您也可以使用託管版本
若要設定和執行 Kinesis 資料產生器:
-
請遵循 Kinesis 資料產生器文件
中的指示來設定工具的存取權。您將執行設定使用者和密碼的 AWS CloudFormation 範本。 -
透過 CloudFormation範本產生的存取 Kinesis 資料URL產生器。 CloudFormation 模板完成後,您可以URL在「輸出」選項卡中找到。
-
設定資料產生器:
-
區域:選取您在此教學課程中使用的區域:us-east-1
-
串流/交付串流:選取應用程式將使用的輸入串流:
ExampleInputStream
-
每秒記錄數:100
-
記錄範本:複製並貼上下列範本:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
測試範本:選擇「測試範本」,然後確認產生的記錄與下列內容類似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
啟動資料產生器:選擇 「選取傳送資料」。
Kinesis 資料產生器現在正在將資料傳送至. ExampleInputStream
在本機執行應用程式
您可以在本地測試應用程序,從命令行運行python main.py
或從IDE.
若要在本機執行應用程式,您必須安裝正確版本的程式 PyFlink庫,如上一節所述。有關更多信息,請參閱(鏈接)
注意
在繼續之前,請確認輸入和輸出串流是否可用。請參閱 建立兩個 Amazon Kinesis 資料串流。另外,請確認您具有從兩個串流讀取和寫入的權限。請參閱 驗證您的 AWS 會話。
將 Python 項目導入到您的 IDE
若要開始在您的應用程式上工作IDE,您必須將它匯入為 Python 專案。
您複製的儲存庫包含多個範例。每個例子都是一個單獨的項目。在本教學課程中,請將./python/GettingStarted
子目錄中的內容匯入您的IDE.
將程式碼匯入為現有的 Python 專案。
注意
導入新 Python 項目的確切過程取決於IDE您使用的項目。
檢查本地應用程序配置
在本機執行時,應用程式會使用下專application_properties.json
案資源資料夾中檔案中的組態./src/main/resources
。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
在本機執行 Python 應用程式
您可以在本機執行應用程式,無論是從命令列做為一般 Python 指令碼,或從IDE.
若要從命令列執行應用程式
-
請確定獨立 Python 環境 (例如 Conda) 或您安裝 Python Flink 程式庫的 VirtualEnv 位置目前處於作用中狀態。
-
確保您至
mvn package
少跑了一次。 -
設定
IS_LOCAL = true
環境變數:$ export IS_LOCAL=true
-
運行應用程序作為一個常規的 Python 腳本。
$python main.py
若要從中執行應用程式 IDE
-
配置您IDE使用以下配置運行
main.py
腳本:-
使用獨立的 Python 環境,例如 Conda 或您安裝程式 PyFlink 庫的 VirtualEnv 位置。
-
使用 AWS 認證來存取輸入和輸出 Kinesis 資料串流。
-
設定
IS_LOCAL = true
。
-
-
設置運行配置的確切過程取決於您的IDE和變化。
-
設置完畢後IDE,請運行 Python 腳本並使用應用程序運行IDE時提供的工具。
在本機檢查應用程式
在本地運行時,除了應用程序啟動時打印和顯示的幾行之外,應用程序不會在控制台中顯示任何日誌。 PyFlink 將記錄寫入安裝 Python Flink 程式庫的目錄中的檔案中。應用程式會在啟動時列印記錄檔的位置。您也可以執行下列命令來尋找記錄檔:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
列出記錄目錄中的檔案。您通常會找到一個
.log
文件。 -
在應用程序運行時尾部文件:
tail -f <log-path>/<log-file>.log
。
觀察 Kinesis 串流中的輸入和輸出資料
您可以使用 Amazon Kinesis Kinesis 主控台中的資料檢視器,觀察 (產生範例 Python) 或 Kinesis 資料產生器 (連結) 傳送至輸入串流的記錄。
要觀察記錄:
停止應用程式在本機執行
停止在您的IDE. 通IDE常會提供「停止」選項。確切的位置和方法取決於IDE.
Package 您的應用程式碼
在本節中,您可以使用 Apache Maven 將應用程式程式碼和所有必要的相依性封裝在 .zip 檔案中。
再次運行 Maven 包命令:
$ mvn package
這個命令會產生檔案target/managed-flink-pyflink-getting-started-1.0.0.zip
。
將應用程式套件上傳到 Amazon S3 儲存貯體
在本節中,您將在上一節中建立的 .zip 檔案上傳到您在本教學開始時建立的 Amazon 簡單儲存服務 (Amazon S3) 儲存貯體。如果您尚未完成此步驟,請參閱(鏈接)。
上傳應用程式程式碼JAR檔案
在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/
。 -
選擇您先前為應用程式程式碼建立的值區。
-
選擇上傳。
-
選擇 Add files (新增檔案)。
-
導覽至在上一個步驟中產生的 .zip 檔案:
target/managed-flink-pyflink-getting-started-1.0.0.zip
。 -
選擇「上傳」而不變更任何其他設定。
建立並設定 Apache Flink 應用程式的受管理服務
您可以使用主控台或. AWS CLI在本教程中,我們將使用控制台。
建立應用程式
在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務
-
確認已選取正確的區域:美國東部 (維吉尼亞北部) us-east-1。
-
開啟右側功能表並選擇 Apache Flink 應用程式,然後選擇建立串流應用程式。或者,從初始頁面的 [開始使用] 區段中選擇 [建立串流應用程式]。
-
在 [建立串流應用程式] 頁面上:
-
對於選擇設定串流處理應用程式的方法,請選擇從頭開始建立。
-
對於阿帕奇 Flink 配置,應用程序 Flink 版本,請選擇阿帕奇 F link 1.19。
-
對於應用程序配置:
-
在應用程式名稱中,輸入
MyApplication
。 -
對於 Description (說明),輸入
My Python test app
。 -
在 [存取應用程式資源] 中,選擇 [建立/更新IAM角色運動分析 MyApplication--us- east-1 具有必要原則]。
-
-
對於應用程式設定的範本:
-
針對範本,選擇開發。
-
-
選擇建立串流應用程式。
-
注意
當您使用主控台為 Apache Flink 應用程式建立受管理服務時,您可以選擇為應用程式建立IAM角色和原則。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:
-
政策:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesisanalytics-
MyApplication
-us-west-2
Amazon 阿帕奇 Flink 託管服務以前被稱為 Kinesis Data Analytics。kinesis-analytics
為了回溯相容,自動產生的資源名稱會加上前置詞。
編輯IAM策略
編輯政IAM策以新增存取 Amazon S3 儲存貯體的許可。
編輯IAM政策以新增 S3 儲存貯體許可
在開啟IAM主控台https://console.aws.amazon.com/iam/
。 -
選擇政策。選擇主控台為您在上一節所建立的
kinesis-analytics-service-MyApplication-us-east-1
政策。 -
選擇 [編輯],然後選擇索JSON引標籤。
-
將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (
012345678901
) 使用您的帳戶 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
選擇下一步,然後選擇儲存變更。
設定應用程式
編輯應用程式組態以設定應用程式程式碼人工因素。
設定應用程式
-
在MyApplication頁面上,選擇設定。
-
在應用程式程式碼位置區段中:
-
對於 Amazon S3 儲存貯體,請選取先前為應用程式程式碼建立的儲存貯體。選擇「瀏覽」並選取正確的值區,然後選擇「選擇」。請勿選取值區名稱。
-
對於 Amazon S3 物件的路徑,請輸入
managed-flink-pyflink-getting-started-1.0.0.zip
。
-
-
對於 [存取權限],選擇 [建立/更新
kinesis-analytics-MyApplication-us-east-1
具有必要策略的IAM角色] -
移至「執行階段」屬性,並保留所有其他設定的預設值。
-
選擇「新增項目」,然後新增下列每個參數:
群組 ID 金鑰 值 InputStream0
stream.name
ExampleInputStream
InputStream0
flink.stream.initpos
LATEST
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
kinesis.analytics.flink.run.options
python
main.py
kinesis.analytics.flink.run.options
jarfile
lib/pyflink-dependencies.jar
-
請勿修改任何其他區段,然後選擇 [儲存變更]。
注意
當您選擇啟用 Amazon 日誌 CloudWatch 記錄時,Apache Flink 的受管服務會為您建立日誌群組和日誌串流。這些資源的名稱如下所示:
-
日誌群組:
/aws/kinesis-analytics/MyApplication
-
日誌串流:
kinesis-analytics-log-stream
執行應用程式
該應用程序現在已配置並準備好運行。
執行應用程式
-
在適用於 Apache Flink 的 Amazon 受管服務的主控台上,選擇我的應用程式,然後選擇執行。
-
在下一頁的 [應用程式還原組態] 頁面上,選擇 [以最新快照執行],然後選擇 [執行]。
「應用程式中的狀態」 詳細資訊會從
Ready
應用程式啟動到再轉換為應用程式的Running
時間。Starting
當應用程式處於狀Running
態時,您現在可以開啟 Flink 儀表板。
開啟 儀表板
-
選擇「開啟阿帕奇 Flink 儀表板」。儀表板會在新頁面上開啟。
-
在「執行工作」清單中,選擇您可以看到的單一工作。
注意
如果您設定執行階段屬性或編輯不正確的IAM原則,應用程式狀態可能會變成
Running
,但 Flink 儀表板會顯示工作正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的權限,這是常見的失敗案例。發生這種情況時,請檢查 Flink 儀表板中的「例外」標籤以查看問題的原因。
觀察正在運行的應用程序的指標
在MyApplication頁面的 Amazon CloudWatch 指標區段中,您可以看到執行中應用程式中的一些基本指標。
若要檢視測量結果
-
在「重新整理」按鈕旁邊,從下拉式清單中選取 10 秒。
-
當應用程式執行且健康狀態良好時,您可以看到正常執行時間指標不斷增加。
-
完整重新啟動指標應為零。如果正在增加,則配置可能有問題。若要調查問題,請檢閱 Flink 儀表板上的「例外」標籤。
-
在運作狀況良好的應用程式中,失敗的檢查點數量應為零。
注意
此儀表板會顯示一組固定的指標,精細度為 5 分鐘。您可以使用儀表板中的任何指標來建立自訂應用程式 CloudWatch 儀表板。
觀察 Kinesis 串流中的輸出資料
確定您仍在使用 Python 指令碼或 Kinesis 資料產生器將資料發佈至輸入。
您現在可以使用中的「資料檢視器」,觀察在 Apache Flink 受管理服務上執行之應用程式的輸出 https://console.aws.amazon.com/kinesis/
若要檢視輸出
在 https://console.aws.amazon.com/
Kinesis 處開啟室壁運動主控台。 -
確認區域與您用來執行此自學課程的區域相同。依預設,它是美國東部-1 美國東部 (維吉尼亞北部)。如有必要,請變更「區域」。
-
選擇「資料串流」。
-
選取您要觀察的串流。在本教學課程中,使用
ExampleOutputStream
。 -
選擇「資料檢視器」標籤。
-
選取任何 [碎片],將 [最新] 保留為起始位置,然後選擇 [取得記錄]。您可能會看到「找不到此要求的記錄」錯誤訊息。如果是這樣,請選擇重試取得記錄。發佈至串流的最新記錄會顯示。
-
在「資料」欄中選取值,以JSON格式檢查記錄的內容。
停止應用程式
若要停止應用程式,請移至名為的 Apache Flink 應用程式的受管理服務的主控台頁面。MyApplication
停止應用程式
-
從「動作」下拉式清單中,選擇「停止」。
-
「應用程式中的狀態」 詳細資訊
Ready
會從應Running
用程式完全停止轉換為Stopping
,然後再轉換為應用程式的注意
別忘了也停止將資料從 Python 指令碼或 Kinesis 資料產生器傳送至輸入串流。