选择您的 Cookie 首选项

我们使用必要 Cookie 和类似工具提供我们的网站和服务。我们使用性能 Cookie 收集匿名统计数据,以便我们可以了解客户如何使用我们的网站并进行改进。必要 Cookie 无法停用,但您可以单击“自定义”或“拒绝”来拒绝性能 Cookie。

如果您同意,AWS 和经批准的第三方还将使用 Cookie 提供有用的网站功能、记住您的首选项并显示相关内容,包括相关广告。要接受或拒绝所有非必要 Cookie,请单击“接受”或“拒绝”。要做出更详细的选择,请单击“自定义”。

创建并运行适用于 Apache Flink 的托管服务应用程序

聚焦模式
创建并运行适用于 Apache Flink 的托管服务应用程序 - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在本练习中,您将创建面向应用程序的适用于 Apache Flink 的托管服务,并将数据流作为源和接收器。

创建两个 Amazon Kinesis 数据流

在为本练习创建适用于 Apache Flink 的亚马逊托管服务之前,请创建两个 Kinesis 数据流(ExampleInputStreamExampleOutputStream)。您的应用程序将这些数据流用于应用程序源和目标流。

可以使用 Amazon Kinesis 控制台或以下 AWS CLI 命令创建这些流。有关控制台说明,请参阅创建和更新数据流

创建数据流 (AWS CLI)
  1. 要创建第一个流 (ExampleInputStream),请使用以下 Amazon Kinesis create-streamAWS CLI命令。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

将示例记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 AWS SDK for Python (Boto)

  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. 在本教程的后面部分,您运行 stock.py 脚本,以将数据发送到应用程序。

    $ python stock.py

下载并检查 Apache Flink 流式处理 Java 代码

此示例的 Java 应用程序代码可从 GitHub 获得。要下载应用程序代码,请执行以下操作:

  1. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. 导航到 GettingStarted 目录。

应用程序代码位于 CustomSinkStreamingJob.javaCloudWatchLogSink.java 文件中。请注意有关应用程序代码的以下信息:

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 接收器:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

编译应用程序代码

在本节中,您使用 Apache Maven 编译器创建应用程序的 Java 代码。有关安装 Apache Maven 和 Java 开发工具包 (JDK) 的信息,请参阅完成练习的先决条件

您的 Java 应用程序需要以下组件:

  • 一个项目对象模型 (pom.xml) 文件。此文件包含有关应用程序的配置和从属项的信息,包括适用于 Apache Flink 的亚马逊托管服务库。

  • 它是一种 main 方法,其中包含应用程序的逻辑。

注意

要将 Kinesis 连接器用于以下应用程序,您必须下载连接器源代码并构建该连接器,如 Apache Flink 文档中所述。

创建并编译应用程序代码
  1. 在您的开发环境中创建 Java/Maven 应用程序。有关创建应用程序的信息,请参阅有关开发环境的文档:

  2. 将以下代码用于名为 StreamingJob.java 的文件。

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    请注意以下有关上述代码示例的信息:

    • 此文件包含 main 方法,它定义应用程序的功能。

    • 您的应用程序使用 StreamExecutionEnvironment 对象创建源和接收连接器以访问外部资源。

    • 该应用程序将使用静态属性创建源和接收连接器。要使用动态应用程序属性,请使用 createSourceFromApplicationPropertiescreateSinkFromApplicationProperties 方法以创建连接器。这些方法读取应用程序的属性来配置连接器。

  3. 要使用您的应用程序代码,您将其编译和打包成 JAR 文件。您可以通过两种方式之一编译和打包您的代码:

    • 使用命令行 Maven 工具。在包含 pom.xml 文件的目录中通过运行以下命令创建您的 JAR 文件:

      mvn package
    • 设置开发环境。有关详细信息,请参阅您的开发环境文档。

    您可以作为 JAR 文件上传您的包,也可以将包压缩为 ZIP 文件并上传。如果您使用 AWS CLI 创建应用程序,您可以指定您的代码内容类型(JAR 或 ZIP)。

  4. 如果编译时出错,请验证 JAVA_HOME 环境变量设置正确。

如果应用程序成功编译,则创建以下文件:

target/java-getting-started-1.0.jar

上传 Apache Flink 流式处理 Java 代码

在本节中,您创建 Amazon Simple Storage Service (Amazon S3) 存储桶并上传应用程序代码。

