為 Apache Flink 應用程式建立並執行受管理的服務 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

為 Apache Flink 應用程式建立並執行受管理的服務

在此步驟中,您會建立 Apache Flink 應用程式的受管服務,並將 Kinesis 資料串流當做來源和接收器。

建立相依資源

在為本練習建立 Managed Service for Apache Flink 應用程式之前,先建立下列相依資源:

  • 用於輸入和輸出的兩個 Kinesis 資料串流

  • 用於存放應用程式程式碼的 Amazon S3 儲存貯體

    注意

    本教學課程假設您正在 us-east-1 美國東部 (維吉尼亞北部) 區域部署應用程式。如果您使用其他區域,請相應地調整所有步驟。

建立兩個 Amazon Kinesis 資料串流

在為本練習建立 Managed Service for Apache Flink 應用程式之前,請先建立兩個 Kinesis 資料串流 (ExampleInputStreamExampleOutputStream)。您的應用程式會將這些串流用於應用程式來源和目的地串流。

您可以使用 Amazon Kinesis 主控台或下列 AWS CLI 命令來建立這些串流。如需主控台指示,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。若要使用建立串流 AWS CLI,請使用下列指令,調整至您用於應用程式的 [區域]。

建立資料串流 (AWS CLI)
  1. 若要建立第一個串流 (ExampleInputStream),請使用下列 Amazon Kinesis create-stream AWS CLI 命令:

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. 若要建立應用程式用來寫入輸出的第二個資料流,請執行相同的命令,將串流名稱變更為ExampleOutputStream

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

為應用程式程式碼建立 Amazon S3 儲存貯體

您可以使用主控台建立 Amazon S3 儲存貯體。若要了解如何使用主控台建立 Amazon S3 儲存貯體,請參閱 Amazon S3 使用者指南中的建立儲存貯體。使用全域唯一名稱來命名 Amazon S3 儲存貯體,例如透過附加您的登入名稱。

注意

請務必在您用於本教學課程的「地區」中建立值區 (us-east-1)。

其他資源

當您建立應用程式時,Apache Flink 的受管服務會自動建立下列 Amazon CloudWatch 資源 (如果這些資源尚未存在):

  • 名為 /AWS/KinesisAnalytics-java/<my-application> 的日誌群組。

  • 名為 kinesis-analytics-log-stream 的日誌串流。

設定您的本機開發環境

對於開發和除錯,您可以直接從您選擇IDE的電腦上執行 Apache Flink 應用程式。任何阿帕奇 Flink 依賴關係都像使用阿帕奇 Maven 的常規 Java 依賴關係處理。

注意

在你的開發機器上,你必須安裝 Java JDK 11,Maven 和 Git。我們建議您使用開發環境,如日食 Java 霓虹燈IntelliJ IDEA。若要確認您符合所有先決條件,請參閱滿足完成練習的先決條件。您需要在您的機器上安裝 Apache Flink 叢集。

驗證您的 AWS 會話

應用程式會使用 Kinesis 資料串流來發佈資料。在本機執行時,您必須擁有具有寫入 Kinesis 資料串流之權限的有效 AWS 驗證工作階段。請使用下列步驟來驗證您的工作階段:

  1. 如果您沒有設定 AWS CLI 具有有效認證的具名設定檔,請參閱設置 AWS Command Line Interface (AWS CLI)

  2. 發佈下列測試記錄,確認您的設定正確,且您的使用者具有寫入 Kinesis 資料串流的權限: AWS CLI

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. 如果您IDE有要整合的外掛程式 AWS,您可以使用該外掛程式將憑證傳遞至在IDE. 如需詳細資訊,請參閱 IntelliJ 的AWS 工具AWS組IDEA和 Eclipse 的工具組

下載並檢查阿帕奇 Flink 流 Java 代碼

此範例的 Java 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:

  1. 使用以下指令複製遠端儲存庫:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. 導覽至 amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted 目錄。

檢閱應用程式元

該應用程序完全在com.amazonaws.services.msf.BasicStreamingJob類中實現。此方main()法會定義處理串流資料並執行資料流程的資料流程。

注意

