創建使用阿帕奇梁的應用程序 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

創建使用阿帕奇梁的應用程序

在本練習中,您將使用 Apache Beam 建立可轉換資料的 Managed Service for Apache Flink 應用程式。Apache Beam 是用於處理串流資料的程式設計模型。如需將 Apache Beam 與 Managed Service for Apache Flink 搭配使用的相關資訊,請參閱為 Apache Flink 應用程式使用具有託管服務的 Apache 光束

注意

若要設定此練習的必要先決條件,請先完成 教學課程:開始使用 Managed Service for Apache Flink 中的 DataStream API 練習。

建立相依資源

在為本練習建立 Managed Service for Apache Flink 應用程式之前,先建立下列相依資源:

  • 兩個 Kinesis 資料串流 (ExampleInputStreamExampleOutputStream)

  • Amazon S3 儲存貯體,用來儲存應用程式的程式碼 (ka-app-code-<username>)

您可以在主控台中建立 Kinesis 串流和 Amazon S3 儲存貯體。如需建立這些資源的相關指示,請參閱以下主題:

  • 《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。為資料串流 ExampleInputStreamExampleOutputStream 命名。

  • 《Amazon Simple Storage Service 使用者指南》中的如何建立 S3 儲存貯體透過附加登入名稱 (例如 ka-app-code-<username>),為 Amazon S3 儲存貯體提供全域唯一的名稱。

將樣本記錄寫入輸入流

在本節中,您會透過 Python 指令碼將隨機字串寫入串流供應用程式處理。

注意
  1. 使用下列內容建立名為 ping.py 的檔案:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. 執行 ping.py 指令碼:

    $ python ping.py

    在完成教學課程的其餘部分時,讓指令碼保持執行狀態。

下載並檢查應用程式程式碼

此範例的 Java 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:

  1. 如果您尚未安裝 Git 用戶端,請先安裝。如需詳細資訊,請參閱安裝 Git

  2. 使用以下指令複製遠端儲存庫:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 導覽至 amazon-kinesis-data-analytics-java-examples/Beam 目錄。

應用程式的程式碼位於 BasicBeamStreamingJob.java 檔案中。請留意下列與應用程式的程式碼相關的資訊:

  • 該應用程序使用 Apache Beam 通過調ParDo用名為的自定義轉換函PingPongFn數來處理傳入的記錄。

    調用 PingPongFn 函數的代碼如下:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • 使用 Apache Beam 的 Managed Service for Apache Flink 應用程式需要下列元件。如果您未在 pom.xml 中包含這些元件和版本,應用程式會從環境相依性載入不正確的版本,而且由於版本不符合,應用程式會在執行期損毀。

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • PingPongFn 轉換函數會將輸入資料傳遞到輸出串流,除非輸入資料是 ping,在這種情況下,它發出字串 pong\n 到輸出串流。

    轉換函數的程式碼如下:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

編譯應用程式程式碼

若要編譯應用程式,請執行下列動作:

  1. 如果尚未安裝 Java 和 Maven,請先安裝。如需詳細資訊,請參閱教學課程:開始使用 Managed Service for Apache Flink 中的 DataStream API 教學課程中的完成必要的先決條件

  2. 使用下列命令編譯應用程式:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    注意

    提供的來源程式碼依賴於 Java 11 中的程式庫。

編譯應用程式會建立應用程式JAR檔案 (target/basic-beam-app-1.0.jar)。

上傳阿帕奇 Flink 流 Java 代碼

在本節中,您會將應用程式的程式碼上傳至在建立相依資源一節建立的 Amazon S3 儲存貯體。

  1. 在 Amazon S3 控制台中,選擇 ka-app-code-<username>桶,然後選擇上傳

  2. 選取檔案步驟中,選擇 新增檔案。導覽至您在上一步驟中建立的 basic-beam-app-1.0.jar 檔案。

  3. 您不需要變更物件的任何設定,因此請選擇上傳

您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。

建立並執行 Apache Flink 應用程式的受管理服務

依照以下步驟來使用主控台建立、設定、更新及執行應用程式。

建立應用程式

  1. 在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務

  2. 在 Managed Service for Apache Flink 儀表板上,選擇建立分析應用程式

  3. Managed Service for Apache Flink - 建立應用程式頁面,提供應用程式詳細資訊,如下所示:

    • 應用程式名稱中,輸入 MyApplication

    • 對於​執行期,選擇 ​Apache Flink

      注意

      阿帕奇梁目前不與阿帕奇 Flink 版本 1.19 或更高版本兼容。

    • 從版本下拉式清單中選取 Apache Flink 版本 1.15

  4. 對於 [存取權限],選擇 [建立/更新IAM角色] kinesis-analytics-MyApplication-us-west-2

  5. 選擇建立應用程式

