Flink 운영자 및 Flink 애플리케이션을 위한 고가용성(HA) 사용 - Amazon EMR

Flink 운영자 및 Flink 애플리케이션을 위한 고가용성(HA) 사용

이 주제에서는 고가용성을 구성하는 방법 및 몇 가지 사용 사례에 대한 작동 방식을 설명합니다. 여기에는 작업 관리자를 사용하는 경우와 Flink 네이티브 kubernetes를 사용하는 경우가 포함됩니다.

Flink 운영자의 고가용성을 활성화하여 장애 발생 시 대기 중인 Flink 운영자로 장애 조치하여 운영자 제어 루프에서 가동 중단을 최소화할 수 있습니다. 고가용성은 기본적으로 활성화되어 있으며 시작 운영자 복제본의 기본 수는 2입니다. 차트 Helm에 대해 values.yaml 파일의 복제본 필드를 구성할 수 있습니다.

다음 필드를 사용자 지정 가능합니다.

  • replicas(선택 사항, 기본값: 2): 이 숫자를 1보다 크게 설정하면 다른 대기 운영자가 생성되고 작업을 더 빠르게 복구할 수 있습니다.

  • highAvailabilityEnabled(선택 사항, 기본값: true): HA 활성화 여부를 제어합니다. 이 파라미터를 true로 지정하면 다중 AZ 배포를 지원하고 올바른 flink-conf.yaml 파라미터를 설정할 수 있습니다.

values.yaml 파일에서 다음 구성을 설정하여 운영자의 HA를 비활성화할 수 있습니다.

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

다중 AZ 배포

여러 가용 영역에서 운영자 포드를 생성합니다. 이는 약한 제약 조건이며, 다른 AZ에 충분한 리소스가 없는 경우 운영자 포드가 동일한 AZ에서 예약됩니다.

리더 복제본 결정

HA가 활성화된 경우 복제본은 리스 기능을 사용하여 어떤 JM이 리더인지 결정하고 리더 선택에 K8s Lease를 사용합니다. 리스 기능을 설명하고 .Spec.Holder Identity 필드를 확인하여 현재 리더를 결정할 수 있습니다.

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Flink-S3 상호 작용

액세스 보안 인증 구성

S3 버킷에 액세스할 수 있는 적절한 IAM 권한으로 IRSA를 구성했는지 확인하세요.

S3 애플리케이션 모드에서 작업 jar 가져오기

Flink 운영자는 S3에서 애플리케이션 jar 가져오기도 지원합니다. FlinkDeployment 사양에서 JarURI의 S3 위치를 제공하면 됩니다.

이 기능을 사용하여 PyFlink 스크립트와 같은 다른 아티팩트를 다운로드할 수도 있습니다. 결과 Python 스크립트는 /opt/flink/usrlib/ 경로 아래에 배치됩니다.

다음 예제에서는 PyFlink 작업에 대해 이 기능을 사용하는 방법을 보여줍니다. jarURI 및 args 필드를 기록합니다.

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Flink S3 커넥터

Flink는 두 개의 S3 커넥터(아래 목록 참조)와 함께 제공됩니다. 다음 섹션에서는 어떤 커넥터를 언제 사용해야 하는지를 설명합니다.

검사: Presto S3 커넥터

  • S3 스키마를 s3p://로 설정

  • s3에 대한 검사에 사용할 권장 커넥터. 자세한 내용은 Apache Flink 설명서의 S3-specific을 참조하세요.

FlinkDeployment 사양 예제:

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/

S3: Hadoop S3 커넥터에 대한 읽기 및 쓰기

  • S3 스키마를 s3:// 또는 s3a://로 설정

  • S3에서 파일을 읽고 쓰는 데 권장되는 커넥터(Flinks 파일 시스템 인터페이스를 구현하는 S3 커넥터만 해당).

  • 기본적으로 flink-conf.yaml 파일에서 fs.s3a.aws.credentials.provider를 설정합니다(com.amazonaws.auth.WebIdentityTokenCredentialsProvider). 기본값(flink-conf)을 완전히 재정의하고 S3와 상호 작용하는 경우 이 제공업체를 사용해야 합니다.

FlinkDeployment 사양 예제

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