為了獲得最佳化的開發人員體驗,該應用程式的設計目的是在適用於 Apache Flink 的 Amazon 受管服務和本機上執行任何程式碼變更,以便在您IDE的.

  • 若要讀取執行階段組態,以便在 Apache Flink 的 Amazon 受管服務中執行時運作IDE,應用程式會自動偵測它是否在IDE. 在這種情況下,應用程序以不同的方式加載運行時配置:

    1. 當應用程序檢測到它在您的獨立模式下運行時IDE,形成包含在項目資application_properties.json文件夾中的文件。檔案的內容如下。

    2. 當應用程式在適用於 Apache Flink 的 Amazon 受管服務中執行時,預設行為會從您將在 Apache Flink 應用程式的 Amazon 受管服務中定義的執行時期屬性載入應用程式組態。請參閱 建立並設定 Apache Flink 應用程式的受管理服務

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • main()方法定義應用程序數據流並運行它。

    • 初始化預設串流環境。在此範例中,我們將示範如何同時建立StreamExecutionEnvironment要與 DataSteam API和搭配使用StreamTableEnvironment的和 Table 一起SQL使用的API。這兩個環境對象是對同一運行時環境的兩個單獨的引用,以使用不同的APIs。

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • 載入應用程式組態參數。這將自動從正確的位置加載它們,具體取決於應用程序運行的位置:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • 應用程式使用 Kinesis 取用者連接器定義來源,從輸入串流讀取資料。輸入流的配置在 PropertyGroupId = 中定義InputStream0。串流的名稱和區域aws.region分別位於命名stream.name的屬性中。為了簡單起見,此源將記錄讀取為字符串。

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • 接著,應用程式會使用 Kinesis Streams 接收器定義接收器,將資料傳送至輸出串流。輸出流名稱和區域在 PropertyGroupId = 中定義OutputStream0,類似於輸入流。接收器直接連接到從源獲取數據的內部DataStream。在真實的應用程序中,您在源和接收器之間進行了一些轉換。

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • 最後,您執行剛才定義的資料流程。這必須是該main()方法的最後一條指令,在定義了數據流需要的所有運算符之後:

      env.execute("Flink streaming Java API skeleton");

使用 pom.xml 檔案

pom.xml 文件定義了應用程序所需的所有依賴關係,並設置了 Maven Shade 插件來構建包含 Flink 所需的所有依賴關係的脂肪罐。

  • 一些依賴關係具有provided範圍。當應用程式在適用於 Apache Flink 的 Amazon 受管服務中執行時,這些相依性會自動提供。他們需要編譯應用程式,或在IDE. 如需詳細資訊,請參閱在本機執行應用程式。請確定您使用的 Flink 版本與您將在 Apache Flink 的 Amazon 受管服務中使用的執行階段相同。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • 您必須將其他 Apache Flink 相依性新增至具有預設範圍的 pom,例如此應用程式使用的 Kinesis 連接器。如需詳細資訊,請參閱搭配 Apache Flink 的受管理服務使用 Apache Flink 連接器。您也可以新增應用程式所需的任何其他 Java 相依性。

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • Maven Java 編譯器插件可以確保代碼是根據 Java 11 編譯的,這是阿帕奇 Flink 目前支持的JDK版本。

  • Maven Shade 插件包裝脂肪罐,不包括運行時提供的一些庫。它還指定了兩個變壓器:ServicesResourceTransformerManifestResourceTransformer。後者配置包含啟動應用程序的main方法的類。如果重命名主類,請不要忘記更新此變壓器。

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

將樣本記錄寫入輸入流

在本節中,您會將範例記錄傳送至串流以供應用程式處理。您有兩個選項可以使用 Python 指令碼或 Kinesis 資料產生器來產生範例資料

使用 Python 腳本生成示例數據

您可以使用 Python 腳本將示例記錄發送到流。

注意

要運行這個 Python 腳本,你必須使用 Python 3.x 和有 Python(博托)庫安裝。AWS SDK

若要開始將測試資料傳送至 Kinesis 輸入串流:

  1. 從數據生成器 GitHub 存儲庫下載數據生成器 stock.py Python 腳本。

  2. 執行 stock.py 指令碼:

    $ python stock.py

當您完成教學課程的其餘部分時,請保持指令碼的執行狀態。您現在可以執行您的 Apache Flink 應用程式。

使用 Kinesis 資料產生器產生範例資料

或者,若要使用 Python 指令碼,您也可以使用託管版本提供的 Kinesis 資料產生器,將隨機範例資料傳送至串流。Kinesis 資料產生器會在您的瀏覽器中執行,而且您不需要在電腦上安裝任何項目。

若要設定和執行 Kinesis 資料產生器:

  1. 請遵循 Kinesis 資料產生器文件中的指示來設定工具的存取權。您將執行設定使用者和密碼的 AWS CloudFormation 範本。

  2. 透過 CloudFormation範本產生的存取 Kinesis 資料URL產生器。 CloudFormation 模板完成後,您可以URL在「輸出」選項卡中找到。

  3. 設定資料產生器:

    • 區域:選取您在此教學課程中使用的區域:us-east-1

    • 流/交付串流:選取應用程式將使用的輸入串流:ExampleInputStream

    • 每秒記錄數:100

    • 記錄範本:複製並貼上下列範本:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. 測試範本:選擇「測試範本」,然後確認產生的記錄與下列內容類似:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. 啟動資料產生器:選擇 「選取傳送資料」。

Kinesis 資料產生器現在正在將資料傳送至. ExampleInputStream

在本機執行應用程式

您可以在. IDE

注意

在繼續之前,請確認輸入和輸出串流是否可用。請參閱 建立兩個 Amazon Kinesis 資料串流。另外,請確認您具有從兩個串流讀取和寫入的權限。請參閱 驗證您的 AWS 會話

設置本地開發環境需要 Java 11JDK,阿帕奇 Maven 的,並IDE用於 Java 開發。確認您符合必要的先決條件。請參閱 滿足完成練習的先決條件

將 Java 專案匯入您的 IDE

若要在中開始處理應用程式IDE,您必須將其匯入為 Java 專案。

您複製的儲存庫包含多個範例。每個例子都是一個單獨的項目。在本教學課程中,請將./java/GettingStarted子目錄中的內容匯入您的IDE.

將代碼作為使用 Maven 的現有 Java 項目插入。

注意

導入新 Java 項目的確切過程根據IDE您使用的不同而有所不同。

檢查本地應用程序配置

在本機執行時,應用程式會使用下專application_properties.json案資源資料夾中檔案中的組態./src/main/resources。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

設置您的IDE運行配置

您可以透過執行主類別com.amazonaws.services.msf.BasicStreamingJob,IDE直接從您的 Flink 應用程式執行和偵錯,就像執行任何 Java 應用程式一樣。在運行應用程序之前,您必須設置運行配置。設置取決於IDE您正在使用的。例如,請參閱 Intelli IDEA J 文檔中的運行/調試配置。特別是,您必須設定下列項目:

  1. provided依賴關係添加到類路徑中。這是必要的,以確保在本地運行時將具有provided範圍的依賴關係傳遞給應用程序。如果沒有此設定,應用程式會立即顯示class not found錯誤。

  2. 傳遞 AWS 認證以存取 Kinesis 串流至應用程式。最快的方法是使用 IntelliJ IDEA 的AWS 工具包。在 Run 配置中使用此IDE插件,您可以選擇特定的配置 AWS 文件。 AWS 使用此設定檔進行驗證。您不需要直接傳遞 AWS 憑證。

  3. 驗證是否使用 JDK11 IDE 執行應用程式。

在您的中運行應用程序 IDE

設定的 [執行] 組態之後BasicStreamingJob,您可以像執行一般 Java 應用程式一樣執行或偵錯它。

注意

您不能直接java -jar ...從命令行運行 Maven 生成的脂肪罐。此 jar 不包含獨立執行應用程式所需的 Flink 核心相依性。

當應用程式成功啟動時,它會記錄有關獨立迷你叢集和連接器初始化的一些資訊。後面是 Flink 通常會在應用程序啟動時發出的INFO一些WARN日誌。

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

初始化完成後,應用程式不會發出任何進一步的記錄項目。當資料流動時,不會發出任何記錄檔。

若要驗證應用程式是否正確處理資料,您可以檢查輸入和輸出 Kinesis 串流,如下節所述。

注意

不發出有關流動資料的記錄是 Flink 應用程式的正常行為。在每個記錄上發出日誌可能很方便調試,但在生產環境中運行時可能會增加相當大的開銷。

觀察 Kinesis 串流中的輸入和輸出資料

您可以使用 Amazon Kinesis Kinesis 主控台中的資料檢視器,觀察 (產生範例 Python) 或 Kinesis 資料產生 (連結) 傳送至輸入串流的記錄。

觀察記錄
  1. https://console.aws.amazon.com/Kinesis 處開啟室壁運動主控台。

  2. 確認區域與您執行此教學課程的位置相同,依預設為 us-east-1 美國東部 (維吉尼亞北部)。如果「區域」不相符,請變更該區域。

  3. 選擇「資料串流」。

  4. 選取您要觀察的串流,ExampleInputStreamExampleOutputStream.

  5. 選擇「資料檢視器」標籤。

  6. 選擇任何碎片,保持最新作為起始位置,然後選擇獲取記錄。您可能會看到「找不到此要求的記錄」錯誤訊息。如果是這樣,請選擇重試取得記錄。發佈至串流的最新記錄會顯示。

  7. 在「資料」欄中選擇值,以JSON格式檢查記錄的內容。

