Mantenha as melhores práticas de serviço gerenciado para aplicativos Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Mantenha as melhores práticas de serviço gerenciado para aplicativos Apache Flink

Esta seção contém informações e recomendações para o desenvolvimento de um serviço gerenciado estável e de alto desempenho para aplicativos Apache Flink.

Minimize o tamanho do uber JAR

Java/Scala application must be packaged in an uber (super/fat) JAR e inclua todas as dependências adicionais necessárias que ainda não foram fornecidas pelo tempo de execução. No entanto, o tamanho do uber JAR afeta os horários de início e reinicialização do aplicativo e pode fazer com JAR que ele exceda o limite de 512 MB.

Para otimizar o tempo de implantação, seu uber não JAR deve incluir o seguinte:

  • Todas as dependências fornecidas pelo tempo de execução, conforme ilustrado no exemplo a seguir. Eles devem ter provided escopo no POM arquivo ou compileOnly na configuração do Gradle.

  • Quaisquer dependências usadas somente para testes, por exemplo, JUnit ou Mockito. Eles devem ter test escopo no POM arquivo ou testImplementation na configuração do Gradle.

  • Quaisquer dependências que não sejam realmente usadas pelo seu aplicativo.

  • Qualquer dado estático ou metadado exigido pelo seu aplicativo. Os dados estáticos devem ser carregados pelo aplicativo em tempo de execução, por exemplo, de um armazenamento de dados ou do Amazon S3.

  • Consulte este arquivo de POM exemplo para obter detalhes sobre as configurações anteriores.

Dependências fornecidas

O serviço gerenciado para o tempo de execução do Apache Flink fornece várias dependências. Essas dependências não devem ser incluídas no fat JAR e devem ter provided escopo no POM arquivo ou ser explicitamente excluídas na maven-shade-plugin configuração. Qualquer uma dessas dependências incluídas no fat JAR é ignorada em tempo de execução, mas aumenta o tamanho da sobrecarga JAR adicional durante a implantação.

Dependências fornecidas pelo tempo de execução, nas versões 1.18, 1.19 e 1.20 do tempo de execução:

  • org.apache.flink:flink-core

  • org.apache.flink:flink-java

  • org.apache.flink:flink-streaming-java

  • org.apache.flink:flink-scala_2.12

  • org.apache.flink:flink-table-runtime

  • org.apache.flink:flink-table-planner-loader

  • org.apache.flink:flink-json

  • org.apache.flink:flink-connector-base

  • org.apache.flink:flink-connector-files

  • org.apache.flink:flink-clients

  • org.apache.flink:flink-runtime-web

  • org.apache.flink:flink-metrics-code

  • org.apache.flink:flink-table-api-java

  • org.apache.flink:flink-table-api-bridge-base

  • org.apache.flink:flink-table-api-java-bridge

  • org.apache.logging.log4j:log4j-slf4j-impl

  • org.apache.logging.log4j:log4j-api

  • org.apache.logging.log4j:log4j-core

  • org.apache.logging.log4j:log4j-1.2-api

Além dissocom.amazonaws:aws-kinesisanalytics-runtime:1.2.0, a biblioteca usada para buscar propriedades de tempo de execução do aplicativo no Managed Service for Apache Flink também é fornecida.

Todas as dependências fornecidas pelo runtime devem usar as seguintes recomendações para não incluí-las no uberJAR:

  • Em Maven (pom.xml) e SBT (build.sbt), use o provided escopo.

  • Em Gradle (build.gradle), use a compileOnly configuração.

Qualquer dependência fornecida acidentalmente incluída no uber JAR será ignorada em tempo de execução devido ao carregamento de classe principal do Apache Flink. Para obter mais informações, consulte a parent-first-patternsdocumentação do Apache Flink.

Conectores

A maioria dos conectores, exceto o FileSystem conector, que não estão incluídos no tempo de execução devem ser incluídos no POM arquivo com o escopo padrão (compile).

Outras recomendações

Como regra geral, seu Apache Flink uber JAR fornecido ao Managed Service for Apache Flink deve conter o código mínimo necessário para executar o aplicativo. A inclusão de dependências que incluem classes de origem, conjuntos de dados de teste ou estado de inicialização não deve ser incluída neste jar. Se os recursos estáticos precisarem ser extraídos em tempo de execução, separe essa preocupação em um recurso como o Amazon S3. Exemplos disso incluem bootstraps de estado ou um modelo de inferência.

