Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
為 Apache Flink 應用程式建立並執行受管理的服務
在此步驟中,您會建立 Apache Flink 應用程式的受管服務,並將 Kinesis 資料串流當做來源和接收器。
本節包含下列步驟:
建立相依資源
在為本練習建立 Managed Service for Apache Flink 應用程式之前,先建立下列相依資源:
-
用於輸入和輸出的兩個 Kinesis 資料串流
-
用於存放應用程式程式碼的 Amazon S3 儲存貯體
注意
本教學課程假設您正在 us-east-1 美國東部 (維吉尼亞北部) 區域部署應用程式。如果您使用其他區域,請相應地調整所有步驟。
建立兩個 Amazon Kinesis 資料串流
在為本練習建立 Managed Service for Apache Flink 應用程式之前,請先建立兩個 Kinesis 資料串流 (ExampleInputStream
和 ExampleOutputStream
)。您的應用程式會將這些串流用於應用程式來源和目的地串流。
您可以使用 Amazon Kinesis 主控台或下列 AWS CLI 命令來建立這些串流。如需主控台指示,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。若要使用建立串流 AWS CLI,請使用下列指令,調整至您用於應用程式的 [區域]。
建立資料串流 (AWS CLI)
-
若要建立第一個串流 (
ExampleInputStream
),請使用下列 Amazon Kinesiscreate-stream
AWS CLI 命令:$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
-
若要建立應用程式用來寫入輸出的第二個資料流,請執行相同的命令,將串流名稱變更為
ExampleOutputStream
:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
為應用程式程式碼建立 Amazon S3 儲存貯體
您可以使用主控台建立 Amazon S3 儲存貯體。若要了解如何使用主控台建立 Amazon S3 儲存貯體,請參閱 Amazon S3 使用者指南中的建立儲存貯體。使用全域唯一名稱來命名 Amazon S3 儲存貯體,例如透過附加您的登入名稱。
注意
請務必在您用於本教學課程的「地區」中建立值區 (us-east-1)。
其他資源
當您建立應用程式時,Apache Flink 的受管服務會自動建立下列 Amazon CloudWatch 資源 (如果這些資源尚未存在):
-
名為
/AWS/KinesisAnalytics-java/<my-application>
的日誌群組。 -
名為
kinesis-analytics-log-stream
的日誌串流。
設定您的本機開發環境
對於開發和除錯,您可以直接從您選擇IDE的電腦上執行 Apache Flink 應用程式。任何阿帕奇 Flink 依賴關係都像使用阿帕奇 Maven 的常規 Java 依賴關係處理。
注意
在你的開發機器上,你必須安裝 Java JDK 11,Maven 和 Git。我們建議您使用開發環境,如日食 Java 霓虹燈
驗證您的 AWS 會話
應用程式會使用 Kinesis 資料串流來發佈資料。在本機執行時,您必須擁有具有寫入 Kinesis 資料串流之權限的有效 AWS 驗證工作階段。請使用下列步驟來驗證您的工作階段:
-
如果您沒有設定 AWS CLI 具有有效認證的具名設定檔,請參閱設置 AWS Command Line Interface (AWS CLI)。
-
發佈下列測試記錄,確認您的設定正確,且您的使用者具有寫入 Kinesis 資料串流的權限: AWS CLI
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
如果您IDE有要整合的外掛程式 AWS,您可以使用該外掛程式將憑證傳遞至在IDE. 如需詳細資訊,請參閱 IntelliJ 的AWS 工具AWS
組IDEA和 Eclipse 的工具組。
下載並檢查阿帕奇 Flink 流 Java 代碼
此範例的 Java 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:
-
使用以下指令複製遠端儲存庫:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
導覽至
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted
目錄。
檢閱應用程式元
該應用程序完全在com.amazonaws.services.msf.BasicStreamingJob
類中實現。此方main()
法會定義處理串流資料並執行資料流程的資料流程。
注意
為了獲得最佳化的開發人員體驗,該應用程式的設計目的是在適用於 Apache Flink 的 Amazon 受管服務和本機上執行任何程式碼變更,以便在您IDE的.
-
若要讀取執行階段組態,以便在 Apache Flink 的 Amazon 受管服務中執行時運作IDE,應用程式會自動偵測它是否在IDE. 在這種情況下,應用程序以不同的方式加載運行時配置:
-
當應用程序檢測到它在您的獨立模式下運行時IDE,形成包含在項目資源
application_properties.json
文件夾中的文件。檔案的內容如下。 -
當應用程式在適用於 Apache Flink 的 Amazon 受管服務中執行時,預設行為會從您將在 Apache Flink 應用程式的 Amazon 受管服務中定義的執行時期屬性載入應用程式組態。請參閱 建立並設定 Apache Flink 應用程式的受管理服務。
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
該
main()
方法定義應用程序數據流並運行它。-
初始化預設串流環境。在此範例中,我們將示範如何同時建立
StreamExecutionEnvironment
要與 DataSteam API和搭配使用StreamTableEnvironment
的和 Table 一起SQL使用的API。這兩個環境對象是對同一運行時環境的兩個單獨的引用,以使用不同的APIs。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
載入應用程式組態參數。這將自動從正確的位置加載它們,具體取決於應用程序運行的位置:
Map<String, Properties> applicationParameters = loadApplicationProperties(env);
-
應用程式使用 Kinesis 取用者
連接器定義來源,從輸入串流讀取資料。輸入流的配置在 PropertyGroupId
= 中定義InputStream0
。串流的名稱和區域aws.region
分別位於命名stream.name
的屬性中。為了簡單起見,此源將記錄讀取為字符串。private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
-
接著,應用程式會使用 Kinesis Streams 接收器定義接收
器,將資料傳送至輸出串流。輸出流名稱和區域在 PropertyGroupId
= 中定義OutputStream0
,類似於輸入流。接收器直接連接到從源獲取數據的內部DataStream
。在真實的應用程序中,您在源和接收器之間進行了一些轉換。private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
-
最後,您執行剛才定義的資料流程。這必須是該
main()
方法的最後一條指令,在定義了數據流需要的所有運算符之後:env.execute("Flink streaming Java API skeleton");
-
使用 pom.xml 檔案
pom.xml 文件定義了應用程序所需的所有依賴關係,並設置了 Maven Shade 插件來構建包含 Flink 所需的所有依賴關係的脂肪罐。
-
一些依賴關係具有
provided
範圍。當應用程式在適用於 Apache Flink 的 Amazon 受管服務中執行時,這些相依性會自動提供。他們需要編譯應用程式,或在IDE. 如需詳細資訊,請參閱在本機執行應用程式。請確定您使用的 Flink 版本與您將在 Apache Flink 的 Amazon 受管服務中使用的執行階段相同。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
您必須將其他 Apache Flink 相依性新增至具有預設範圍的 pom,例如此應用程式使用的 Kinesis 連接器
。如需詳細資訊,請參閱搭配 Apache Flink 的受管理服務使用 Apache Flink 連接器。您也可以新增應用程式所需的任何其他 Java 相依性。 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Maven Java 編譯器插件可以確保代碼是根據 Java 11 編譯的,這是阿帕奇 Flink 目前支持的JDK版本。
-
Maven Shade 插件包裝脂肪罐,不包括運行時提供的一些庫。它還指定了兩個變壓器:
ServicesResourceTransformer
和ManifestResourceTransformer
。後者配置包含啟動應用程序的main
方法的類。如果重命名主類,請不要忘記更新此變壓器。 -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
將樣本記錄寫入輸入流
在本節中,您會將範例記錄傳送至串流以供應用程式處理。您有兩個選項可以使用 Python 指令碼或 Kinesis 資料產生器來產生範例資料
使用 Python 腳本生成示例數據
您可以使用 Python 腳本將示例記錄發送到流。
注意
要運行這個 Python 腳本,你必須使用 Python 3.x 和有 Python(博托)
若要開始將測試資料傳送至 Kinesis 輸入串流:
-
從數據生成器 GitHub 存儲庫下載數據生成器
stock.py
Python 腳本。 -
執行
stock.py
指令碼:$ python stock.py
當您完成教學課程的其餘部分時,請保持指令碼的執行狀態。您現在可以執行您的 Apache Flink 應用程式。
使用 Kinesis 資料產生器產生範例資料
或者,若要使用 Python 指令碼,您也可以使用託管版本
若要設定和執行 Kinesis 資料產生器:
-
請遵循 Kinesis 資料產生器文件
中的指示來設定工具的存取權。您將執行設定使用者和密碼的 AWS CloudFormation 範本。 -
透過 CloudFormation範本產生的存取 Kinesis 資料URL產生器。 CloudFormation 模板完成後,您可以URL在「輸出」選項卡中找到。
-
設定資料產生器:
-
區域:選取您在此教學課程中使用的區域:us-east-1
-
串流/交付串流:選取應用程式將使用的輸入串流:
ExampleInputStream
-
每秒記錄數:100
-
記錄範本:複製並貼上下列範本:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
測試範本:選擇「測試範本」,然後確認產生的記錄與下列內容類似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
啟動資料產生器:選擇 「選取傳送資料」。
Kinesis 資料產生器現在正在將資料傳送至. ExampleInputStream
在本機執行應用程式
您可以在. IDE
注意
在繼續之前,請確認輸入和輸出串流是否可用。請參閱 建立兩個 Amazon Kinesis 資料串流。另外,請確認您具有從兩個串流讀取和寫入的權限。請參閱 驗證您的 AWS 會話。
設置本地開發環境需要 Java 11JDK,阿帕奇 Maven 的,並IDE用於 Java 開發。確認您符合必要的先決條件。請參閱 滿足完成練習的先決條件。
將 Java 專案匯入您的 IDE
若要在中開始處理應用程式IDE,您必須將其匯入為 Java 專案。
您複製的儲存庫包含多個範例。每個例子都是一個單獨的項目。在本教學課程中,請將./java/GettingStarted
子目錄中的內容匯入您的IDE.
將代碼作為使用 Maven 的現有 Java 項目插入。
注意
導入新 Java 項目的確切過程根據IDE您使用的不同而有所不同。
檢查本地應用程序配置
在本機執行時,應用程式會使用下專application_properties.json
案資源資料夾中檔案中的組態./src/main/resources
。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
設置您的IDE運行配置
您可以透過執行主類別com.amazonaws.services.msf.BasicStreamingJob
,IDE直接從您的 Flink 應用程式執行和偵錯,就像執行任何 Java 應用程式一樣。在運行應用程序之前,您必須設置運行配置。設置取決於IDE您正在使用的。例如,請參閱 Intelli IDEA J 文檔中的運行/調試配置
-
將
provided
依賴關係添加到類路徑中。這是必要的,以確保在本地運行時將具有provided
範圍的依賴關係傳遞給應用程序。如果沒有此設定,應用程式會立即顯示class not found
錯誤。 -
傳遞 AWS 認證以存取 Kinesis 串流至應用程式。最快的方法是使用 IntelliJ IDEA 的AWS 工具包
。在 Run 配置中使用此IDE插件,您可以選擇特定的配置 AWS 文件。 AWS 使用此設定檔進行驗證。您不需要直接傳遞 AWS 憑證。 -
驗證是否使用 JDK11 IDE 執行應用程式。
在您的中運行應用程序 IDE
設定的 [執行] 組態之後BasicStreamingJob
,您可以像執行一般 Java 應用程式一樣執行或偵錯它。
注意
您不能直接java -jar
...
從命令行運行 Maven 生成的脂肪罐。此 jar 不包含獨立執行應用程式所需的 Flink 核心相依性。
當應用程式成功啟動時,它會記錄有關獨立迷你叢集和連接器初始化的一些資訊。後面是 Flink 通常會在應用程序啟動時發出的INFO一些WARN日誌。
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
初始化完成後,應用程式不會發出任何進一步的記錄項目。當資料流動時,不會發出任何記錄檔。
若要驗證應用程式是否正確處理資料,您可以檢查輸入和輸出 Kinesis 串流,如下節所述。
注意
不發出有關流動資料的記錄是 Flink 應用程式的正常行為。在每個記錄上發出日誌可能很方便調試,但在生產環境中運行時可能會增加相當大的開銷。
觀察 Kinesis 串流中的輸入和輸出資料
您可以使用 Amazon Kinesis Kinesis 主控台中的資料檢視器,觀察 (產生範例 Python) 或 Kinesis 資料產生器 (連結) 傳送至輸入串流的記錄。
觀察記錄
在 https://console.aws.amazon.com/
Kinesis 處開啟室壁運動主控台。 -
確認區域與您執行此教學課程的位置相同,依預設為 us-east-1 美國東部 (維吉尼亞北部)。如果「區域」不相符,請變更該區域。
-
選擇「資料串流」。
-
選取您要觀察的串流,
ExampleInputStream
或ExampleOutputStream.
-
選擇「資料檢視器」標籤。
-
選擇任何碎片,保持最新作為起始位置,然後選擇獲取記錄。您可能會看到「找不到此要求的記錄」錯誤訊息。如果是這樣,請選擇重試取得記錄。發佈至串流的最新記錄會顯示。
-
在「資料」欄中選擇值,以JSON格式檢查記錄的內容。
停止應用程式在本機執行
停止在您的IDE. 通IDE常會提供「停止」選項。確切的位置和方法取決於IDE您使用的。
編譯並封裝應用程式程式碼
在本節中,您可以使用阿帕奇 Maven 編譯 Java 代碼並將其打包到一個JAR文件中。您可以使用 Maven 命令行工具或IDE.
若要使用 Maven 命令列進行編譯和封裝:
移至包含 Java GettingStarted 專案的目錄,並執行下列命令:
$ mvn package
要使用以下命令編譯和打包IDE:
mvn package
從你的 IDE Maven 集成運行。
在這兩種情況下,都會建立下列JAR檔案:target/amazon-msf-java-stream-app-1.0.jar
。
注意
從您運行「構建項目」IDE 可能無法創建該JAR文件。
上傳應用程序代碼JAR文件
在本節中,您將在上一節中建立的JAR檔案上傳到您在本教學開始時建立的 Amazon 簡單儲存服務 (Amazon S3) 儲存貯體。如果您尚未完成此步驟,請參閱(鏈接)。
上傳應用程式程式碼JAR檔
在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/
。 -
選擇您先前為應用程式程式碼建立的值區。
-
選擇上傳。
-
選擇 Add files (新增檔案)。
-
導覽至在上一個步驟中產生的JAR檔案:
target/amazon-msf-java-stream-app-1.0.jar
。 -
選擇「上傳」而不變更任何其他設定。
警告
請確定您在中選取了正確的JAR檔案<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar
。
該target
目錄還包含您不需要上傳的其他JAR文件。
建立並設定 Apache Flink 應用程式的受管理服務
您可以使用主控台或 AWS CLI建立和執行 Managed Service for Apache Flink 應用程式。在本教程中,您將使用控制台。
注意
當您使用主控台建立應用程式時,系統會為您建立 AWS Identity and Access Management (IAM) 和 Amazon CloudWatch 日誌資源。使用建立應用程式時 AWS CLI,您需要分別建立這些資源。
建立應用程式
建立應用程式
在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務
-
確認已選取正確的區域:us-east-1 美國東部 (維吉尼亞北部)
-
打開右側的菜單,然後選擇 Apache Flink 應用程序,然後創建流應用程序。或者,在初始頁面的 [開始使用] 容器中選擇 [建立串流應用程式]。
-
在 [建立串流應用程式] 頁面上:
-
選擇設定串流處理應用程式的方法:選擇「從頭開始建立」。
-
阿帕奇 Flink 配置,應用程序 Flink 版本:選擇阿帕奇 F link 1.19。
-
-
設定應用程式
-
應用程式名稱:輸入
MyApplication
。 -
描述:輸入
My java test app
。 -
存取應用程式資源:選擇 [建立/更新
kinesis-analytics-MyApplication-us-east-1
具有必要策略的IAM角色]。
-
-
設定應用程式設定的範本
-
範本:選擇 [開發]。
-
-
選擇頁面底部的 [建立串流應用程式]。
注意
當您使用主控台為 Apache Flink 應用程式建立受管理服務時,您可以選擇為應用程式建立IAM角色和原則。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:
-
政策:
kinesis-analytics-service-
MyApplication
-us-east-1
-
角色:
kinesisanalytics-
MyApplication
-us-east-1
Amazon 阿帕奇 Flink 託管服務以前被稱為 Kinesis Data Analytics。自動創建的資源的名稱是kinesis-analytics-
為了向後兼容性的前綴。
編輯IAM策略
編輯原IAM則以新增存取 Kinesis 資料串流的權限。
若要編輯策略
在開啟IAM主控台https://console.aws.amazon.com/iam/
。 -
選擇政策。選擇主控台為您在上一節所建立的
kinesis-analytics-service-MyApplication-us-east-1
政策。 -
選擇 [編輯],然後選擇索JSON引標籤。
-
將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (
012345678901
) 使用您的帳戶 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
選擇頁面底部的 [下一步],然後選擇 [儲存變更]。
設定應用程式
編輯應用程式組態以設定應用程式程式碼人工因素。
編輯模型組態
-
在MyApplication頁面上,選擇設定。
-
在應用程式程式碼位置區段中:
-
對於 Amazon S3 儲存貯體,請選取先前為應用程式程式碼建立的儲存貯體。選擇 [瀏覽] 並選取正確的值區,然後選取 [選擇]。請勿按一下值區名稱。
-
對於 Amazon S3 物件的路徑,請輸入
amazon-msf-java-stream-app-1.0.jar
。
-
-
對於 [存取權限],選擇 [建立/更新
kinesis-analytics-MyApplication-us-east-1
具有必要策略的IAM角色] -
在 [執行階段屬性] 區段中,新增下列屬性。
-
選擇「新增項目」,然後新增下列每個參數:
群組 ID 金鑰 值 InputStream0
stream.name
ExampleInputStream
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
-
請勿修改任何其他區段。
-
選擇 Save changes (儲存變更)。
注意
當您選擇啟用 Amazon 日誌 CloudWatch 記錄時,Apache Flink 的受管服務會為您建立日誌群組和日誌串流。這些資源的名稱如下所示:
-
日誌群組:
/aws/kinesis-analytics/MyApplication
-
日誌串流:
kinesis-analytics-log-stream
執行應用程式
該應用程序現在已配置並準備好運行。
執行應用程式
-
在適用於 Apache Flink 的 Amazon 受管服務的主控台上,選擇我的應用程式,然後選擇執行。
-
在下一頁的 [應用程式還原組態] 頁面上,選擇 [以最新快照執行],然後選擇 [執行]。
「應用程式中的狀態」 詳細資訊會從
Ready
應用程式啟動到再轉換為應用程式的Running
時間。Starting
當應用程式處於狀Running
態時,您現在可以開啟 Flink 儀表板。
開啟 儀表板
-
選擇「開啟阿帕奇 Flink 儀表板」。儀表板會在新頁面上開啟。
-
在「執行工作」清單中,選擇您可以看到的單一工作。
注意
如果您設定執行階段屬性或編輯不正確的IAM原則,應用程式狀態可能會變成
Running
,但 Flink 儀表板會顯示工作正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的權限,這是常見的失敗案例。發生這種情況時,請檢查 Flink 儀表板中的「例外」標籤以查看問題的原因。
觀察正在運行的應用程序的指標
在MyApplication頁面的 Amazon CloudWatch 指標區段中,您可以看到執行中應用程式中的一些基本指標。
若要檢視測量結果
-
在「重新整理」按鈕旁邊,從下拉式清單中選取 10 秒。
-
當應用程式執行且健康狀態良好時,您可以看到正常執行時間指標不斷增加。
-
完整重新啟動指標應為零。如果正在增加,則配置可能有問題。若要調查問題,請檢閱 Flink 儀表板上的「例外」標籤。
-
在運作狀況良好的應用程式中,失敗的檢查點數量應為零。
注意
此儀表板會顯示一組固定的指標,精細度為 5 分鐘。您可以使用儀表板中的任何指標來建立自訂應用程式 CloudWatch 儀表板。
觀察 Kinesis 串流中的輸出資料
請確定您仍在使用 Python 指令碼或 Kinesis 資料產生器將資料發佈至輸入。
您現在可以使用中的「資料檢視器」,觀察在 Apache Flink 受管理服務上執行之應用程式的輸出 https://console.aws.amazon.com/kinesis/
若要檢視輸出
在 https://console.aws.amazon.com/
Kinesis 處開啟室壁運動主控台。 -
確認區域與您用來執行此自學課程的區域相同。依預設,它是美國東部-1 美國東部 (維吉尼亞北部)。如有必要,請變更「區域」。
-
選擇「資料串流」。
-
選取您要觀察的串流。在本教學課程中,使用
ExampleOutputStream
。 -
選擇「資料檢視器」標籤。
-
選取任何 [碎片],將 [最新] 保留為起始位置,然後選擇 [取得記錄]。您可能會看到「找不到此要求的記錄」錯誤訊息。如果是這樣,請選擇重試取得記錄。發佈至串流的最新記錄會顯示。
-
在「資料」欄中選取值,以JSON格式檢查記錄的內容。
停止應用程式
若要停止應用程式,請移至名為的 Apache Flink 應用程式的受管理服務的主控台頁面。MyApplication
停止應用程式
-
從「動作」下拉式清單中,選擇「停止」。
-
「應用程式中的狀態」 詳細資訊
Ready
會從應Running
用程式完全停止轉換為Stopping
,然後再轉換為應用程式的注意
別忘了也停止將資料從 Python 指令碼或 Kinesis 資料產生器傳送至輸入串流。