Siga las prácticas recomendadas para el servicio gestionado para las aplicaciones de Apache Flink - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Siga las prácticas recomendadas para el servicio gestionado para las aplicaciones de Apache Flink

Esta sección contiene información y recomendaciones para desarrollar un servicio gestionado estable y eficaz para las aplicaciones de Apache Flink.

Minimice el tamaño del Uber JAR

Java/Scala application must be packaged in an uber (super/fat) JAR e incluya todas las dependencias adicionales necesarias que el tiempo de ejecución aún no haya proporcionado. Sin embargo, el tamaño del Uber JAR afecta a los tiempos de inicio y reinicio de la aplicación y puede provocar JAR que supere el límite de 512 MB.

Para optimizar el tiempo de despliegue, tu uber no JAR debe incluir lo siguiente:

  • Cualquier dependencia proporcionada por el tiempo de ejecución, como se ilustra en el siguiente ejemplo. Deben tener su provided alcance en el POM archivo o compileOnly en la configuración de Gradle.

  • Cualquier dependencia que se use solo para realizar pruebas, por ejemplo, JUnit o Mockito. Deben tener su test alcance en el POM archivo o testImplementation en la configuración de Gradle.

  • Cualquier dependencia que no utilice realmente tu aplicación.

  • Cualquier dato estático o metadato que requiera la aplicación. La aplicación debe cargar los datos estáticos en tiempo de ejecución, por ejemplo, desde un almacén de datos o desde Amazon S3.

  • Consulte este archivo de POM ejemplo para obtener detalles sobre los ajustes de configuración anteriores.

Dependencias proporcionadas

El tiempo de ejecución del servicio gestionado para Apache Flink proporciona una serie de dependencias. Estas dependencias no deben incluirse en el FAT JAR y deben tener un provided alcance en el POM archivo o excluirse explícitamente de la configuración. maven-shade-plugin Todas estas dependencias incluidas en el FAT JAR se ignoran en tiempo de ejecución, pero aumentan el tamaño de la sobrecarga JAR añadida durante la implementación.

Dependencias que proporciona el entorno de ejecución en las versiones 1.18, 1.19 y 1.20:

  • org.apache.flink:flink-core

  • org.apache.flink:flink-java

  • org.apache.flink:flink-streaming-java

  • org.apache.flink:flink-scala_2.12

  • org.apache.flink:flink-table-runtime

  • org.apache.flink:flink-table-planner-loader

  • org.apache.flink:flink-json

  • org.apache.flink:flink-connector-base

  • org.apache.flink:flink-connector-files

  • org.apache.flink:flink-clients

  • org.apache.flink:flink-runtime-web

  • org.apache.flink:flink-metrics-code

  • org.apache.flink:flink-table-api-java

  • org.apache.flink:flink-table-api-bridge-base

  • org.apache.flink:flink-table-api-java-bridge

  • org.apache.logging.log4j:log4j-slf4j-impl

  • org.apache.logging.log4j:log4j-api

  • org.apache.logging.log4j:log4j-core

  • org.apache.logging.log4j:log4j-1.2-api

Ademáscom.amazonaws:aws-kinesisanalytics-runtime:1.2.0, también se proporciona la biblioteca que se utiliza para obtener las propiedades de tiempo de ejecución de las aplicaciones en Managed Service for Apache Flink.

Todas las dependencias proporcionadas por el motor de ejecución deben seguir las siguientes recomendaciones para no incluirlas en el uber: JAR

  • En Maven (pom.xml) y SBT (build.sbt), usa provided scope.

  • En Gradle (build.gradle), usa compileOnly la configuración.

Cualquier dependencia proporcionada que se incluya accidentalmente en el uber se JAR ignorará en tiempo de ejecución debido a que Apache Flink carga primero la clase principal. Para obtener más información, consulte la documentación de Apache parent-first-patternsFlink.

Connectors

La mayoría de los conectores, excepto el FileSystem conector, que no se incluyen en el tiempo de ejecución deben incluirse en el POM archivo con el alcance predeterminado (compile).

Otras recomendaciones

Por regla general, el uber de Apache Flink JAR proporcionado a Managed Service for Apache Flink debe contener el código mínimo necesario para ejecutar la aplicación. No se deben incluir en este archivo jar las dependencias que incluyan las clases fuente, los conjuntos de datos de prueba o el estado de arranque. Si es necesario incorporar recursos estáticos en tiempo de ejecución, separe esta preocupación en un recurso como Amazon S3. Algunos ejemplos de esto son los bootstraps estatales o un modelo de inferencia.

Tómate un tiempo para considerar tu árbol de dependencias profundo y eliminar las dependencias que no estén relacionadas con el tiempo de ejecución.

Si bien Managed Service for Apache Flink admite tamaños de jar de 512 MB, esto debería considerarse una excepción a la regla. Actualmente, Apache Flink admite tarros de aproximadamente 104 MB a través de su configuración predeterminada, y ese debería ser el tamaño objetivo máximo necesario para un tarro.

Tolerancia a fallos: puntos de control y puntos de almacenamiento

Utilice puntos de control y puntos de almacenamiento para implementar la tolerancia a errores en su aplicación Managed Service for Apache Flink. Tenga en cuenta las siguientes consideraciones al desarrollar y mantener la aplicación:

  • Le recomendamos que mantenga los puntos de control activados en su aplicación. Los puntos de control permiten que la aplicación tolere los errores durante el mantenimiento programado, así como en caso de que se produzcan fallos inesperados debidos a problemas de servicio, fallos de dependencia de la aplicación y otros problemas. Para obtener más información sobre mantenimiento, consulte Gestione las tareas de mantenimiento de Managed Service for Apache Flink.

  • Configure ApplicationSnapshotConfiguration: en false durante el desarrollo SnapshotsEnabled de la aplicación o la solución de problemas. Se crea una instantánea cada vez que se detiene una aplicación, lo que puede provocar problemas si la aplicación se encuentra en mal estado o no funciona correctamente. Establecer SnapshotsEnabled en true cuando la aplicación esté en producción y esté estable.

    nota

    Se recomienda que la aplicación cree una instantánea varias veces al día para que se reinicie correctamente con los datos de estado correctos. La frecuencia correcta de las instantáneas depende de la lógica de negocios de la aplicación. Tomar instantáneas frecuentes le permite recuperar datos más recientes, pero aumenta el costo y requiere más recursos del sistema.

    Para obtener información sobre la supervisión del tiempo de inactividad de las aplicaciones, consulte .

Para obtener más información acerca de la implementación de tolerancias ante fallos, consulte Implemente la tolerancia a errores.

Versiones de conector no compatibles

A partir de la versión 1.15 o posterior de Apache Flink, Managed Service for Apache Flink impide automáticamente que las aplicaciones se inicien o se actualicen si utilizan versiones del conector Kinesis no compatibles incluidas en la aplicación. JARs Al actualizar a la versión 1.15 o posterior de Managed Service for Apache Flink, asegúrese de utilizar el conector de Kinesis más reciente. Se trata de cualquier versión igual o posterior a la 1.15.2. El resto de versiones no son compatibles con Managed Service for Apache Flink porque podrían provocar problemas de coherencia o fallos en la función Stop with Savepoint, lo que impediría realizar operaciones de parada o actualización limpias. Para obtener más información sobre la compatibilidad de los conectores en las versiones de Amazon Managed Service para Apache Flink, consulte Conectores Apache Flink.

Rendimiento y paralelismo

Su aplicación puede escalarse para cumplir con cualquier nivel de rendimiento ajustando el paralelismo de la aplicación y evitando los problemas de rendimiento. Tenga en cuenta las siguientes consideraciones al desarrollar y mantener la aplicación:

  • Compruebe que todas las fuentes y receptores de las aplicaciones estén suficientemente aprovisionadas y que no estén siendo restringidas. Si las fuentes y los receptores son otros AWS servicios, supervise esos servicios mediante. CloudWatch

  • En el caso de aplicaciones con un paralelismo muy alto, compruebe si los niveles altos de paralelismo se aplican a todos los operadores de la aplicación. De forma predeterminada, Apache Flink aplica el mismo paralelismo de aplicación a todos los operadores del gráfico de la aplicación. Esto puede provocar problemas de aprovisionamiento en las fuentes o los receptores, o provocar cuellos de botella en el procesamiento de los datos de los operadores. Puede cambiar el paralelismo de cada operador en el código con. setParallelism

  • Comprenda el significado de la configuración de paralelismo para los operadores de su aplicación. Si cambia el paralelismo de un operador, es posible que no pueda restaurar la aplicación a partir de una instantánea creada cuando el operador tenía un paralelismo que no es compatible con la configuración actual. Para obtener más información sobre cómo configurar el paralelismo del operador, consulte Set maximum parallelism for operators explicitly.

Para obtener más información acerca de la implementación del escalado, consulte Implementar el escalado de aplicaciones.

Establecimiento del paralelismo por operador