Flink 배포용 고가용성(HA)을 통해 일시적인 오류가 발생하여 JobManager가 충돌하더라도 작업을 계속 진행할 수 있습니다. 작업은 HA가 활성화된 상태에서 마지막으로 성공한 체크포인트부터 다시 시작됩니다. HA를 활성화하지 않으면 Kubernetes는 JobManager를 다시 시작하지만 작업은 새 작업으로 시작되고 진행 상황은 사라집니다. HA를 구성한 후에 JobManager에서 일시적인 오류가 발생할 경우 참조할 수 있도록 HA 메타데이터를 영구 스토리지에 저장한 다음, 마지막으로 성공한 체크포인트에서 작업을 재개하도록 Kubernetes에 지시할 수 있습니다.

Flink 작업에 대해 HA는 기본적으로 활성화되어 있습니다(복제본 수는 2로 설정되어 있으며, 이 경우 HA 메타데이터가 지속되려면 S3 스토리지 위치를 제공해야 함).

HA 구성

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

다음은 작업 관리자(.spec.JobManager에서 정의됨)에서 위 HA 구성에 대한 설명입니다.

  • highAvailabilityEnabled(선택 사항, 기본값: true): HA를 활성화하지 않고 제공된 HA 구성을 사용하지 않으려면 이 옵션을 false 로 설정합니다. 여전히 '복제본' 필드를 조작하여 HA를 수동으로 구성할 수 있습니다.

  • replicas(선택 사항, 기본값: 2): 이 숫자를 1보다 크게 설정하면 다른 대기 JobManagers가 생성되고 작업을 더 빠르게 복구할 수 있습니다. HA를 비활성화하는 경우 복제본 수를 1로 설정해야 합니다. 그렇지 않으면 검증 오류가 계속 발생합니다(HA가 활성화되지 않은 경우 복제본 1개만 지원됨).

  • storageDir(필수): 기본적으로 복제본 수를 2로 사용하기 때문에 영구 storageDir을 제공해야 합니다. 현재 이 필드에는 스토리지 위치로 S3 경로만 허용합니다.

포드 지역성

또한 HA를 활성화하는 경우 동일한 AZ에 파드를 배치하려고 시도하므로 성능이 개선됩니다(동일한 AZ에 포드를 배치하여 네트워크 지연 시간 감소). 이는 최대한의 원칙이 적용되는 프로세스입니다. 즉, 대부분의 포드가 예약된 AZ에 충분한 리소스가 없는 경우 나머지 포드는 여전히 예약되지만 결국 이 AZ 외부의 노드에 배치될 수 있습니다.

리더 복제본 결정

HA가 활성화된 경우 복제본은 리스 기능을 통해 어떤 JM이 리더인지 결정하고 이 메타데이터를 저장할 데이터 스토어로 K8s Configmap을 사용합니다. 리더를 결정하려면 Configmap의 콘텐츠를 살펴보고 데이터 아래에 있는 org.apache.flink.k8s.leader.restserver 키를 확인하여 IP 주소가 있는 K8s 포드를 찾을 수 있습니다. 다음과 같은 bash 명령을 사용할 수 있습니다.

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Amazon EMR 6.13.0 이상은 Amazon EKS 클러스터에서 고가용성 모드의 Flink 애플리케이션을 실행하기 위한 Flink 네이티브 Kubernetes를 지원합니다.

참고

Flink 작업을 제출할 때 고가용성 메타데이터를 저장할 Amazon S3 버킷을 생성해야 합니다. 이 기능을 사용하고 싶지 않은 경우 비활성화할 수 있습니다. 기본적으로 활성화됩니다.

Flink 고가용성 특성을 활성화하려면 run-application CLI 명령을 실행할 때 다음 Flink 파라미터를 입력하세요. 파라미터는 예제 아래에 정의되어 있습니다.

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir – 작업을 위한 고가용성 메타데이터를 저장할 Amazon S3 버킷입니다.

    Dkubernetes.jobmanager.replicas1보다 큰 정수로 생성할 작업 관리자 포드의 수입니다.

    Dkubernetes.cluster-id – Flink 클러스터를 식별하는 고유한 ID입니다.