Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
創建使用阿帕奇梁的應用程序
在本練習中,您將使用 Apache Beam
注意
若要設定此練習的必要先決條件,請先完成 教學課程:開始使用 Managed Service for Apache Flink 中的 DataStream API 練習。
本主題包含下列章節:
建立相依資源
在為本練習建立 Managed Service for Apache Flink 應用程式之前,先建立下列相依資源:
兩個 Kinesis 資料串流 (
ExampleInputStream
和ExampleOutputStream
)Amazon S3 儲存貯體,用來儲存應用程式的程式碼 (
ka-app-code-
)<username>
您可以在主控台中建立 Kinesis 串流和 Amazon S3 儲存貯體。如需建立這些資源的相關指示,請參閱以下主題:
《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。為資料串流
ExampleInputStream
和ExampleOutputStream
命名。《Amazon Simple Storage Service 使用者指南》中的如何建立 S3 儲存貯體。透過附加登入名稱 (例如
ka-app-code-
),為 Amazon S3 儲存貯體提供全域唯一的名稱。<username>
將樣本記錄寫入輸入流
在本節中,您會透過 Python 指令碼將隨機字串寫入串流供應用程式處理。
注意
-
使用下列內容建立名為
ping.py
的檔案:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
執行
ping.py
指令碼:$ python ping.py
在完成教學課程的其餘部分時,讓指令碼保持執行狀態。
下載並檢查應用程式程式碼
此範例的 Java 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:
如果您尚未安裝 Git 用戶端,請先安裝。如需詳細資訊,請參閱安裝 Git
。 使用以下指令複製遠端儲存庫:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
導覽至
amazon-kinesis-data-analytics-java-examples/Beam
目錄。
應用程式的程式碼位於 BasicBeamStreamingJob.java
檔案中。請留意下列與應用程式的程式碼相關的資訊:
該應用程序使用 Apache Beam 通過調ParDo
用名為的自定義轉換函 PingPongFn
數來處理傳入的記錄。調用
PingPongFn
函數的代碼如下:.apply("Pong transform", ParDo.of(new PingPongFn())
使用 Apache Beam 的 Managed Service for Apache Flink 應用程式需要下列元件。如果您未在
pom.xml
中包含這些元件和版本,應用程式會從環境相依性載入不正確的版本,而且由於版本不符合,應用程式會在執行期損毀。<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
PingPongFn
轉換函數會將輸入資料傳遞到輸出串流,除非輸入資料是 ping,在這種情況下,它發出字串 pong\n 到輸出串流。轉換函數的程式碼如下:
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
編譯應用程式程式碼
若要編譯應用程式,請執行下列動作:
如果尚未安裝 Java 和 Maven,請先安裝。如需詳細資訊,請參閱教學課程:開始使用 Managed Service for Apache Flink 中的 DataStream API 教學課程中的完成必要的先決條件。
使用下列命令編譯應用程式:
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
注意
提供的來源程式碼依賴於 Java 11 中的程式庫。
編譯應用程式會建立應用程式JAR檔案 (target/basic-beam-app-1.0.jar
)。
上傳阿帕奇 Flink 流 Java 代碼
在本節中,您會將應用程式的程式碼上傳至在建立相依資源一節建立的 Amazon S3 儲存貯體。
-
在 Amazon S3 控制台中,選擇 ka-app-code-
<username>
桶,然後選擇上傳。 -
在選取檔案步驟中,選擇 新增檔案。導覽至您在上一步驟中建立的
basic-beam-app-1.0.jar
檔案。 您不需要變更物件的任何設定,因此請選擇上傳。
您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。
建立並執行 Apache Flink 應用程式的受管理服務
依照以下步驟來使用主控台建立、設定、更新及執行應用程式。
建立應用程式
在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務
-
在 Managed Service for Apache Flink 儀表板上,選擇建立分析應用程式。
-
在 Managed Service for Apache Flink - 建立應用程式頁面,提供應用程式詳細資訊,如下所示:
-
在應用程式名稱中,輸入
MyApplication
。 -
對於執行期,選擇 Apache Flink。
注意
阿帕奇梁目前不與阿帕奇 Flink 版本 1.19 或更高版本兼容。
從版本下拉式清單中選取 Apache Flink 版本 1.15。
-
-
對於 [存取權限],選擇 [建立/更新IAM角色]
kinesis-analytics-MyApplication-us-west-2
。 -
選擇建立應用程式。
注意
當您使用主控台為 Apache Flink 應用程式建立受管理服務時,您可以選擇為應用程式建立IAM角色和原則。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:
-
政策:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesis-analytics-MyApplication-
us-west-2
編輯IAM策略
編輯原IAM則以新增存取 Kinesis 資料串流的權限。
在開啟IAM主控台https://console.aws.amazon.com/iam/
。 -
選擇政策。選擇主控台為您在上一節所建立的
kinesis-analytics-service-MyApplication-us-west-2
政策。 -
在摘要頁面,選擇編輯政策。選擇索JSON引標籤。
-
將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (
012345678901
) 使用您的帳戶 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
設定應用程式
-
在MyApplication頁面上,選擇設定。
-
在設定應用程式頁面,提供程式碼位置:
-
對於 Amazon S3 儲存貯體,請輸入
ka-app-code-
。<username>
-
對於 Amazon S3 物件的路徑,請輸入
basic-beam-app-1.0.jar
。
-
-
在 [存取應用程式資源] 下,針對 [存取權限] 選擇 [建立/更新IAM角色]
kinesis-analytics-MyApplication-us-west-2
。 -
輸入下列資料:
群組 ID 金鑰 值 BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
在監控下,確保監控指標層級設為應用程式。
-
若要CloudWatch 記錄,請選取 [啟用] 核取方塊。
-
選擇更新。
注意
當您選擇啟用 CloudWatch 記錄時,Apache Flink 的受管理服務會為您建立記錄群組和記錄資料流。這些資源的名稱如下所示:
-
日誌群組:
/aws/kinesis-analytics/MyApplication
-
日誌串流:
kinesis-analytics-log-stream
此日誌串流用於監控應用程式。這與應用程式用來傳送結果的日誌串流不同。
執行應用程式
透過執行應用程式、開啟 Apache Flink 儀表板並選擇所需的 Flink 作業,即可檢視 Flink 作業圖表。
您可以在 CloudWatch 主控台上檢查 Apache Flink 的受管理服務量度,以確認應用程式是否正常運作。
清理 AWS 資源
本節包括清除在「暫停視窗」教學課程中建立之 AWS 資源的程序。
本主題包含下列章節:
刪除適用於 Apache Flink 應用程式的受管理服務
在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務
在適用於 Apache Flink 的受管理服務面板中,選擇MyApplication。
在應用程式的頁面,選擇刪除,然後確認刪除。
刪除您的 Kinesis 資料串流
在 https://console.aws.amazon.com/
Kinesis 處開啟室壁運動主控台。 在「Kinesis Data Streams」面板中,選擇ExampleInputStream。
在ExampleInputStream頁面中,選擇「刪除 Kinesis 串流」,然後確認刪除。
在 Kinesis 串流頁面中,選擇,選擇「ExampleOutputStream動作」,選擇「刪除」,然後確認刪除。
刪除您的 Amazon S3 對象和存儲桶
在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/
。 選擇 ka-app-code-
<username>
桶。選擇刪除,然後輸入儲存貯體名稱以確認刪除。
刪除您的IAM資源
在開啟IAM主控台https://console.aws.amazon.com/iam/
。 在導覽列中,選擇政策。
在篩選器控制項中,輸入 kinesis。
選擇 kinesis-analytics-service--MyApplication us-we st-2 原則。
選擇政策動作,然後選擇刪除。
在導覽列中,選擇角色。
選擇運動分析-MyApplication us-west-2 角色。
選擇刪除角色,然後確認刪除。
刪除您的 CloudWatch 資源
在開啟 CloudWatch 主控台https://console.aws.amazon.com/cloudwatch/
。 在導覽列中,選擇日誌。
選擇 /aws/運動分析/日誌群MyApplication組。
選擇刪除日誌群組,然後確認刪除。
後續步驟
現在您已建立並執行使用 Apache Beam 轉換資料的基本 Managed Service for Apache Flink 應用程式,請參閱下列應用程式,取得更進階 Managed Service for Apache Flink 解決方案的範例。
Beam 用於 Managed Service for Apache Flink 串流研討會
:在此研討會中,我們將探索一個端對端範例,將批次和串流方面結合在一個統一的 Apache Beam 管道中。