Reserve um tempo para considerar sua árvore de dependências profunda e remover dependências que não sejam de tempo de execução.

Embora o Managed Service para Apache Flink suporte tamanhos de jar de 512 MB, isso deve ser visto como uma exceção à regra. Atualmente, o Apache Flink suporta tamanhos de jar de aproximadamente 104 MB por meio de sua configuração padrão, e esse deve ser o tamanho máximo de um jar necessário.

Tolerância a falhas: pontos de verificação e pontos de salvamento

Use pontos de verificação e pontos de salvamento para implementar a tolerância a falhas em seu aplicativo Managed Service for Apache Flink. Lembre-se disso ao desenvolver e manter seu aplicativo:

  • Recomendamos que você mantenha o ponto de verificação ativado para seu aplicativo. O ponto de verificação fornece tolerância a falhas para o seu aplicativo durante a manutenção programada, assim como no caso de falhas inesperadas devido a problemas de serviço, falhas de dependência do aplicativo e outros problemas. Para obter mais informações manutenções programas, consulte Gerencie tarefas de manutenção do Managed Service for Apache Flink.

  • Defina ApplicationSnapshotConfiguration:: SnapshotsEnabled para false durante o desenvolvimento ou solução de problemas do aplicativo. Um snapshot é criado durante cada parada do aplicativo, o que pode causar problemas se o aplicativo não estiver íntegro ou não estiver funcionando. Defina SnapshotsEnabled para true depois que o aplicativo estiver em produção e estável.

    nota

    Recomendamos que seu aplicativo crie um instantâneo várias vezes ao dia para reiniciar adequadamente com os dados de estado corretos. A frequência correta para seus instantâneos depende da lógica de negócios do seu aplicativo. Tirar snapshots com frequência permite recuperar dados mais recentes, mas aumenta os custos e exige mais recursos do sistema.

    Para obter informações sobre como monitorar o tempo de inatividade do aplicativo, consulte .

Para obter mais informações sobre a implementação da tolerância a falhas, consulte Implemente a tolerância a falhas.

Versões de conectores incompatíveis

A partir da versão 1.15 ou posterior do Apache Flink, o Managed Service for Apache Flink impede automaticamente que os aplicativos sejam iniciados ou atualizados se estiverem usando versões incompatíveis do conector Kinesis agrupadas no aplicativo. JARs Ao atualizar para o Managed Service for Apache Flink versão 1.15 ou posterior, verifique se você está usando o conector Kinesis mais recente. Isso quer dizer, qualquer versão igual ou mais recente do que a versão 1.15.2. Todas as outras versões não são suportadas pelo Managed Service for Apache Flink porque elas podem causar problemas de consistência ou falhas com o recurso Stop with Savepoint, impedindo operações limpas de parada/atualização. Para saber mais sobre a compatibilidade de conectores nas versões do Amazon Managed Service para Apache Flink, consulte Conectores Apache Flink.

Desempenho e paralelismo

Seu aplicativo pode escalar para atender a qualquer nível de throughput ajustando o paralelismo do aplicativo e evitando problemas de desempenho. Lembre-se disso ao desenvolver e manter seu aplicativo:

  • Verifique se todas as fontes e coletores do seu aplicativo estão suficientemente provisionados e não estão sendo limitados. Se as fontes e os coletores forem outros AWS serviços, monitore esses serviços usando CloudWatch.

  • Para aplicativos com paralelismo muito alto, verifique se os altos níveis de paralelismo são aplicados a todos os operadores no aplicativo. Por padrão, o Apache Flink aplica o mesmo paralelismo de aplicativos para todos os operadores no gráfico do aplicativo. Isso pode causar problemas de provisionamento em fontes ou coletores ou gargalos no processamento de dados do operador. Você pode alterar o paralelismo de cada operador no código com. setParallelism

  • Entenda o significado das definições do paralelismo para os operadores em seu aplicativo. Se você alterar o paralelismo de um operador, talvez você não consiga restaurar o aplicativo a partir de um snapshot criado quando o operador tinha um paralelismo incompatível com as configurações atuais. Para obter mais informações sobre como definir o paralelismo do operador, consulte Definir explicitamente o paralelismo máximo para os operadores.

Para obter mais informações sobre a implementação da escalabilidade, consulte Implemente o escalonamento de aplicativos.

Definindo o paralelismo por operador

Por padrão, todos os operadores têm o paralelismo definido no nível do aplicativo. Você pode substituir o paralelismo de um único operador usando o using. DataStream API .setParallelism(x) Você pode definir um paralelismo do operador para qualquer paralelismo igual ou inferior ao paralelismo do aplicativo.

Se possível, defina o paralelismo do operador em função do paralelismo do aplicativo. Dessa forma, o paralelismo do operador variará com o paralelismo do aplicativo. Se você estiver usando o ajuste de escala automático, por exemplo, todos os operadores irão variar seu paralelismo na mesma proporção:

int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);

Em alguns casos, você pode querer definir o paralelismo do operador como uma constante. Por exemplo, definir o paralelismo de uma fonte do Kinesis Stream para o número de fragmentos. Nesses casos, considere passar o paralelismo do operador como parâmetro de configuração do aplicativo, a fim de alterá-lo sem alterar o código, se você precisar, por exemplo, refragmentar o fluxo de origem.

Registro em log

Você pode monitorar o desempenho e as condições de erro do seu aplicativo usando o CloudWatch Logs. Lembre-se disso ao configurar o log para o aplicativo:

  • Ative o CloudWatch registro do aplicativo para que quaisquer problemas de tempo de execução possam ser depurados.

  • Não crie uma entrada de log para cada registro que está sendo processado no aplicativo. Isso causa gargalos graves durante o processamento e pode levar à contrapressão no processamento de dados.

  • Crie CloudWatch alarmes para notificá-lo quando seu aplicativo não estiver funcionando corretamente. Para ter mais informações, consulte

Para obter mais informações sobre o registro em log, consulte .

Codificação

Você pode tornar seu aplicativo eficiente e estável usando as práticas de programação recomendadas. Lembre-se disso ao escrever o código do aplicativo:

  • Não use system.exit() no código do aplicativo, no main método do aplicativo ou em funções definidas pelo usuário. Se você quiser desligar seu aplicativo a partir do código, lance uma exceção derivada de Exception ou RuntimeException contendo uma mensagem sobre o que deu errado com o aplicativo.

    Observe a seguir como o serviço lida com essa exceção:

    • Se a exceção for gerada pelo método main do seu aplicativo, o serviço a encapsulará em um ProgramInvocationException quando o aplicativo fizer a transição para o status RUNNING, e o Job Manager não enviará a tarefa.

    • Se a exceção for lançada a partir de uma função definida pelo usuário, o Job Manager falhará a tarefa e a reiniciará, e os detalhes da exceção serão gravados no log de exceções.

  • Considere sombrear o JAR arquivo do aplicativo e as dependências incluídas. O sombreamento é recomendado quando há possíveis conflitos nos nomes dos pacotes entre seu aplicativo e o runtime do Apache Flink. Se ocorrer um conflito, os registros do seu aplicativo podem conter uma exceção do tipo java.util.concurrent.ExecutionException. Para obter mais informações sobre como sombrear o JAR arquivo do aplicativo, consulte Apache Maven Shade Plugin.

Gerenciamento de credenciais

Você não deve incorporar nenhuma credencial de longo prazo em aplicativos de produção (ou em qualquer outro). As credenciais de longo prazo são, provavelmente, verificadas em um sistema de controle de versão e podem ser facilmente perdidas. Em vez disso, você pode associar um perfil ao aplicativo Managed Service for Apache Flink e conceder privilégios a esse perfil. O aplicativo Flink em execução pode, em seguida, obter credenciais temporárias com os respectivos privilégios do ambiente. Caso a autenticação seja necessária para um serviço que não está nativamente integrado comIAM, por exemplo, um banco de dados que requer um nome de usuário e senha para autenticação, considere armazenar AWS segredos no Secrets Manager.

Muitos serviços AWS nativos oferecem suporte à autenticação:

Lendo a partir de fontes com poucos fragmentos/partições

Ao se ler a partir do Apache Kafka ou de um Kinesis Data Stream, pode haver uma incompatibilidade entre o paralelismo do fluxo (ou seja, o número de partições do Kafka e o número de fragmentos do Kinesis) e o paralelismo do aplicativo. Com um design simples, o paralelismo de um aplicativo não pode escalar além do paralelismo de um fluxo: cada subtarefa de um operador de origem só pode ler a partir de um ou mais fragmentos/partições. Isso significa que, para um fluxo com apenas dois fragmentos e um aplicativo com um paralelismo de oito, apenas duas subtarefas estão realmente consumindo o fluxo e seis subtarefas permanecem inativas. Isso pode limitar substancialmente o throughput do aplicativo, especialmente se a desserialização for cara e realizada pela fonte (que é o padrão).

Para mitigar esse efeito, você pode escalar o fluxo. Mas, isso nem sempre é desejável ou possível. Como alternativa, você pode reestruturar a fonte para que ela não faça nenhuma serialização e só transmita o byte[]. Em seguida, você pode reequilibrar os dados para distribuí-los uniformemente entre todas as tarefas e, em seguida, desserializar os dados lá. Dessa forma, você pode aproveitar todas as subtarefas para a desserialização e essa operação potencialmente cara não estará mais limitada ao número de fragmentos/partições do fluxo.

Intervalo de atualização de um notebook com Studio

Se você alterar o parágrafo de resultado do intervalo de atualização, defina-o para um valor de pelo menos 1.000 milissegundos.

Desempenho ideal de um notebook com Studio

Testamos com a seguinte instrução e obtivemos o melhor desempenho quando events-per-second multiplicado por number-of-keys menos de 25.000.000. Isso foi para events-per-second abaixo de 150.000.

SELECT key, sum(value) FROM key-values GROUP BY key

Como as estratégias de marca d’água e os fragmentos inativos afetam as janelas de tempo

Ao ler os eventos do Apache Kafka e do Kinesis Data Streams, a fonte pode definir o horário do evento com base nos atributos do fluxo. No caso do Kinesis, o horário do evento é igual ao horário aproximado da chegada dos eventos. Mas, definir o horário do evento na origem dos eventos não é suficiente para que um aplicativo Flink use o horário do evento. A fonte também deve gerar marcas d’água que propaguem informações sobre o horário do evento da fonte para todos os outros operadores. A documentação do Flink apresenta uma visão geral abrangente sobre como esse processo funciona.

Por padrão, o registro de data e horário de um evento lido do Kinesis é definido como o horário aproximado de chegada determinado pelo Kinesis. Um pré-requisito adicional para que o horário do evento funcione no aplicativo é uma estratégia de marca d’água.

WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));

A estratégia de marcas d’água é então aplicada a um DataStream com o método assignTimestampsAndWatermarks. Existem algumas estratégias integradas úteis:

  • forMonotonousTimestamps() usará apenas o horário do evento (hora aproximada de chegada) e emitirá periodicamente o valor máximo como marca d’água (para cada subtarefa específica)

  • forBoundedOutOfOrderness(Duration.ofSeconds(...)) semelhante à estratégia anterior, mas usará o tempo – duração do evento para gerar a marca d’água.

Isso funciona, mas há algumas ressalvas que você deve observar. As marcas d’água são geradas em um nível de subtarefa e fluem através do gráfico do operador.

Da documentação do Flink:

Geralmente, cada subtarefa paralela de uma função de origem gera suas marcas d’água de forma independente. Essas marcas d’água definem o horário do evento nessa fonte paralela específica.

Conforme as marcas d’água fluem pelo programa de streaming, elas avançam o horário do evento nos operadores onde chegam. Sempre que um operador avança o horário do evento, ele gera uma nova marca d’água posterior para seus operadores sucessores.

Alguns operadores consomem vários fluxos de entrada; uma união, por exemplo, ou operadores seguindo uma função keyBy (...) ou partição (...). O horário atual do evento desse operador é o tempo mínimo dos eventos de seus fluxos de entrada. À medida que seus fluxos de entrada atualizam os horários dos eventos, o mesmo acontece com o operador.

Isso significa que, se uma subtarefa de origem estiver consumindo um fragmento inativo, os operadores posteriores não recebem novas marcas d’água dessa subtarefa e, portanto, o processamento é interrompido para todos os operadores posteriores que usam janelas de tempo. Para evitar isso, os clientes podem adicionar a opção withIdleness à estratégia de marcas d’água. Com essa opção, um operador exclui as marcas d'água das subtarefas inativas anteriores ao calcular o horário do evento do operador. Portanto, a subtarefa inativa não bloqueia mais o avanço do horário do evento nos operadores posteriores.

No entanto, a opção de inatividade com as estratégias integradas de marca d’água não avançará o horário do evento se nenhuma subtarefa estiver lendo nenhum evento, ou seja, se não houver eventos no fluxo. Isso se torna particularmente visível em casos de teste em que um conjunto finito de eventos é lido a partir do fluxo. Como a hora do evento não avança após a leitura do último evento, a última janela (que contém o último evento) nunca será fechada.