停止應用程式在本機執行

停止在您的IDE. 通IDE常會提供「停止」選項。確切的位置和方法取決於IDE您使用的。

編譯並封裝應用程式程式碼

在本節中,您可以使用阿帕奇 Maven 編譯 Java 代碼並將其打包到一個JAR文件中。您可以使用 Maven 命令行工具或IDE.

若要使用 Maven 命令列進行編譯和封裝:

移至包含 Java GettingStarted 專案的目錄,並執行下列命令:

$ mvn package

要使用以下命令編譯和打包IDE:

mvn package從你的 IDE Maven 集成運行。

在這兩種情況下,都會建立下列JAR檔案:target/amazon-msf-java-stream-app-1.0.jar

注意

從您運行「構建項目」IDE 可能無法創建該JAR文件。

上傳應用程序代碼JAR文件

在本節中,您將在上一節中建立的JAR檔案上傳到您在本教學開始時建立的 Amazon 簡單儲存服務 (Amazon S3) 儲存貯體。如果您尚未完成此步驟,請參閱(鏈接)。

上傳應用程式程式碼JAR檔
  1. 在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/

  2. 選擇您先前為應用程式程式碼建立的值區。

  3. 選擇上傳

  4. 選擇 Add files (新增檔案)

  5. 導覽至在上一個步驟中產生的JAR檔案:target/amazon-msf-java-stream-app-1.0.jar

  6. 選擇「上傳」而不變更任何其他設定。

警告

請確定您在中選取了正確的JAR檔案<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar

target目錄還包含您不需要上傳的其他JAR文件。

建立並設定 Apache Flink 應用程式的受管理服務

您可以使用主控台或 AWS CLI建立和執行 Managed Service for Apache Flink 應用程式。在本教程中,您將使用控制台。

注意

當您使用主控台建立應用程式時,系統會為您建立 AWS Identity and Access Management (IAM) 和 Amazon CloudWatch 日誌資源。使用建立應用程式時 AWS CLI,您需要分別建立這些資源。

建立應用程式

建立應用程式
  1. 在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務

  2. 確認已選取正確的區域:us-east-1 美國東部 (維吉尼亞北部)

  3. 打開右側的菜單,然後選擇 Apache Flink 應用程序,然後創建流應用程序。或者,在初始頁面的 [開始使用] 容器中選擇 [建立串流應用程式]。

  4. 在 [建立串流應用程式] 頁面上:

    • 選擇設定串流處理應用程式的方法:選擇「從頭開始建立」。

    • 阿帕奇 Flink 配置,應用程序 Flink 版本:選擇阿帕奇 F link 1.19。

  5. 設定應用程式

    • 應用程式名稱:輸入MyApplication

    • 描述:輸入My java test app

    • 存取應用程式資源:選擇 [建立/更新kinesis-analytics-MyApplication-us-east-1具有必要策略的IAM角色]。

  6. 設定應用程式設定的範本

    • 範本:選擇 [開發]。

  7. 選擇頁面底部的 [建立串流應用程式]。

注意

當您使用主控台為 Apache Flink 應用程式建立受管理服務時,您可以選擇為應用程式建立IAM角色和原則。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:

  • 政策:kinesis-analytics-service-MyApplication-us-east-1

  • 角色:kinesisanalytics-MyApplication-us-east-1

Amazon 阿帕奇 Flink 託管服務以前被稱為 Kinesis Data Analytics。自動創建的資源的名稱是kinesis-analytics-為了向後兼容性的前綴。

編輯IAM策略

編輯原IAM則以新增存取 Kinesis 資料串流的權限。

若要編輯策略
  1. 在開啟IAM主控台https://console.aws.amazon.com/iam/

  2. 選擇政策。選擇主控台為您在上一節所建立的 kinesis-analytics-service-MyApplication-us-east-1 政策。

  3. 選擇 [編輯],然後選擇索JSON引標籤。

  4. 將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (012345678901) 使用您的帳戶 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. 選擇頁面底部的 [下一步],然後選擇 [儲存變更]。

設定應用程式

編輯應用程式組態以設定應用程式程式碼人工因素。

