Apache Beam 애플리케이션의 체크포인트 실패 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Beam 애플리케이션의 체크포인트 실패

Beam 애플리케이션이 0ms로 shutdownSourcesAfterIdleMs설정되도록 구성된 경우 작업이 "" 상태이기 때문에 체크포인트가 트리거되지 않을 수 있습니다. FINISHED 이 섹션에서는 그러한 상황의 증상과 해결 방법을 설명합니다.

증상

Apache Flink용 관리 서비스 애플리케이션 CloudWatch 로그로 이동하여 다음 로그 메시지가 기록되었는지 확인하십시오. 다음 로그 메시지는 일부 작업이 완료되어 체크포인트가 트리거되지 않았음을 나타냅니다.

{ "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)", "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator", "message": "Failed to trigger checkpoint for job your job ID since some tasks of job your job ID has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.", "threadName": "Checkpoint Timer", "applicationARN": your application ARN, "applicationVersionId": "5", "messageSchemaVersion": "1", "messageType": "INFO" }

일부 작업이 "FINISHED" 상태로 전환되어 더 이상 체크포인트를 지정할 수 없는 Flink 대시보드에서도 확인할 수 있습니다.

“" 상태의 작업은 FINISHED

원인

shutdownSourcesAfterIdleMs 구성된 시간 (밀리초) 동안 유휴 상태였던 소스를 종료하는 Beam 구성 변수입니다. 일단 소스가 종료되면 체크포인트를 더 이상 사용할 수 없습니다. 이로 인해 체크포인트 오류가 발생할 수 있습니다.

작업이 "FINISHED" 상태로 전환되는 원인 중 하나는 0ms로 설정된 shutdownSourcesAfter IdleMs 경우입니다. 즉, 유휴 상태인 작업은 즉시 종료됩니다.

Solution

작업이 즉시 "FINISHED" 상태로 전환되지 않도록 하려면 Long으로 설정하십시오 shutdownSourcesAfterIdleMs . MAX_VALUE. 이것은 두 가지 방법으로 수행될 수 있습니다.

  • 옵션 1: Apache Flink용 관리 서비스 애플리케이션 구성 페이지에서 빔 구성을 설정한 경우 다음과 같이 새 키 값 쌍을 추가하여 shutdpwnSourcesAfteridle Ms를 설정할 수 있습니다.

    shutdownSourcesAfterIdleMs Long으로 설정합니다. MAX_ VALUE
  • 옵션 2: JAR 파일에 빔 구성이 설정되어 있는 경우 다음과 shutdownSourcesAfter IdleMs 같이 설정할 수 있습니다.

    FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline