保持适用于 Apache Flink 应用程序的托管服务的最佳实践 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

保持适用于 Apache Flink 应用程序的托管服务的最佳实践

本节包含有关为 Apache Flink 应用程序开发稳定、高性能的托管服务的信息和建议。

尽量减少超级跑车的大小 JAR

Java/Scala application must be packaged in an uber (super/fat)JAR,并包括运行时尚未提供的所有其他必需依赖项。但是,Uber 的大小JAR会影响应用程序的启动和重启时间,并可能JAR导致超过 512 MB 的限制。

为了优化部署时间,您的优步JAR不应包含以下内容:

  • 运行时提供的任何依赖关系,如以下示例所示。它们应该在POM文件或 Gradle 配置compileOnly中具有provided作用域。

  • 任何仅用于测试的依赖项,例如JUnit或 Mockito。它们应该在POM文件或 Gradle 配置testImplementation中具有test作用域。

  • 您的应用程序实际未使用的任何依赖项

  • 您的应用程序所需的任何静态数据或元数据。静态数据应由应用程序在运行时加载,例如从数据存储或 Amazon S3 加载。

  • 有关上述配置设置的详细信息,请参阅此POM示例文件

提供的依赖关系

适用于 Apache 的托管服务 Flink 运行时提供了许多依赖项。这些依赖关系不应包含在 fat 中,JAR并且必须在POM文件中具有provided作用域,或者在maven-shade-plugin配置中明确排除这些依赖关系。fat 中包含的任何这些依赖关系在运行时都会JAR被忽略,但会增加部署期间JAR添加开销的大小。

运行时版本 1.18、1.19 和 1.20 中由运行时提供的依赖关系:

  • org.apache.flink:flink-core

  • org.apache.flink:flink-java

  • org.apache.flink:flink-streaming-java

  • org.apache.flink:flink-scala_2.12

  • org.apache.flink:flink-table-runtime

  • org.apache.flink:flink-table-planner-loader

  • org.apache.flink:flink-json

  • org.apache.flink:flink-connector-base

  • org.apache.flink:flink-connector-files

  • org.apache.flink:flink-clients

  • org.apache.flink:flink-runtime-web

  • org.apache.flink:flink-metrics-code

  • org.apache.flink:flink-table-api-java

  • org.apache.flink:flink-table-api-bridge-base

  • org.apache.flink:flink-table-api-java-bridge

  • org.apache.logging.log4j:log4j-slf4j-impl

  • org.apache.logging.log4j:log4j-api

  • org.apache.logging.log4j:log4j-core

  • org.apache.logging.log4j:log4j-1.2-api

此外com.amazonaws:aws-kinesisanalytics-runtime:1.2.0,还提供了用于在 Apache Flink 托管服务中获取应用程序运行时属性的库。

运行时提供的所有依赖项都必须使用以下建议,以将其包含在 uber 中JAR:

  • 在 Maven (pom.xml) 和 SBT (build.sbt) 中,使用provided作用域。

  • 在 Gradle (build.gradle) 中,使用compileOnly配置。

由于 Apache Flink 的父类优先加载,任何意外包含在 uber 中的依赖项都JAR将在运行时被忽略。有关更多信息,请参阅 Apache Flink 文档parent-first-patterns中的。

连接器

运行时中未包含的大多数 FileSystem 连接器(连接器除外)都必须包含在默认作用域 (compile) POM 的文件中。

其他建议

通常,你JAR提供给 Apache Flink 托管服务的 Apache Flink uber 应包含运行应用程序所需的最少代码。包含源类、测试数据集或引导状态的依赖关系不应包含在此 jar 中。如果需要在运行时提取静态资源,请将此问题分成诸如 Amazon S3 之类的资源。这方面的例子包括状态引导或推理模型。

花点时间考虑一下你的深度依赖树并移除非运行时依赖关系。

尽管适用于 Apache Flink 的托管服务支持 512MB 的 jar 大小,但这应该被视为规则的例外。Apache Flink 目前通过其默认配置支持大约 104MB 的 jar 大小,这应该是所需的 jar 的最大目标大小。

容错:检查点和保存点