編輯模型組態
  1. MyApplication頁面上,選擇設定

  2. 應用程式程式碼位置區段中:

    • 對於 Amazon S3 儲存貯體,請選取先前為應用程式程式碼建立的儲存貯體。選擇 [瀏覽] 並選取正確的值區,然後選取 [選擇]。請勿按一下值區名稱。

    • 對於 Amazon S3 物件的路徑,請輸入 amazon-msf-java-stream-app-1.0.jar

  3. 對於 [存取權限],選擇 [建立/更新kinesis-analytics-MyApplication-us-east-1具有必要策略的IAM角色]

  4. 在 [執行階段屬性] 區段中,新增下列屬性。

  5. 選擇「新增項目」,然後新增下列每個參數:

    群組 ID 金鑰
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. 請勿修改任何其他區段。

  7. 選擇 Save changes (儲存變更)。

注意

當您選擇啟用 Amazon 日誌 CloudWatch 記錄時,Apache Flink 的受管服務會為您建立日誌群組和日誌串流。這些資源的名稱如下所示:

  • 日誌群組:/aws/kinesis-analytics/MyApplication

  • 日誌串流:kinesis-analytics-log-stream

執行應用程式

該應用程序現在已配置並準備好運行。

執行應用程式
  1. 在適用於 Apache Flink 的 Amazon 受管服務的主控台上,選擇我的應用程式,然後選擇執行

  2. 在下一頁的 [應用程式還原組態] 頁面上,選擇 [以最新快照執行],然後選擇 [執行]。

    應用程式中的狀態」 詳細資訊會從Ready應用程式啟動到再轉換為應用程式的Running時間。Starting

當應用程式處於狀Running態時,您現在可以開啟 Flink 儀表板。

開啟 儀表板
  1. 選擇「開啟阿帕奇 Flink 儀表板」。儀表板會在新頁面上開啟。

  2. 在「執行工作」清單中,選擇您可以看到的單一工作。

    注意

    如果您設定執行階段屬性或編輯不正確的IAM原則,應用程式狀態可能會變成Running,但 Flink 儀表板會顯示工作正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的權限,這是常見的失敗案例。

    發生這種情況時,請檢查 Flink 儀表板中的「例外」標籤以查看問題的原因。

觀察正在運行的應用程序的指標

MyApplication頁面的 Amazon CloudWatch 指標區段中,您可以看到執行中應用程式中的一些基本指標。

若要檢視測量結果
  1. 在「重新整理」按鈕旁邊,從下拉式清單中選取 10 秒

  2. 當應用程式執行且健康狀態良好時,您可以看到正常執行時指標不斷增加。

  3. 完整重新啟動指標應為零。如果正在增加,則配置可能有問題。若要調查問題,請檢閱 Flink 儀表板上的「例外」標籤。

  4. 在運作狀況良好的應用程式中,失敗的檢查點數量應為零。

    注意

    此儀表板會顯示一組固定的指標,精細度為 5 分鐘。您可以使用儀表板中的任何指標來建立自訂應用程式 CloudWatch 儀表板。

觀察 Kinesis 串流中的輸出資料

請確定您仍在使用 Python 指令碼或 Kinesis 資料產生器將資料發佈至輸入。

您現在可以使用中的「資料檢視器」,觀察在 Apache Flink 受管理服務上執行之應用程式的輸出 https://console.aws.amazon.com/kinesis/,與您之前已執行的操作類似。

若要檢視輸出
  1. https://console.aws.amazon.com/Kinesis 處開啟室壁運動主控台。

  2. 確認區域與您用來執行此自學課程的區域相同。依預設,它是美國東部-1 美國東部 (維吉尼亞北部)。如有必要,請變更「區域」。

  3. 選擇「資料串流」。

  4. 選取您要觀察的串流。在本教學課程中,使用 ExampleOutputStream

  5. 選擇「資料檢視器」標籤。

  6. 選取任何 [碎片],將 [最新] 保留為起始位置,然後選擇 [取得記錄]。您可能會看到「找不到此要求的記錄」錯誤訊息。如果是這樣,請選擇重試取得記錄。發佈至串流的最新記錄會顯示。

  7. 在「資料」欄中選取值,以JSON格式檢查記錄的內容。

停止應用程式

若要停止應用程式,請移至名為的 Apache Flink 應用程式的受管理服務的主控台頁面。MyApplication

停止應用程式
  1. 從「動作」下拉式清單中,選擇「停止」。

  2. 應用程式中的狀態」 詳細資訊Ready會從應Running用程式完全停止轉換為Stopping,然後再轉換為應用程式的

    注意

    別忘了也停止將資料從 Python 指令碼或 Kinesis 資料產生器傳送至輸入串流。

下一步驟

清理 AWS 資源