上传应用程序代码
  1. 通过以下网址打开 Simple Storage Service(Amazon S3)控制台:https://console.aws.amazon.com/s3/

  2. 选择 创建存储桶

  3. 存储桶名称 字段中输入 ka-app-code-<username>。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择 下一步

  4. 配置选项步骤中,让设置保持原样,然后选择下一步

  5. 设置权限步骤中,让设置保持原样,然后选择下一步

  6. 选择 创建存储桶

  7. 在 Amazon S3 控制台中,选择 ka-app-code-<用户名> 存储桶,然后选择上传

  8. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 java-getting-started-1.0.jar 文件。选择 下一步

  9. 设置权限步骤中,让设置保持原样。选择 下一步

  10. 设置属性步骤中,让设置保持原样。选择上传

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。

创建并运行适用于 Apache Flink 的托管服务

您可以使用控制台或 AWS CLI 创建和运行适用于 Apache Flink 的托管服务的应用程序。

注意

当您使用控制台创建应用程序时,将创建您的 AWS Identity and Access Management(IAM) 和 Amazon CloudWatch Logs 资源。当您使用 AWS CLI 创建应用程序时,您可以单独创建这些资源。

创建并运行应用程序(控制台)

按照以下步骤,使用控制台创建、配置、更新和运行应用程序。

创建应用程序

  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在 Amazon Kinesis 控制面板上,选择创建分析应用程序

  3. Kinesis Analytics – 创建应用程序页面上,提供应用程序详细信息,如下所示:

    • 对于 应用程序名称 ,输入 MyApplication

    • 对于描述,输入 My java test app

    • 对于 Runtime (运行时),请选择 Apache Flink 1.6

  4. 对于访问权限,请选择 创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-west-2

  5. 选择创建应用程序

注意

在使用控制台创建适用于 Apache Flink 的亚马逊托管服务应用程序时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:

  • 策略:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

编辑 IAM 策略

编辑 IAM policy 以添加访问 Kinesis 数据流的权限。

  1. 通过 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。

  2. 选择策略。选择控制台在上一部分中为您创建的 kinesis-analytics-service-MyApplication-us-west-2 策略。

  3. 摘要 页面上,选择 编辑策略。选择 JSON 选项卡。

  4. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (012345678901) 替换为您的账户 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

配置应用程序

  1. 我的应用程序 页面上,选择配置

  2. 配置应用程序 页面上,提供 代码位置

    • 对于Amazon S3 存储桶,请输入ka-app-code-<username>

    • 在 Amazon S3 对象的路径中,输入java-getting-started-1.0.jar

  3. 对应用程序的访问权限 下,对于 访问权限,选择 创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-west-2

  4. Properties (属性) 下,对于 Group ID (组 ID),输入 ProducerConfigProperties

  5. 输入以下应用程序属性和值:

    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. 监控 下,确保 监控指标级别 设置为 应用程序

  7. 对于 CloudWatch 日志记录,选中启用复选框。

  8. 选择更新

注意

在选择启用 CloudWatch 日志记录时,适用于 Apache Flink 的托管服务将为您创建日志组和日志流。这些资源的名称如下所示:

  • 日志组:/aws/kinesis-analytics/MyApplication

  • 日志流:kinesis-analytics-log-stream

运行应用程序

  1. MyApplication (我的应用程序) 页面上,选择 Run (运行)。确认该操作。

  2. 当应用程序正在运行时,请刷新页面。控制台将显示 Application graph (应用程序图表)

停止应用程序

我的应用程序 页面上,选择 停止。确认该操作。

更新应用程序

使用控制台,您可以更新应用程序设置,例如应用程序属性、监控设置,或应用程序 JAR 文件的位置和文件名。如果您需要更新应用程序代码,您还可以从 Amazon S3 存储桶重新加载应用程序 JAR。

我的应用程序 页面上,选择 配置。更新应用程序设置,然后选择更新

创建并运行应用程序(AWS CLI)

在本节中,您将使用 AWS CLI 创建和运行适用于 Apache Flink 的托管服务应用程序。适用于 Apache Flink 的托管服务使用该kinesisanalyticsv2AWS CLI命令创建适用于 Apache Flink 的托管服务应用程序并与之交互。

创建权限策略

首先,使用两个语句创建权限策略:一个语句授予对源流执行 read 操作的权限,另一个语句授予对接收器流执行 write 操作的权限。然后,将策略附加到 IAM 角色(下一部分中将创建此角色)。因此,在适用于 Apache Flink 的托管服务代入该角色时,服务具有必要的权限从源流进行读取和写入接收器流。

使用以下代码创建 KAReadSourceStreamWriteSinkStream 权限策略。将 username 替换为您用于创建 Amazon S3 存储桶来存储应用程序代码的用户名。将 Amazon 资源名称 (ARN) 中的账户 ID (012345678901) 替换为您的账户 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

有关创建权限策略的分步说明,请参阅《IAM 用户指南》中的教程:创建和附加您的第一个客户管理型策略

注意

要访问其他 AWS 服务,可以使用 AWS SDK for Java。Apache Flink 托管服务会自动将开发工具包所需的凭证设置为与您的应用程序关联的服务执行 IAM 角色的凭证。无需执行其他步骤。

创建 IAM 角色

在本节中,您将创建一个 IAM 角色,适用于 Apache Flink 的托管服务可以代入此角色来读取源流和写入接收器流。

权限不足时,适用于 Apache Flink 的托管服务无法访问您的串流。您通过 IAM 角色授予这些权限。每个 IAM 角色附加了两种策略。此信任策略授予适用于 Apache Flink 的托管服务代入该角色的权限,权限策略确定适用于 Apache Flink 的托管服务代入这个角色后可以执行的操作。

您将在上一部分中创建的权限策略附加到此角色。

创建 IAM 角色
  1. 通过以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在导航窗格中,选择 角色创建角色

  3. 选择受信任实体的类型 下,选择 AWS 服务。在 选择将使用此角色的服务 下,选择 Kinesis。在选择您的使用案例下,选择 Kinesis Analytics

    选择下一步: 权限

  4. 附加权限策略 页面上,选择 下一步: 审核。在创建角色后,您可以附加权限策略。

  5. 创建角色 页面上,输入KA-stream-rw-role作为角色名称。选择 创建角色

    现在,您已经创建了一个名为 KA-stream-rw-role 的新 IAM 角色。接下来,您更新角色的信任和权限策略。

  6. 将权限策略附加到角色。

    注意

    对于本练习,适用于 Apache Flink 的托管服务代入此角色,以便同时从 Kinesis 数据流(源)读取数据和将输出写入另一个 Kinesis 数据流。因此,您附加在上一步(创建权限策略)中创建的策略。

    1. 摘要 页上,选择 权限 选项卡。

    2. 选择附加策略

    3. 在搜索框中,输入 KAReadSourceStreamWriteSinkStream(您在上一部分中创建的策略)。

    4. 选择 KAReadInputStreamWriteOutputStream 策略,然后选择 Attach policy (附加策略)

现在,您已经创建了应用程序用来访问资源的服务执行角色。记下新角色的 ARN。

有关创建角色的分步说明,请参阅《IAM 用户指南》中的在您的中创建 IAM 角色(控制台)

创建适用于 Apache Flink 的托管服务应用程序

  1. 将以下 JSON 代码保存到名为 create_request.json 的文件中。将示例角色 ARN 替换为您之前创建的角色的 ARN。将存储桶 ARN 后缀 (username) 替换为在前一部分中选择的后缀。将服务执行角色中的示例账户 ID (012345678901) 替换为您的账户 ID。

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. 使用上述请求执行 CreateApplication 操作来创建应用程序:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

应用程序现已创建。您在下一步中启动应用程序。

启动应用程序

在本节中,您使用 StartApplication 操作来启动应用程序。

启动应用程序
  1. 将以下 JSON 代码保存到名为 start_request.json 的文件中。

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 使用上述请求执行 StartApplication 操作来启动应用程序:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

应用程序正在运行。您可以在 Amazon CloudWatch 控制台上查看适用于 Apache Flink 的托管服务的指标,以验证应用程序是否正常运行。

停止应用程序

在本节中,您使用 StopApplication 操作来停止应用程序。

停止应用程序
  1. 将以下 JSON 代码保存到名为 stop_request.json 的文件中。

    {"ApplicationName": "test" }
  2. 使用下面的请求执行 StopApplication 操作来停止应用程序:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

应用程序现已停止。

隐私网站条款Cookie 首选项
© 2025, Amazon Web Services, Inc. 或其附属公司。保留所有权利。