使用检查点和保存点在 Apache Flink 托管服务应用程序中实现容错。在开发和维护应用程序时,请牢记以下几点:

  • 我们建议您继续为应用程序启用检查点功能。在计划维护期间以及由于服务问题、应用程序依赖项故障和其他问题而导致意外故障,检查点可以为应用程序提供容错功能。有关定期维护的更多信息,请参阅管理适用于 Apache Flink 的托管服务的维护任务

  • false在应用程序开发或故障排除期间SnapshotsEnabled将ApplicationSnapshotConfiguration:: 设置为。在每次应用程序停止期间,将会创建一个快照;如果应用程序处于不正常状态或性能不佳,则可能会出现问题。在应用程序处于生产状态并保持稳定后,将 SnapshotsEnabled 设置为 true

    注意

    我们建议您的应用程序每天创建几次快照,以便使用正确的状态数据正确重启。正确的快照频率取决于应用程序的业务逻辑。频繁拍摄快照可以恢复更新的数据,但会增加成本并需要更多的系统资源。

    有关监控应用程序停机时间的信息,请参阅

有关实施容错功能的更多信息,请参阅实现容错能力

连接器版本不受支持

从 Apache Flink 1.15 或更高版本开始,如果应用程序使用捆绑到应用程序中的不支持的 Kinesis 连接器版本,则适用于 Apache Flink 的托管服务会自动阻止应用程序启动或更新。JARs升级到适用于 Apache Flink 1.15 或更高版本的托管服务时,请确保使用的是最新的 Kinesis 连接器。可以是 1.15.2 或更高版本的任何版本。适用于 Apache Flink 的托管服务不支持所有其他版本,因为它们可能会导致一致性问题或使用 Savepoint 停止功能失败,从而阻止干净的停止/更新操作。要详细了解适用于 Apache Flink 的亚马逊托管服务 Flink 版本中的连接器兼容性,请参阅 Apach e Flink 连接器。

性能和并行度

应用程序可以调整应用程序并行度并避免性能陷阱,从而进行扩展以满足任何吞吐量级别要求。在开发和维护应用程序时,请牢记以下几点:

  • 验证是否充分预置了所有应用程序源和接收器,而不会受到限制。如果源和接收器是其他 AWS 服务,请使用监控这些服务CloudWatch

  • 对于并行度较高的应用程序,请检查是否将较高的并行度应用于应用程序中的所有操作符。默认情况下,Apache Flink 为应用程序图中的所有操作符应用相同的应用程序并行度。这可能会导致在源或接收器上出现预置问题,或者出现操作符数据处理瓶颈。您可以使用更改代码中每个运算符的并行度。setParallelism

  • 了解应用程序中的操作符的并行度设置的含义。如果更改操作符的并行度,您可能无法从操作符并行度与当前设置不兼容时创建的快照中还原应用程序。有关设置运算符并行度的更多信息,请参阅为运算符显式设置最大并行度

有关实施扩展的更多信息,请参阅实现应用程序扩展

设置每个运算符的并行度

默认情况下,所有运算符均在应用程序级别设置并行度。您可以使用 using 来覆盖单个运算符的并行度。 DataStream API .setParallelism(x)您可以将运算符并行度设置为等于或低于应用程序并行度的任一并行度。

如果可能,将运算符并行度定义为应用程序并行度的函数。这样,运算符的并行度就会随应用程序的并行度而变化。例如,如果您使用自动缩放,则所有运算符都将以相同的比例改变其并行度:

int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);

在某些情况下,您可能希望将运算符并行度设置为常数。例如,将 Kinesis Stream 源的并行度设置为分片数。在这些情况下,如果需要对源流进行重新分片,则应考虑将运算符并行度作为应用程序配置参数传递,以便在不更改代码的情况下对其进行更改。

日志记录

您可以使用 CloudWatch 日志监控应用程序的性能和错误情况。为应用程序配置日志记录时,请牢记以下几点:

  • 启用应用程序的 CloudWatch 日志记录,以便可以调试任何运行时问题。

  • 不要为应用程序中处理的每条记录创建一个日志条目。这会在处理期间出现严重瓶颈,并且可能会导致数据处理反向压力。

  • 创建 CloudWatch 警报,以便在应用程序无法正常运行时通知您。有关更多信息,请参阅

有关实施日志记录的更多信息,请参阅

编码

