在 Apache Flink 的受管理服務中使用執行階段屬性 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Apache Flink 的受管理服務中使用執行階段屬性

您可以使用執行期屬性來設定應用程式,而無需重新編譯應用程式的程式碼。

使用主控台管理執行階段屬性

您可以使用在 AWS Management Console Apache Flink 應用程式中新增、更新或移除執行階段屬性。

注意

如果您使用的是較早支援的 Apache Flink 版本,並且想要將現有的應用程式升級到 Apache Flink 1.19.1,您可以使用就地 Apache Flink 版本升級來執行此操作。透過就地版本升級,您可以ARN跨 Apache Flink 版本保留應用程式可追溯性,包括快照、記錄、指標、標籤、Flink 組態等。您可以在RUNNINGREADY狀態中使用此功能。如需詳細資訊,請參閱針對 Apache Flink 使用就地版本升級

更新 Managed Service for Apache Flink 應用程式的執行期屬性
  1. 在 https://console.aws.amazon.com /flink 開啟適用於阿帕奇 Flink 的受管理服務

  2. 選擇 Managed Service for Apache Flink 應用程式。選擇應用程式詳細資訊

  3. 在應用程式頁面,選擇設定

  4. 展開屬性區段。

  5. 使用屬性區段中的控制項,以鍵值對定義屬性群組。可以使用這些控制項來新增、更新或移除屬性群組和執行期屬性。

  6. 選擇更新

使用管理執行階段屬性 CLI

您可以使用 AWS CLI 新增、更新或移除執行期屬性。

本節包含設定應用程式執行階段屬性的API動作要求範例。若要取得有關如何使用JSON檔案進API行動作輸入的資訊,請參閱〈〉Managed Service for Apache Flink API範例程式碼

注意

取代範例帳戶 ID (012345678901) 在下列範例中顯示您的帳戶 ID。

建立應用程式時新增執行階段屬性

CreateApplication 動作的下列範例請求會在您建立應用程式時新增兩個執行期屬性群組 (ProducerConfigPropertiesConsumerConfigProperties):

{ "ApplicationName": "MyApplication", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }

在現有應用程式中新增和更新執行階段屬性

UpdateApplication 動作的下列範例請求會新增或更新現有應用程式的執行期屬性:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
注意

如果您使用的索引鍵在屬性群組中沒有對應的執行期屬性,Managed Service for Apache Flink 會將鍵值對新增為新屬性。如果您將索引鍵用於屬性群組中的現有執行期屬性,Managed Service for Apache Flink 會更新屬性值。

刪除運行時屬性

UpdateApplication 動作的下列範例請求會從現有應用程式中移除所有執行期屬性和屬性群組:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 3, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [] } } }
重要

如果您省略現有屬性群組或屬性群組中的現有屬性索引鍵,則會移除該屬性群組或屬性。

在 Apache Flink 應用程式的受管理服務中存取執行階段屬性

您可以使用可傳回 Map<String, Properties> 物件的靜態 KinesisAnalyticsRuntime.getApplicationProperties() 方法,在 Java 應用程式的程式碼中擷取執行期屬性。

下列 Java 程式碼範例會擷取應用程式的執行期屬性:

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

您可以擷取屬性群組 (作為 Java.Util.Properties 物件),如下所示:

Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");

您通常透過傳入 Properties 物件來設定 Apache Flink 來源或接收器,而無需擷取個別屬性。下列程式碼範例示範如何透過傳入從執行期屬性擷取的 Properties 物件來建立 Flink 來源:

private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; }

如需程式碼範例,請參閱「建立和使用 Managed Service for Apache Flink 應用程式的範例」。