翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
EKS 上の Amazon EMR によるタスクリカバリとスケーリング操作のための Flink ジョブの再起動時間の最適化
タスクが失敗したり、スケーリング操作が発生したりすると、Flink は最後に完了したチェックポイントからタスクを再実行しようとします。チェックポイントの状態のサイズと並列タスクの数によっては、再起動プロセスの実行に 1 分以上かかる場合があります。再起動中は、ジョブのバックログタスクが蓄積されることがあります。ただし、Flink が実行グラフの回復と再開の速度を最適化してジョブの安定性を高める方法はいくつかあります。
このページでは、Amazon EMR Flink がスポットインスタンスでタスクリカバリまたはスケーリング操作中のジョブの再起動時間を改善できるいくつかの方法について説明します。スポットインスタンスとは、割引価格で利用可能な未使用のコンピューティング能力のことです。これには、不定期に中断するなど独自の動作があるため、EKS 上の Amazon EMR が廃止やジョブの再起動を実行する方法を含めて、Amazon EMR on EKS がこれらをどのように処理するかを理解することが重要です。
トピック
タスクローカルリカバリ
注記
EKS 6.14.0 以降では、Amazon EMR 上の Flink によるタスクローカルリカバリをサポートしています。
Flink チェックポイントでは、各タスクが Flink が Amazon S3 などの分散ストレージに書き込む状態のスナップショットを作成します。復旧時には、タスクは分散ストレージから状態を復元します。分散ストレージはすべてのノードからアクセスできるため、耐障害性があり、再スケーリング中に状態を再分散できます。
ただし、リモート分散ストアには欠点もあります。すべてのタスクはネットワーク経由でリモートロケーションから状態を読み取る必要があります。そのため、タスクの回復やスケーリング操作中に大きな状態の回復時間が長くなる可能性があります。
リカバリ時間が長いというこの問題は、タスクローカルリカバリによって解決できます。タスクはチェックポイントの状態を、ローカルディスクなど、タスクのローカルにあるセカンダリストレージに書き込みます。また、プライマリストレージ(この場合は Amazon S3)に状態が保存されます。復元中、スケジューラーは、タスクが以前に実行されていたのと同じタスクマネージャー上でタスクをスケジュールし、リモート状態ストアから読み取るのではなく、ローカル状態ストアから復元できるようにします。詳細については、「Apache Flink ドキュメント」の「タスクローカルリカバリ
サンプルジョブを使ったベンチマークテストでは、タスクローカルリカバリを有効にすると、リカバリ時間が数分から数秒に短縮されたことがわかりました。
タスクローカルリカバリを有効にするには、flink-conf.yaml
ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。
state.backend.local-recovery: true state.backend:
hasmap or rocksdb
state.checkpoints.dir: s3://STORAGE-BUCKET-PATH
/checkpoint execution.checkpointing.interval:15000
Amazon EBS ボリュームマウントによるタスクローカルリカバリ
注記
EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いた Amazon EBS でのタスクローカルリカバリをサポートしています。
EKS 上の Amazon EMR で Flink を使用すると、Amazon EBS ボリュームを TaskManager ポッドに自動的にプロビジョニングして、タスクローカルリカバリを行うことができます。デフォルトのオーバーレイマウントには 10 GB のボリュームが付属しており、状態の低いジョブには十分です。状態が大きいジョブでは、EBS ボリュームの自動マウントオプションを有効にできます。TaskManager ポッドはポッド作成時に自動的に作成およびマウントされ、ポッドの削除時に削除されます。
以下のステップを使用して、EKS 上の Amazon EMR 内の Flink 用に自動 EBS ボリュームマウントを有効にします。
-
次のステップで使用する以下の変数の値をエクスポートします。
export AWS_REGION=
aa-example-1
export FLINK_EKS_CLUSTER_NAME=my-cluster
export AWS_ACCOUNT_ID=111122223333
-
クラスター用の
kubeconfig
YAML ファイルを作成または更新します。aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
-
お使いの Amazon EKS クラスター上の Amazon EBS Container Storage Interface (CSI) ドライバー 用 IAM サービスアカウントを作成します。
eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME\ --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
-
以下のコマンドで、Amazon EBS CSI ドライバーを作成します。
eksctl create addon \ --name aws-ebs-csi-driver \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME \ --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
-
以下のコマンドで、Amazon EBS ストレージクラスを作成します。
cat ≪ EOF ≫ storage-class.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs-sc provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer EOF
次に、クラスを適用します。
kubectl apply -f storage-class.yaml
-
サービスアカウントを作成するオプションを使用して、Amazon EMR Flink Kubernetes オペレーターを Helm インストールします。これにより、Flink デプロイで使用する
emr-containers-sa-flink
を作成します。helm install flink-kubernetes-operator flink-kubernetes-operator/ \ --set jobServiceAccount.create=true \ --set rbac.jobRole.create=true \ --set rbac.jobRoleBinding.create=true
-
Flink ジョブを送信してタスクローカルリカバリ用 EBS ボリュームの自動プロビジョニングを有効にするには、お使いの
flink-conf.yaml
ファイルに以下の構成を設定します。ジョブの状態サイズに合わせてサイズ制限を調整します。serviceAccount
をemr-containers-sa-flink
に設定します。チェックポイント間隔の値をミリ秒単位で指定します。executionRoleArn
は省略してください。flinkConfiguration: task.local-recovery.ebs.enable: true kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi state.checkpoints.dir: s3://
BUCKET-PATH
/checkpoint state.backend.local-recovery: true state.backend:hasmap or rocksdb
state.backend.incremental: "true" execution.checkpointing.interval:15000
serviceAccount: emr-containers-sa-flink
Amazon EBS CSI ドライバープラグインを削除する準備ができたら、以下のコマンドを使用します。
# Detach Attached Policy aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy # Delete the created Role aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} # Delete the created service account eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete Addon eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete the EBS storage class kubectl delete -f storage-class.yaml
汎用ログベースのインクリメンタルチェックポイント
注記
EKS 6.14.0 以降では、Amazon EMR 上の Flink による汎用ログベースのインクリメンタルチェックポイントをサポートしています。
チェックポイントの速度を向上させるため、汎用ログベースのインクリメンタルチェックポイントが Flink 1.16 に追加されました。チェックポイント間隔を短くすると、回復後に再処理する必要のあるイベントが少なくなるため、多くの場合回復作業が削減されます。詳細については、「Apache Flink ブログ」の「汎用ログベースのインクリメンタルチェックポイントによるチェックポイントの速度と安定性の向上
サンプルジョブを使用したベンチマークテストでは、汎用ログベースのインクリメンタルチェックポイントを使用すると、チェックポイントにかかる時間が数分から数秒に短縮されたことがわかりました。
汎用ログベースのインクリメンタルチェックポイントを有効にするには、flink-conf.yaml
ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://
bucket-path
/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path
/checkpoint execution.checkpointing.interval:15000
きめ細かなリカバリ
注記
EKS 6.14.0 以降では、Amazon EMR 上の Flink を用いたデフォルトスケジューラーでのきめ細かなリカバリをサポートしています。EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いたアダプティブスケジューラー内のきめ細かなリカバリをサポートしています。
実行中にタスクが失敗した場合、Flink は実行グラフ全体をリセットし、最後に完了したチェックポイントから完全な再実行をトリガーします。これは、失敗したタスクを単に再実行するよりもコストがかかります。きめ細かい復元では、失敗したタスクのパイプラインに接続されたコンポーネントのみを再起動します。次の例では、ジョブグラフには 5 つの頂点 (A
から E
) があります。頂点間のすべての接続はポイントごとの分散でパイプライン化され、ジョブの parallelism.default
は 2
に設定されます。
A → B → C → D → E
この例では、合計 10 個のタスクが実行されています。最初のパイプライン (a1
から e1
) は TaskManager (TM1
) で実行され、2 番目のパイプライン (a2
から e2
) は別の TaskManager (TM2
) 上で実行されます。
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
パイプライン接続されたコンポーネントには、a1 → e1
と a2 →
e2
の 2 つがあります。TM1
または TM2
のどちらか一方で障害が発生しても、影響を受けるのは、その TaskManager が実行されていたパイプライン内の 5 つのタスクのみです。再起動戦略では、影響を受けるパイプラインコンポーネントのみを起動します。
きめ細かいリカバリは、完全に並列した Flink ジョブでのみ機能します。keyBy()
または redistribute()
オペレーションではサポートされていません。詳細については、「Flink 改善提案」Jira プロジェクトの「FLIP-1: タスク障害からのきめ細かな回復
きめ細かい復元を有効にするには、flink-conf.yaml
ファイルで次の設定を行います。
jobmanager.execution.failover-strategy: region restart-strategy:
exponential-delay or fixed-delay
アダプティブスケジューラーに組み込まれた再起動メカニズム
注記
EKS 6.15.0 以降では、Amazon EMR 上の Flink を用いたアダプティブスケジューラー内の複合再起動メカニズムをサポートしています。
アダプティブスケジューラーは、使用可能なスロットに基づいてジョブの並列処理を調整できます。設定したジョブの並列処理を満たすだけの十分なスロットがない場合は、自動的に並列処理を減らします 新しいスロットが使用可能になると、ジョブは設定されたジョブの並列処理に合わせて再びスケールアップされます。適応型スケジューラーは、利用可能なリソースが十分になくなってもジョブのダウンタイムを回避します。これは Flink Autoscaler でサポートされているスケジューラーです。これらの理由から、Amazon EMR Flink を使用するアダプティブスケジューラーをお勧めします。ただし、アダプティブスケジューラーは、新しいリソースが追加されるたびに 1 回再起動するなど、短期間に複数の再起動を行う場合があります。これにより、ジョブのパフォーマンスが低下する可能性があります。
Amazon EMR 6.15.0 以降では、Flink のアダプティブスケジューラーに再起動メカニズムが組み合わされています。このメカニズムは、最初のリソースが追加されると再起動ウィンドウを開き、設定したウィンドウ間隔(デフォルトの 1 分)まで待機します。並列処理を設定してジョブを実行するのに十分なリソースがあるとき、または間隔がタイムアウトになったときに、1 回再起動します。
サンプルジョブを使用したベンチマークテストでは、アダプティブスケジューラーと Flink オートスケーラーを使用すると、この機能はデフォルトの動作よりも 10% 多くのレコードを処理することがわかりました。
複合再起動メカニズムを有効にするには、flink-conf.yaml
ファイルで以下の設定を行います。
jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m