您可以使用建议的编程做法以提高应用程序性能和稳定性。在编写应用程序代码时,请牢记以下几点:

  • 不要在应用程序代码(应用程序的 main 方法或用户定义的函数)中使用 system.exit()。如果要从代码中关闭应用程序,请引发一个从 ExceptionRuntimeException 派生的异常,其中包含有关应用程序出现的错误的消息。

    请注意下面有关该服务如何处理此类异常的信息:

    • 如果异常是从应用程序的 main 方法中引发的,在应用程序转变为 RUNNING 状态时,该服务将其封装在 ProgramInvocationException 中,并且任务管理器无法提交任务。

    • 如果异常是从用户定义的函数中引发的,任务管理器使任务失败并重新启动,并将异常详细信息写入到异常日志中。

  • 考虑对应用程序JAR文件及其包含的依赖项进行阴影处理。如果应用程序和 Apache Flink 运行时系统的程序包名称存在潜在的冲突,则建议填充阴影。如果发生冲突,则应用程序日志可能包含 java.util.concurrent.ExecutionException 类型的异常。有关对应用程序JAR文件进行着色的更多信息,请参阅 Apache Maven Shade 插件。

管理凭证。

您不应在生产(或任何其他)应用程序中加入任何长期凭证。长期凭证很可能会被签入版本控制系统,很容易丢失。相反,您可以将角色与 Managed Service for Apache Flink 应用程序关联并为该角色授予权限。然后,正在运行的 Flink 应用程序可以从环境中获取具有相应权限的临时证书。如果未与IAM本机集成的服务(例如,需要用户名和密码才能进行身份验证的数据库)需要身份验证,则应考虑将机密存储在 Secrets Manager 中AWS 。

许多 AWS 本机服务都支持身份验证:

从分片/分区很少的源中读取

从 Apache Kafka 或 Kinesis 数据流读取数据时,流的并行度(即 Kafka 的分区数和 Kinesis 的分片数)与应用程序的并行度之间可能存在不匹配。在简单的设计中,应用程序的并行度不能超出流的并行度:源运算符的每个子任务只能从 1 个或多个分片/分区读取。这意味着,对于一个只有 2 个分片的流和一个并行度为 8 的应用程序,实际上只有两个子任务从流中消耗,6 个子任务处于空闲状态。这会大大限制应用程序的吞吐量,尤其是在反序列化成本高昂且由源端执行的情况下(这是默认设置)。

为了减轻这种影响,可以扩展流。但这不一定可取或可行。或者,您可以重构源,使其不进行任何序列化,而只是在 byte[] 上传递。然后,您可以重新平衡数据,使其在所有任务中均匀分布,然后在那里反序列化数据。通过这种方式,您可以利用所有子任务进行反序列化,这一操作可能昂贵,但可以不再受流中分片/分区数量的约束。

Studio 笔记本刷新间隔

如果更改段落结果刷新间隔,请将其值设置为不低于 1000 毫秒。

Studio 笔记本的最佳性能

我们使用以下语句进行了测试,events-per-second乘以number-of-keys低于 25,000,000 时获得了最佳性能。events-per-second 低于 150,000。

SELECT key, sum(value) FROM key-values GROUP BY key

水印策略和空闲分片如何影响时间窗口

从 Apache Kafka 和 Kinesis 数据流读取事件时,源可以根据流的属性设置事件时间。对于 Kinesis,事件时间等于事件的大致到达时间。但是,在源为事件设置事件时间不足以让 Flink 应用程序使用事件时间。源还必须生成水印,将有关事件时间的信息从源传播到所有其他运算符。Flink 文档很好地概述了该过程的工作原理。

默认情况下,从 Kinesis 读取的事件的时间戳设置为 Kinesis 确定的近似到达时间。要使事件时间在应用程序中发挥作用,另一个先决条件是水印策略。

WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));

然后,使用assignTimestampsAndWatermarks方法将水印策略应用于DataStream。有一些有用的内置策略:

  • forMonotonousTimestamps()只会使用事件时间(大概到达时间),并定期将最大值作为水印发出(针对每个特定的子任务)

  • forBoundedOutOfOrderness(Duration.ofSeconds(...))与之前的策略类似,但会使用事件时间 — 生成水印的持续时间。

这行得通,但有几个注意事项需要注意。水印是在子任务级别生成的,并流经运算符图。

来自 Flink 文档

源函数的每个并行子任务通常会独立生成其水印。这些水印定义了该特定并行源上的事件时间。

当水印流经流式处理程序时,它们会推进到达运算符的事件时间。每当运算符推进其事件时间时,它都会为其后继运算符在下游生成一个新的水印。

有些运算符消耗多个输入流;例如,一个并集,或者跟在 keyBy (...) 或分区 (...) 函数之后的运算符。此类运算符的当前事件时间是其输入流事件时间的最小值。当其输入流更新其事件时间时,运算符也会更新。