De forma predeterminada, todos los operadores tienen el paralelismo establecido en el nivel de la aplicación. Puede anular el paralelismo de un solo operador mediante el uso. DataStream API .setParallelism(x) Puede establecer el paralelismo de un operador en cualquier paralelismo igual o inferior al paralelismo de la aplicación.

Si es posible, defina el paralelismo del operador como una función del paralelismo de la aplicación. De esta forma, el paralelismo del operador variará con el paralelismo de la aplicación. Si utiliza el escalado automático, por ejemplo, todos los operadores variarán su paralelismo en la misma proporción:

int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);

En algunos casos, es posible que desee establecer el paralelismo del operador en una constante. Por ejemplo, establecer el paralelismo de una fuente de Kinesis Stream con la cantidad de particiones. En estos casos, debería considerar la posibilidad de incluir el paralelismo del operador como parámetro de configuración de la aplicación para poder cambiarlo sin cambiar el código, si necesita, por ejemplo, volver a fragmentar el flujo de origen.

Registro

Puede supervisar el rendimiento y las condiciones de error de su aplicación mediante los registros. CloudWatch Tenga en cuenta las siguientes consideraciones al configurar los registros de la aplicación:

  • Habilita el CloudWatch registro de la aplicación para poder depurar cualquier problema de tiempo de ejecución.

  • No cree una entrada de registro para cada registro que se esté procesando en la aplicación. Esto provoca graves cuellos de botella durante el procesamiento y puede provocar retrasos en el procesamiento de los datos.

  • Cree CloudWatch alarmas que le notifiquen cuando la aplicación no se ejecute correctamente. Para obtener más información, consulte

Para obtener más información acerca de la implementación de los registros, consulte .

Codificación

Puede hacer que su aplicación funcione y sea estable mediante las prácticas de programación recomendadas. Tenga en cuenta las siguientes consideraciones al escribir el código de aplicación:

  • No utilice system.exit() en el código de la aplicación, ni en el método main de la aplicación ni en las funciones definidas por el usuario. Si quiere cerrar la aplicación desde el código, lance una excepción derivada de Exception o RuntimeException que contenga un mensaje sobre el problema de la aplicación.

    Tenga en cuenta lo siguiente sobre cómo gestiona el servicio esta excepción:

    • Si la excepción proviene del método main de la solicitud, el servicio la incluirá en una ProgramInvocationException cuando la solicitud pase al estado RUNNING y el administrador del trabajo no podrá enviar el trabajo.

    • Si la excepción proviene de una función definida por el usuario, el administrador de tareas fallará en la tarea y la reiniciará, y los detalles de la excepción se escribirán en el registro de excepciones.

  • Considere la posibilidad de sombrear el JAR archivo de la aplicación y las dependencias incluidas en él. Se recomienda sombrear cuando haya posibles conflictos en los nombres de los paquetes entre la aplicación y el tiempo de ejecución de Apache Flink. Si se produce un conflicto, es posible que los registros de la aplicación contengan alguna excepción de tipo java.util.concurrent.ExecutionException. Para obtener más información sobre cómo sombrear el JAR archivo de la aplicación, consulte el complemento Apache Maven Shade.

Administración de credenciales

No debe incluir credenciales a largo plazo en las aplicaciones de producción (ni en ninguna otra). Es probable que las credenciales a largo plazo se registren en un sistema de control de versiones y se pierdan fácilmente. En su lugar, puede asociar un rol a la aplicación Managed Service para Apache Flink y conceder privilegios a ese rol. La aplicación Flink en ejecución puede recoger credenciales temporales del entorno con los privilegios correspondientes. En caso de que sea necesaria la autenticación para un servicio que no esté integrado de forma nativa conIAM, por ejemplo, una base de datos que requiera un nombre de usuario y una contraseña para la autenticación, debería considerar la posibilidad de almacenar los AWS secretos en Secrets Manager.

Muchos servicios AWS nativos admiten la autenticación:

Lectura de fuentes con pocas particiones

Al leer desde Apache Kafka o un flujo de datos de Kinesis, es posible que no coincida el paralelismo del flujo (es decir, el número de particiones de Kafka y el número de particiones de Kinesis) y el paralelismo de la aplicación. Con un diseño ingenuo, el paralelismo de una aplicación no puede ampliarse más allá del paralelismo de un flujo: cada subtarea de un operador fuente solo puede leer de 1 o más particiones. Esto significa que, en el caso de una transmisión con solo 2 fragmentos y una aplicación con un paralelismo de 8, solo se consumen realmente dos subtareas de la transmisión y 6 subtareas permanecen inactivas. Esto puede limitar considerablemente el rendimiento de la aplicación, especialmente si la deserialización es cara y la lleva a cabo la fuente (que es la opción predeterminada).

Para mitigar este efecto, puede escalar la transmisión. Pero eso no siempre es deseable o posible. Alternativamente, puede reestructurar la fuente para que no realice ninguna serialización y simplemente transmita la byte[]. A continuación, puede volver a equilibrar los datos para distribuirlos uniformemente entre todas las tareas y, a continuación, deserializar los datos allí. De esta forma, puede aprovechar todas las subtareas para la deserialización y esta operación, potencialmente costosa, ya no está limitada por el número de particiones de la transmisión.

Intervalo de actualización del cuaderno de Studio

Si cambia el intervalo de actualización de resultados de párrafos, configúrelo en un valor de al menos 1000 milisegundos.

Rendimiento óptimo del cuaderno de Studio

Probamos con la siguiente afirmación y obtuvimos el mejor rendimiento cuando events-per-second multiplicado por number-of-keys dio menos de 25 000 000. Esto fue para events-per-second por debajo de 150 000.

SELECT key, sum(value) FROM key-values GROUP BY key

Cómo afectan las estrategias de marcas de agua y las particiones inactivas a las ventanas temporales

Al leer los eventos de Apache Kafka y flujo de datos de Kinesis, la fuente puede establecer la hora del evento en función de los atributos de la transmisión. En el caso de Kinesis, la hora del evento es igual a la hora aproximada de llegada de los eventos. Sin embargo, establecer la hora del evento en el origen de los eventos no es suficiente para que una aplicación de Flink utilice la hora del evento. El origen también debe generar marcas de agua que propaguen la información sobre la hora del evento desde el origen a todos los demás operadores. La documentación de Flink ofrece una buena visión general de cómo funciona ese proceso.

De forma predeterminada, la marca de tiempo de un evento leído en Kinesis se establece en la hora de llegada aproximada determinada por Kinesis. Un requisito previo adicional para que la hora del evento funcione en la aplicación es una estrategia de marca de agua.

WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));

Luego, la estrategia de marca de agua se aplica a DataStream con el método assignTimestampsAndWatermarks. Hay algunas estrategias integradas útiles:

  • forMonotonousTimestamps() solo utilizará la hora del evento (hora aproximada de llegada) y emitirá periódicamente el valor máximo como marca de agua (para cada subtarea específica)

  • forBoundedOutOfOrderness(Duration.ofSeconds(...)) similar a la estrategia anterior, pero utilizará el tiempo y la duración del evento para generar marcas de agua.

Esto funciona, pero hay un par de advertencias que debe tener en cuenta. Las marcas de agua se generan a nivel de subtarea y fluyen a través del gráfico del operador.

De la documentación de Flink:

Cada subtarea paralela de una función fuente suele generar sus marcas de agua de forma independiente. Estas marcas de agua definen la hora del evento en esa fuente paralela en particular.

A medida que las marcas de agua recorren el programa de streaming, adelantan la hora del evento en los operadores a los que llegan. Cada vez que un operador adelanta la hora de su evento, genera una nueva marca de agua para sus operadores subsiguientes aguas abajo.

Algunos operadores consumen varios flujos de entrada; una unión, por ejemplo, o los operadores que siguen una función keyBy (...) o de partición (...). El tiempo de evento actual de un operador de este tipo es el mínimo de los tiempos de evento de sus flujos de entrada. A medida que sus flujos de entrada actualizan los tiempos de sus eventos, también lo hace el operador.

Esto significa que, si una subtarea de origen consume contenido de una partición inactiva, los operadores intermedios no reciben nuevas marcas de agua de esa subtarea y, por lo tanto, el procesamiento se detiene para todos los operadores intermedios que utilizan ventanas temporales. Para evitarlo, los clientes pueden añadir la opción withIdleness a la estrategia de marcas de agua. Con esta opción, el operador excluye las marcas de agua de las subtareas previas inactivas al calcular la hora del evento del operador. Por lo tanto, las subtareas inactivas ya no bloquean el avance de la hora del evento en los operadores intermedios.

Sin embargo, la opción de inactividad con las estrategias de marca de agua integradas no adelantará la hora del evento si ninguna subtarea lee ningún evento, es decir, si no hay ningún evento en la transmisión. Esto resulta especialmente visible en los casos de prueba en los que se lee un conjunto finito de eventos de la secuencia. Como el tiempo del evento no avanza después de haber leído el último evento, la última ventana (que contiene el último evento) nunca se cerrará.

