本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
為 Apache Flink 應用程式建立並執行受管理的服務
在本練習中,您會建立 Managed Service for Apache Flink 應用程式,並將資料串流作為來源和目的地。
本節包含下列步驟:
建立兩個 Amazon Kinesis 資料串流
在為本練習建立適用於 Apache Flink 的 Amazon 受管服務之前,請先建立兩個 Kinesis 資料串流 (ExampleInputStream
和ExampleOutputStream
)。您的應用程式會將這些串流用於應用程式來源和目的地串流。
您可以使用 Amazon Kinesis 主控台或下列方法建立這些串流 AWS CLI 指令。如需主控台說明,請參閱建立及更新資料串流。
若要建立資料串流 (AWS CLI)
-
若要建立第一個串流 (
ExampleInputStream
),請使用下列 Amazon Kinesiscreate-stream
AWS CLI 指令。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為
ExampleOutputStream
。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
寫入範例記錄至輸入串流
在本節,您會使用 Python 指令碼將範例記錄寫入供應用程式處理的串流。
注意
-
使用下列內容建立名為
stock.py
的檔案:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
-
在教學課程後半段,您會執行
stock.py
指令碼來傳送資料至應用程式。$ python stock.py
下載並檢查阿帕奇 Flink 流 Java 代碼
此範例的 Java 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:
-
使用以下指令複製遠端儲存庫:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
-
導覽至
GettingStarted
目錄。
應用程式碼位於 CustomSinkStreamingJob.java
和 CloudWatchLogSink.java
檔案。請留意下列與應用程式的程式碼相關的資訊:
-
應用程式使用 Kinesis 來源從來源串流讀取。以下程式碼片段會建立 Kinesis 目的地:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
編譯應用程式程式碼
在本節中,您會使用 Apache Maven 編譯器來建立應用程式的 Java 程式碼。如需有關安裝 Apache Maven 和 Java 開發套件 (JDK) 的詳細資訊,請參閱完成練習的先決條件。
Java 應用程式需要下列元件:
-
專案物件模型 (pom.xml)
檔案。此檔案包含有關應用程式組態和相依性的資訊,包括 Apache Flink 程式庫的 Amazon 受管服務。 -
包含應用程式邏輯的
main
方法。
注意
若要針對下列應用程式使用 Kinesis 連接器,您必須下載連接器的原始程式碼,並依照 Apache Flink
建立和編譯應用程式碼
-
在您的開發環境中建立 Java/Maven 應用程式。如需建立應用程式的詳細資訊,請參閱您開發環境的文件:
-
將以下程式碼用於名為
StreamingJob.java
的檔案。package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
請注意下列關於上述程式碼範例的事項:
-
此檔案包含定義應用程式功能的
main
方法。 -
您的應用程式會建立來源與目的地連接器,以使用
StreamExecutionEnvironment
物件來存取外部資源。 -
應用程式會使用靜態屬性來建立來源與目的地連接器。若要使用動態應用程式屬性,請使用
createSourceFromApplicationProperties
和createSinkFromApplicationProperties
方法來建立連接器。這些方法會讀取應用程式的屬性,來設定連接器。
-
-
若要使用應用程式程式碼,請將其編譯並封裝到JAR檔案中。您可以使用下列兩種方式的其中之一,編譯和封裝您的程式碼:
-
使用命令列 Maven 工具。在包含JAR檔案的目錄中執行下列命令,以建立
pom.xml
檔案:mvn package
-
設定開發環境。如需詳細資訊,請參閱您的開發環境文件。
您可以將套件上傳為JAR檔案,也可以壓縮套件並將其上傳為ZIP檔案。如果您使用 AWS CLI,您可以指定您的程式碼內容類型 (JAR或ZIP)。
-
-
如果編譯時發生錯誤,請確認您的
JAVA_HOME
環境變數是否正確設定。
如果應用程式成功編譯,則會建立下列檔案:
target/java-getting-started-1.0.jar
上傳阿帕奇 Flink 流 Java 代碼
在本節中,您會建立 Amazon Simple Storage Service (Amazon S3) 儲存貯體並上傳您的應用程式的程式碼。
上傳應用程式的程式碼
在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/
。 -
選擇建立儲存貯體。
-
在儲存貯體名稱欄位中,輸入
ka-app-code-
。新增尾碼至儲存貯體名稱,例如您的使用者名稱,使其成為全域唯一的。選擇 Next (下一步)。<username>
-
在設定選項步驟中,保留原有設定並選擇 Next (下一步)。
-
在設定許可步驟中,保留原有設定並選擇 Next (下一步)。
-
選擇建立儲存貯體。
-
在 Amazon S3 控制台中,選擇 ka-app-code-
<username>
桶,然後選擇上傳。 -
在選取檔案步驟中,選擇 新增檔案。導覽至您在上一步驟中建立的
java-getting-started-1.0.jar
檔案。選擇 Next (下一步)。 -
在設定許可步驟中,保留原有設定。選擇 Next (下一步)。
-
在設定屬性步驟中,保留原有設定。選擇上傳。
您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。
建立並執行 Apache Flink 應用程式的受管理服務
您可以使用主控台或 AWS CLI.
注意
當您使用主控台建立應用程式時, AWS Identity and Access Management (IAM)和 Amazon CloudWatch 日誌資源是為您創建的。當您使用 AWS CLI,您可以個別建立這些資源。
創建並運行應用程序(控制台)
依照以下步驟來使用主控台建立、設定、更新及執行應用程式。
建立應用程式
在 https://console.aws.amazon.com/
Kinesis 處開啟室壁運動主控台。 -
在 Amazon Kinesis 儀表板上,選擇建立分析應用程式。
-
在 Kinesis Analytics - Create application (Kinesis 分析 - 建立應用程式) 頁面,請如下所述提供應用程式詳細資訊:
-
在應用程式名稱中,輸入
MyApplication
。 -
對於 Description (說明),輸入
My java test app
。 -
針對 Runtime (執行時間),選擇 Apache Flink 1.6。
-
-
對於 [存取權限],選擇 [建立/更新IAM角色]
kinesis-analytics-MyApplication-us-west-2
。 -
選擇建立應用程式。
注意
使用主控台為 Apache Flink 應用程式建立 Amazon 受管服務時,您可以選擇為應用程式建立IAM角色和政策。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:
-
政策:
kinesis-analytics-service-
MyApplication
-us-west-2
-
角色:
kinesis-analytics-
MyApplication
-us-west-2
編輯IAM策略
編輯原IAM則以新增存取 Kinesis 資料串流的權限。
在開啟IAM主控台https://console.aws.amazon.com/iam/
。 -
選擇政策。選擇主控台為您在上一節所建立的
kinesis-analytics-service-MyApplication-us-west-2
政策。 -
在摘要頁面,選擇編輯政策。選擇標JSON籤。
-
將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (
012345678901
) 使用您的帳戶 ID。{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }
] }
設定應用程式
-
在MyApplication頁面上,選擇設定。
-
在設定應用程式頁面,提供程式碼位置:
-
對於 Amazon S3 儲存貯體,請輸入
ka-app-code-
。<username>
-
對於 Amazon S3 物件的路徑,請輸入
java-getting-started-1.0.jar
。
-
-
在 [存取應用程式資源] 下,針對 [存取權限] 選擇 [建立/更新IAM角色]
kinesis-analytics-MyApplication-us-west-2
。 -
在屬性下,為群組 ID輸入
ProducerConfigProperties
。 -
輸入以下應用程式屬性和數值:
金鑰 值 flink.inputstream.initpos
LATEST
aws:region
us-west-2
AggregationEnabled
false
-
在監控下,確保監控指標層級設為應用程式。
-
若要CloudWatch 記錄,請選取 [啟用] 核取方塊。
-
選擇更新。
注意
當您選擇啟用 CloudWatch 記錄時,Apache Flink 的受管理服務會為您建立記錄群組和記錄資料流。這些資源的名稱如下所示:
-
日誌群組:
/aws/kinesis-analytics/MyApplication
-
日誌串流:
kinesis-analytics-log-stream
執行應用程式
-
在MyApplication頁面上,選擇 [執行]。確認動作。
-
應用程式執行時,重新整理頁面。主控台會顯示 Application graph (應用程式圖形)。
停止應用程式
在MyApplication頁面上,選擇 [停止]。確認動作。
更新應用程式
您可以使用主控台更新應用程式設定,例如應用程式屬性、監視設定,以及應用程式的位置或檔案名稱JAR。如果您需要更新應用程式程式碼,也可以JAR從 Amazon S3 儲存貯體重新載入應用程式。
在MyApplication頁面上,選擇設定。更新應用程式設定,然後選擇更新。
建立並執行應用程式 (AWS CLI)
在本節中,您可以使用 AWS CLI 以建立並執行 Apache Flink 應用程式的受管理服務。阿帕奇 Flink 的託管服務使用 kinesisanalyticsv2
AWS CLI 用來建立 Apache Flink 應用程式的受管理服務並與之互動的指令。
建立許可政策
您會先建立具有兩條陳述式的許可政策:一條陳述式授與來源串流上 read
動作的許可,而另一條則是授與目的地串流上 write
動作的許可。接著,您可以將原則附加至IAM角色 (您在下一節中建立)。因此,當 Managed Service for Apache Flink 擔任角色時,服務便具有從來源串流讀取並寫入目的地串流的所需許可。
使用以下程式碼來建立 KAReadSourceStreamWriteSinkStream
許可政策。以您用於建立 Amazon S3 儲存貯體 (以儲存應用程式的程式碼) 的使用者名稱來取代
。將 Amazon 資源名稱 (ARNs) (username
) 中的帳戶 ID 取代為您的帳戶 ID。012345678901
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-
username
", "arn:aws:s3:::ka-app-code-username
/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" } ] }
如需建立權限原則的指 step-by-step 示,請參閱IAM使用指南中的教學課程:建立並附加您的第一個客戶管理原則。
注意
若要存取其他 AWS 服務,您可以使用 AWS SDK for Java。 Apache Flink 的受管理服務會自動SDK將所需的認證設定為與應用程式相關聯之服務執行IAM角色的認證。無須採取額外的步驟。
建立 IAM 角色
在本節中,您將建立 Apache Flink 的受管理服務可假定讀取來源串流並寫入接收串流的IAM角色。
Managed Service for Apache Flink 沒有許可,無法存取串流。您可以透過IAM角色授與這些權限。每個IAM角色都附加了兩個策略。信任政策會授與擔任角色的 Managed Service for Apache Flink 許可,而許可政策決定了 Managed Service for Apache Flink 在擔任角色後可以執行的作業。
您會將在上一節中建立的許可政策連接至此角色。
建立 IAM 角色
在開啟IAM主控台https://console.aws.amazon.com/iam/
。 -
在導覽窗格中,選擇角色 、建立角色。
-
在 [選取信任的身分類型] 下,選擇 AWS 服務。在選擇將使用此角色的服務下,選擇 Kinesis。在 Select your use case (選取您的使用案例) 下,選擇 Kinesis Analytics (Kinesis 分析)。
選擇下一步:許可。
-
在連接許可政策頁面,選擇下一步:檢閱。您會在建立角色後連接許可政策。
-
在建立角色頁面,輸入
KA-stream-rw-role
作為角色名稱。選擇建立角色。現在,您已經創建了一個名為的新IAM角色
KA-stream-rw-role
。您接著會更新角色的信任和許可政策。 -
將 許可政策連接到角色。
注意
在此練習中,Managed Service for Apache Flink 擔任從 Kinesis 資料串流 (來源) 讀取資料並將輸出寫入另一個 Kinesis 資料串流的角色。因此您會連接在上一個步驟中建立的政策,建立許可政策。
-
在摘要頁面,選擇許可標籤。
-
選擇連接政策。
-
在搜尋方塊中,輸入
KAReadSourceStreamWriteSinkStream
(您在上一節中建立的政策)。 -
選擇KAReadInputStreamWriteOutputStream策略,然後選擇「附加策略」。
-
您現在已建立應用程式用於存取資源的服務執行角色。記下新角色ARN的。
如需建立角色的指 step-by-step 示,請參閱IAM使用指南中的建立IAM角色 (主控台)。
建立 Managed Service for Apache Flink 應用程式
-
將以下JSON代碼保存到名為的文件中
create_request.json
。將範例角ARN色取代ARN為您先前建立的角色。將值區ARN尾碼 (
) 取代為您在上一節中選擇的尾碼。使用您的帳戶 ID 取代服務執行角色中的範例帳戶 ID (username
)。012345678901
{ "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::
012345678901
:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username
", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
使用前述請求執行
CreateApplication
動作以建立應用程式:aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
應用程式現在已建立。您會在下一個步驟中啟動應用程式。
啟動應用程式
在本節中,您會透過 StartApplication
動作來啟動應用程式。
啟動應用程式
-
將以下JSON代碼保存到名為的文件中
start_request.json
。{ "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
-
以啟動應用程式的上述請求,執行
StartApplication
動作:aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
應用程式現在正在執行。您可以在 Amazon CloudWatch 主控台上查看 Apache Flink 的受管服務指標,以確認應用程式是否正常運作。
停止應用程式
在本節,您會使用該 StopApplication
動作來停止應用程式。
停止應用程式
-
將以下JSON代碼保存到名為的文件中
stop_request.json
。{"ApplicationName": "test" }
-
以停止應用程式的上述請求,執行
StopApplication
動作:aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
現在已停止應用程式。