在 Apache Flink 的托管服务中使用运行时属性 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Apache Flink 的托管服务中使用运行时属性

您可以使用运行时系统属性配置应用程序,而无需重新编译应用程序代码。

使用控制台管理运行时属性

您可以使用在 Apache Flink 托管服务应用程序中添加、更新或删除运行时属性。 AWS Management Console

注意

如果您使用的是早期支持的 Apache Flink 版本,并且想要将现有应用程序升级到 Apache Flink 1.19.1,则可以使用就地升级 Apache Flink 版本来实现。通过就地版本升级,您可以针对ARN各个 Apache Flink 版本保持应用程序的可追溯性,包括快照、日志、指标、标签、Flink 配置等。您可以在 and st READY ate 中RUNNING使用此功能。有关更多信息,请参阅 使用 Apache Flink 的就地版本升级

更新 Managed Service for Apache Flink 应用程序的运行时系统属性
  1. 在 /flink 上打开适用于 Apache Flink 的托管服务控制台 https://console.aws.amazon.com

  2. 选择您的 Managed Service for Apache Flink应用程序 选择 Application details (应用程序详细信息)

  3. 在应用程序页面上,选择 Configure (配置)

  4. 展开 Properties (属性) 部分。

  5. 使用 Properties (属性) 部分中的控件,以键值对形式定义一个属性组。可以使用这些控件添加、更新或删除属性组和运行时系统属性。

  6. 选择更新

使用管理运行时属性 CLI

您可以使用 AWS CLI 添加、更新或删除运行时系统属性。

本节包括为应用程序配置运行时属性的API操作请求示例。有关如何使用JSON文件作为API操作输入的信息,请参见适用于 Apache 的托管服务 Flink API 示例代码

注意

替换示例账户 ID (012345678901) 在以下示例中使用您的账户 ID。

在创建应用程序时添加运行时属性

CreateApplication 操作的以下示例请求在创建应用程序时添加两个运行时系统属性组(ProducerConfigPropertiesConsumerConfigProperties):

{ "ApplicationName": "MyApplication", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }

在现有应用程序中添加和更新运行时属性

UpdateApplication 操作的以下示例请求为现有应用程序添加或更新运行时系统属性:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 2, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
注意

如果您使用的键在属性组中没有相应的运行时系统属性,则 Managed Service for Apache Flink将键值对添加为新属性。如果将一个键用于属性组中的现有运行时系统属性,Managed Service for Apache Flink将更新属性值。

移除运行时属性

UpdateApplication 操作的以下示例请求从现有应用程序中删除所有运行时系统属性和属性组:

{ "ApplicationName": "MyApplication", "CurrentApplicationVersionId": 3, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [] } } }
重要

如果省略现有的属性组或属性组中的现有属性键,则会删除该属性组或属性。

在适用于 Apache Flink 的托管服务应用程序中访问运行时属性

您可以使用静态 KinesisAnalyticsRuntime.getApplicationProperties() 方法在 Java 应用程序代码中检索运行时系统属性,该方法返回一个 Map<String, Properties> 对象。

以下 Java 代码示例检索应用程序的运行时系统属性:

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

您按如下方式检索一个属性组(作为 Java.Util.Properties 对象):

Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");

通常,您传入 Properties 对象以配置 Apache Flink 源或接收器,而无需检索各个属性。以下代码示例说明了如何传入从运行时系统属性中检索的 Properties 对象以创建 Flink 源:

private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; }

有关代码示例,请参阅 创建和使用适用于 Apache Flink 应用程序的托管服务的示例