注意

當您使用主控台為 Apache Flink 應用程式建立受管理服務時,您可以選擇為應用程式建立IAM角色和原則。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:

  • 政策:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

編輯IAM策略

編輯原IAM則以新增存取 Kinesis 資料串流的權限。

  1. 在開啟IAM主控台https://console.aws.amazon.com/iam/

  2. 選擇政策。選擇主控台為您在上一節所建立的 kinesis-analytics-service-MyApplication-us-west-2 政策。

  3. 摘要頁面,選擇編輯政策。選擇索JSON引標籤。

  4. 將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (012345678901) 使用您的帳戶 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

設定應用程式

  1. MyApplication頁面上,選擇設定

  2. 設定應用程式頁面,提供程式碼位置

    • 對於 Amazon S3 儲存貯體,請輸入 ka-app-code-<username>

    • 對於 Amazon S3 物件的路徑,請輸入 basic-beam-app-1.0.jar

  3. 在 [存取應用程式資源] 下,針對 [存取權限] 選擇 [建立/更新IAM角色] kinesis-analytics-MyApplication-us-west-2

  4. 輸入下列資料:

    群組 ID 金鑰
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. 監控下,確保監控指標層級設為應用程式

  6. 若要CloudWatch 記錄,請選取 [啟用] 核取方塊。

  7. 選擇更新

注意

當您選擇啟用 CloudWatch 記錄時,Apache Flink 的受管理服務會為您建立記錄群組和記錄資料流。這些資源的名稱如下所示:

  • 日誌群組:/aws/kinesis-analytics/MyApplication

  • 日誌串流:kinesis-analytics-log-stream

此日誌串流用於監控應用程式。這與應用程式用來傳送結果的日誌串流不同。

執行應用程式

透過執行應用程式、開啟 Apache Flink 儀表板並選擇所需的 Flink 作業,即可檢視 Flink 作業圖表。

您可以在 CloudWatch 主控台上檢查 Apache Flink 的受管理服務量度,以確認應用程式是否正常運作。

清理 AWS 資源

本節包括清除在「暫停視窗」教學課程中建立之 AWS 資源的程序。

刪除適用於 Apache Flink 應用程式的受管理服務

  1. 在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務

  2. 在適用於 Apache Flink 的受管理服務面板中,選擇MyApplication

  3. 在應用程式的頁面,選擇刪除,然後確認刪除。

刪除您的 Kinesis 資料串流

  1. https://console.aws.amazon.com/Kinesis 處開啟室壁運動主控台。

  2. 在「Kinesis Data Streams」面板中,選擇ExampleInputStream

  3. ExampleInputStream頁面中,選擇「刪除 Kinesis 串流」,然後確認刪除。

  4. Kinesis 串流頁面中,選擇,選擇「ExampleOutputStream動作」,選擇「刪除」,然後確認刪除。

刪除您的 Amazon S3 對象和存儲桶

  1. 在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/

  2. 選擇 ka-app-code-<username> 桶。

  3. 選擇刪除,然後輸入儲存貯體名稱以確認刪除。

刪除您的IAM資源

  1. 在開啟IAM主控台https://console.aws.amazon.com/iam/

  2. 在導覽列中,選擇政策

  3. 在篩選器控制項中,輸入 kinesis

  4. 選擇 kinesis-analytics-service--MyApplication us-we st-2 原則。

  5. 選擇政策動作,然後選擇刪除

  6. 在導覽列中,選擇角色

  7. 選擇運動分析-MyApplication us-west-2 角色

  8. 選擇刪除角色,然後確認刪除。

刪除您的 CloudWatch 資源

  1. 在開啟 CloudWatch 主控台https://console.aws.amazon.com/cloudwatch/

  2. 在導覽列中,選擇日誌

  3. 選擇 /aws/運動分析/日誌群MyApplication組。

  4. 選擇刪除日誌群組,然後確認刪除。

後續步驟

現在您已建立並執行使用 Apache Beam 轉換資料的基本 Managed Service for Apache Flink 應用程式,請參閱下列應用程式,取得更進階 Managed Service for Apache Flink 解決方案的範例。