这意味着,如果源子任务从空闲分片中消耗,则下游运算符不会从该子任务中收到新的水印,因此所有使用时间窗口的下游运算符的处理都会停止。为避免这种情况,客户可以在水印策略中添加withIdleness选项。使用该选项,在计算运算符的事件时间时,运算符会将水印从空闲的上游子任务中排除。因此,空闲子任务不再阻碍下游运算符的事件时间的推进。

但是,如果没有子任务在读取任何事件(即流中没有事件),则带有内置水印策略的空闲选项不会推进事件时间。对于从流中读取有限事件集的测试用例来说,这一点尤其明显。由于读取最后一个事件后,事件时间不会推进,因此最后一个窗口(包含最后一个事件)将永远不会关闭。

Summary

  • 如果分片处于空闲状态,该withIdleness设置不会生成新的水印,它只会将空闲子任务发送的最后一个水印排除在下游运算符的最小水位线计算之外

  • 使用内置水印策略,最后一个打开的窗口将永远不会关闭(除非将发送推进水印的新事件,但这会创建一个随后保持打开状态的新窗口)

  • 即使时间由 Kinesis 流设置,但如果一个分片的消耗速度比其他分片快(例如,在应用程序初始化期间或使用TRIM_HORIZON所有现有分片并行消耗时,忽略其父子关系),仍可能发生延迟到达事件

  • 水印策略的withIdleness设置似乎弃用了空闲分片(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS的 Kinesis 源特定设置

示例

以下应用程序正在从流中读取数据,并根据事件时间创建会话窗口。

Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });

在以下示例中,8 个事件被写入一个 16 个分片流(前 2 个和最后一个事件恰好落在同一个分片中)。

$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022

此输入应生成 5 个会话窗口:事件 1、2、3;事件 4、5;事件 6;事件 7;事件 8。但是,该程序仅生成前 4 个窗口。

11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7

输出仅显示 4 个窗口(缺少包含事件 8 的最后一个窗口)。这是由于事件时间和水印策略造成的。最后一个窗口无法关闭,因为使用预先构建的水印策略,时间永远不会超过从流中读取的最后一个事件的时间。但是要关闭窗口,时间需要早于最后一个事件发生后 10 秒以上。在本例中,最后一个水印是 2022-03-23T10:21:27.170 Z,但是为了关闭会话窗口,需要在 10.001 秒后添加水印。

如果从水印策略中删除该withIdleness选项,则任何会话窗口都不会关闭,因为窗口运算符的“全局水印”无法向前推进。

请注意,当 Flink 应用程序启动时(或者如果存在数据偏差),某些分片的消耗速度可能比其他分片快。这可能会导致子任务过早发出一些水印(子任务可能会根据一个分片的内容发出水印,而不会从其订阅的其他分片消耗)。缓解的方法是使用不同的水印策略,可以添加安全缓冲区(forBoundedOutOfOrderness(Duration.ofSeconds(30))或明确允许延迟到达的事件(allowedLateness(Time.minutes(5))

为所有运算符设置 a UUID

当 Managed Service for Apache Flink 为带有快照的应用程序启动 Flink 任务时,Flink 任务可能由于某些问题而无法启动。其中一个原因是运算符 ID 不匹配。Flink 期望为 Flink 作业图运算符IDs提供明确、一致的运算符。如果未明确设置,Flink 会自动为运算符生成 ID。这是因为 Flink 使用这些运算符IDs来唯一标识作业图中的运算符,并使用它们将每个运算符的状态存储在保存点中。

当 Flink 找不到作业图的运算符和保存点中IDs定义的运算符IDs之间的 1:1 映射时,就会出现运算符 ID 不匹配问题。当未设置显式一致的运算符,IDs而 Flink 会自动生成可能与每次创建的任务图不一致的运算符时IDs,就会发生这种情况。在维护运行期间,应用程序遇到此问题的可能性很高。为避免这种情况,我们建议客户在 flink 代码中UUID为所有运算符进行设置。有关更多信息,请参阅 “生产就绪” 下的 “UUID为所有操作员设置 a” 主题。

添加 ServiceResourceTransformer 到 Maven 阴影插件

Flink 使用 Java 的服务提供者接口 (SPI) 来加载连接器和格式等组件。使用多个 Flink 依赖关系SPI可能会导致 uber-jar 中的冲突和意外的应用程序行为。建议添加 pom.xml 中定义ServiceResourceTransformer的 Maven shade 插件

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>