Resumen

  • el parámetro withIdleness no generará nuevas marcas de agua en caso de que una partición esté inactiva, sino que simplemente excluirá la última marca de agua enviada por las subtareas inactivas del cálculo de la marca de agua mínima en los operadores aguas abajo

  • con las estrategias de marcas de agua integradas, la última ventana abierta nunca se cerrará (a menos que se envíen nuevos eventos que hagan avanzar la marca de agua, pero que creen una nueva ventana que luego permanecerá abierta)

  • incluso si la transmisión de Kinesis establece la hora, pueden producirse eventos que lleguen tarde si una partición se consume más rápido que otras (por ejemplo, durante la inicialización de la aplicación o cuando se usa, TRIM_HORIZON cuando todas las particiones existentes se consumen en paralelo sin tener en cuenta su relación padre/hijo)

  • el parámetro withIdleness de la estrategia de marca de agua parece dejar obsoleta la configuración específica de origen de Kinesis para las particiones inactivas (ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS

Ejemplo

La siguiente aplicación lee una transmisión y crea ventanas de sesión en función de la hora del evento.

Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });

En el siguiente ejemplo, se escriben 8 eventos en una secuencia de 16 particiones (los 2 primeros y el último evento ocurren en la misma partición).

$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022

Esta entrada debería dar como resultado 5 ventanas de sesión: evento 1,2,3; evento 4,5; evento 6; evento 7; evento 8. Sin embargo, el programa solo muestra las 4 primeras ventanas.

11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7

La salida solo muestra 4 ventanas (falta la última ventana que contiene el evento 8). Esto se debe a la hora del evento y a la estrategia de marca de agua. La última ventana no se puede cerrar porque, con las estrategias de marca de agua predefinidas, el tiempo nunca pasa más allá de la hora del último evento que se lee desde la transmisión. Sin embargo, para que la ventana se cierre, el tiempo debe avanzar más de 10 segundos después del último evento. En este caso, la última marca de agua es 2022-03-23T 10:21:27.170Z, pero para que la ventana de la sesión se cierre, es necesaria una marca de agua 10 segundos y 1 ms después.

Si se elimina la opción withIdleness de la estrategia de marca de agua, no se cerrará nunca ninguna ventana de sesión, ya que la “marca de agua global” del operador de la ventana no podrá avanzar.

Tenga en cuenta que cuando se inicia la aplicación Flink (o si los datos están sesgados), es posible que algunos fragmentos se consuman más rápido que otros. Esto puede provocar que algunas marcas de agua se emitan demasiado pronto desde una subtarea (es posible que la subtarea emita la marca de agua en función del contenido de una partición sin haber consumido contenido de las demás particiones a las que está suscrita). Las formas de mitigarlo son distintas estrategias de marca de agua que añadan un margen de seguridad (forBoundedOutOfOrderness(Duration.ofSeconds(30)) o permitan explícitamente que los eventos (allowedLateness(Time.minutes(5)) lleguen tarde.

Defina a UUID para todos los operadores

Cuando Managed Service para Apache Flink inicia un trabajo de Flink para una aplicación con una instantánea, es posible que el trabajo de Flink no se inicie debido a ciertos problemas. Uno de ellos es la disparidad de ID de los operadores. Flink espera un operador explícito y coherente IDs para los operadores de gráficos de tareas de Flink. Si no se establece de forma explícita, Flink genera automáticamente un ID para los operadores. Esto se debe a que Flink usa estos operadores IDs para identificar de forma exclusiva a los operadores en un gráfico de tareas y los usa para almacenar el estado de cada operador en un punto de almacenamiento.

El problema de discordancia de los identificadores de los operadores se produce cuando Flink no encuentra un mapeo 1:1 entre el operador IDs de un gráfico de tareas y el operador IDs definido en un punto de almacenamiento. Esto ocurre cuando no IDs se establece un operador coherente explícito y Flink genera automáticamente un operador IDs que puede no ser coherente con la creación de cada gráfico de trabajo. La probabilidad de que las aplicaciones presenten este problema es alta durante las operaciones de mantenimiento. Para evitarlo, recomendamos a los clientes configurar todos UUID los operadores en código flink. Para obtener más información, consulte el tema Establecer a UUID para todos los operadores en la sección Preparación para la producción.

ServiceResourceTransformer Añádelo al plugin Maven shade

Flink utiliza las interfaces de proveedor de servicios (SPI) de Java para cargar componentes como conectores y formatos. El uso de varias dependencias de Flink SPI puede provocar conflictos en el archivo uber-jar y comportamientos inesperados de la aplicación. Se recomienda añadir el plugin de sombreado ServiceResourceTransformerde Maven, definido en el archivo pom.xml

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>