Resumo

  • a configuração withIdleness não gerará novas marcas d’água caso algum fragmento fique inativo, ela apenas excluirá a última marca d’água enviada por subtarefas inativas do cálculo mínimo de marcas d’água nos operadores posteriores

  • com as estratégias de marca d’água incorporadas, a última janela aberta nunca será fechada (a menos que novos eventos que avancem a marca d’água sejam enviados, mas isso cria uma nova janela que permanece aberta)

  • mesmo quando a hora é definida pelo fluxo do Kinesis, eventos de chegada tardia ainda podem ocorrer se um fragmento for consumido mais rápido do que outros (por exemplo, durante a inicialização do aplicativo ou ao usar TRIM_HORIZON onde todos os fragmentos existentes são consumidos paralelamente, ignorando a relação pai/filho)

  • as configurações withIdleness da estratégia de marcas d’água parecem descontinuar as configurações específicas da fonte do Kinesis para fragmentos inativos (ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS

Exemplo

O aplicativo a seguir está lendo a partir de um fluxo e criando janelas de sessão com base no horário do evento.

Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });

No exemplo a seguir, oito eventos são gravados em um fluxo de 16 fragmentos (os dois primeiros e o último evento caem no mesmo fragmento).

$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022

Essa entrada deve resultar em cinco janelas de sessão: evento 1, 2, 3; evento 4,5; evento 6; evento 7; evento 8. No entanto, o programa só produz as primeiras quatro janelas.

11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7

A saída mostra apenas quatro janelas (faltando a última janela contendo o evento 8). Isso se deve ao horário do evento e à estratégia de marcas d’água. A última janela não pode ser fechada porque, com as estratégias de marcas d’água pré-criadas, o tempo nunca avança além do horário do último evento que foi lido a partir do fluxo. Mas, para que a janela se feche, o tempo precisa avançar mais de dez segundos após o último evento. Nesse caso, a última marca d’água é 2022-03-23T10:21:27.170Z, mas, para que a janela da sessão se feche, é necessária uma marca d’água de 10 segundos e 1 ms depois.

Se a opção withIdleness for removida da estratégia de marcas d’água, nenhuma janela da sessão será fechada, pois a “marca d’água global” do operador da janela não pode avançar.

Observe que, quando o aplicativo Flink é iniciado (ou se houver distorção de dados), alguns fragmentos podem ser consumidos mais rapidamente do que outros. Isso pode fazer com que algumas marcas d’água de uma subtarefa sejam emitidas muito cedo (a subtarefa pode emitir a marca d’água com base no conteúdo de um fragmento sem ter sido consumida dos outros fragmentos nos quais está inscrita). As formas de mitigar isso é uma estratégia diferente de marcas d’água que adicione um buffer de segurança (forBoundedOutOfOrderness(Duration.ofSeconds(30)) ou permita explicitamente a chegada tardia de eventos (allowedLateness(Time.minutes(5)).

Defina um UUID para todos os operadores

Quando o Managed Service for Apache Flink inicia um trabalho do Flink para um aplicativo com um snapshot, o trabalho do Flink pode falhar ao iniciar devido a certos problemas. Um deles é a incompatibilidade de IDs do operador. O Flink espera um operador explícito e consistente IDs para os operadores do gráfico de tarefas do Flink. Se não for definido explicitamente, o Flink gera automaticamente um ID para os operadores. Isso ocorre porque o Flink usa esses operadores IDs para identificar exclusivamente os operadores em um gráfico de tarefas e os usa para armazenar o estado de cada operador em um ponto de salvamento.

O problema de incompatibilidade de ID do operador ocorre quando o Flink não encontra um mapeamento 1:1 entre o operador IDs de um gráfico de tarefas e o operador IDs definido em um ponto de salvamento. Isso acontece quando operadores consistentes explícitos não IDs são definidos e o Flink gera automaticamente um operador IDs que pode não ser consistente com cada criação de gráfico de tarefas. A probabilidade de os aplicativos enfrentarem esse problema é alta durante as operações de manutenção. Para evitar isso, recomendamos que os clientes definam UUID para todos os operadores no código flink. Para obter mais informações, consulte o tópico Definir a UUID para todos os operadores em Preparação para produção.

Adicionar ServiceResourceTransformer ao plugin Maven Shade

O Flink usa as interfaces de provedor de serviços (SPI) do Java para carregar componentes como conectores e formatos. O uso de várias dependências do Flink SPI pode causar conflitos no uber-jar e comportamentos inesperados do aplicativo. É recomendável adicionar o plugin ServiceResourceTransformerde sombra Maven, definido no pom.xml

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>