翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
タスク回復とスケーリング操作のためのジョブ再起動時間の最適化
タスクが失敗したり、スケーリング操作が発生したりすると、Flink は最後に完了したチェックポイントからタスクを再実行しようとします。チェックポイントの状態のサイズと並列タスクの数によっては、再起動プロセスの実行に 1 分以上かかる場合があります。再起動中は、ジョブのバックログタスクが蓄積されることがあります。ただし、Flink が実行グラフの回復と再開の速度を最適化してジョブの安定性を高める方法はいくつかあります。
このページでは、Amazon EMR Flink がタスクリカバリまたはスケーリング操作中のジョブの再起動時間を改善できるいくつかの方法について説明します。
タスクローカルリカバリ
注記
タスクローカルリカバリは、Amazon EMR 6.0.0 以降でサポートされています。
Flink チェックポイントでは、各タスクが Flink が Amazon S3 などの分散ストレージに書き込む状態のスナップショットを作成します。復旧時には、タスクは分散ストレージから状態を復元します。分散ストレージはすべてのノードからアクセスできるため、耐障害性があり、再スケーリング中に状態を再分散できます。
ただし、リモート分散ストアには欠点もあります。すべてのタスクはネットワーク経由でリモートロケーションから状態を読み取る必要があります。そのため、タスクの回復やスケーリング操作中に大きな状態の回復時間が長くなる可能性があります。
リカバリ時間が長いというこの問題は、タスクローカルリカバリによって解決できます。タスクはチェックポイントの状態を、ローカルディスクなど、タスクのローカルにあるセカンダリストレージに書き込みます。また、プライマリストレージ(この場合は Amazon S3)に状態が保存されます。復元中、スケジューラーは、タスクが以前に実行されていたのと同じタスクマネージャー上でタスクをスケジュールし、リモート状態ストアから読み取るのではなく、ローカル状態ストアから復元できるようにします。詳細については、「Apache Flink ドキュメント」の「タスクローカルリカバリ
サンプルジョブを使ったベンチマークテストでは、タスクローカルリカバリを有効にすると、リカバリ時間が数分から数秒に短縮されたことがわかりました。
タスクローカルリカバリを有効にするには、flink-conf.yaml
ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。
state.backend.local-recovery: true state.backend:
hasmap or rocksdb
state.checkpoints.dir: s3://storage-location-bucket-path
/checkpoint execution.checkpointing.interval:15000
汎用ログベースのインクリメンタルチェックポイント
注記
Amazon EMR 6.10.0 以降では、汎用ログベースのインクリメンタルチェックポイントがサポートされています。
チェックポイントの速度を向上させるため、汎用ログベースのインクリメンタルチェックポイントが Flink 1.16 に追加されました。チェックポイント間隔を短くすると、回復後に再処理する必要のあるイベントが少なくなるため、多くの場合回復作業が削減されます。詳細については、「Apache Flink ブログ」の「汎用ログベースのインクリメンタルチェックポイントによるチェックポイントの速度と安定性の向上
サンプルジョブを使用したベンチマークテストでは、汎用ログベースのインクリメンタルチェックポイントを使用すると、チェックポイントにかかる時間が数分から数秒に短縮されたことがわかりました。
汎用ログベースのインクリメンタルチェックポイントを有効にするには、flink-conf.yaml
ファイルで次の設定を行います。チェックポイント間隔の値をミリ秒単位で指定します。
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://
bucket-path
/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path
/checkpoint execution.checkpointing.interval:15000
きめ細かなリカバリ
注記
Amazon EMR 6.0.0 以降では、デフォルトスケジューラーのきめ細かな復元がサポートされています。Amazon EMR 6.15.0 以降では、アダプティブスケジューラーでのきめ細かなリカバリのサポートを利用できます。
実行中にタスクが失敗した場合、Flink は実行グラフ全体をリセットし、最後に完了したチェックポイントから完全な再実行をトリガーします。これは、失敗したタスクを単に再実行するよりもコストがかかります。きめ細かい復元では、失敗したタスクのパイプラインに接続されたコンポーネントのみを再起動します。次の例では、ジョブグラフには 5 つの頂点 (A
から E
) があります。頂点間のすべての接続はポイントごとの分散でパイプライン化され、ジョブの parallelism.default
は 2
に設定されます。
A → B → C → D → E
この例では、合計 10 個のタスクが実行されています。最初のパイプライン (a1
から e1
) は TaskManager (TM1
) で実行され、2 番目のパイプライン (a2
から e2
) は別の TaskManager (TM2
) 上で実行されます。
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
パイプライン接続されたコンポーネントには、a1 → e1
と a2 →
e2
の 2 つがあります。TM1
または TM2
のどちらか一方で障害が発生しても、影響を受けるのは、その TaskManager が実行されていたパイプライン内の 5 つのタスクのみです。再起動戦略では、影響を受けるパイプラインコンポーネントのみを起動します。
きめ細かいリカバリは、完全に並列した Flink ジョブでのみ機能します。keyBy()
または redistribute()
オペレーションではサポートされていません。詳細については、「Flink 改善提案」Jira プロジェクトの「FLIP-1: タスク障害からのきめ細かな回復
きめ細かい復元を有効にするには、flink-conf.yaml
ファイルで次の設定を行います。
jobmanager.execution.failover-strategy: region restart-strategy:
exponential-delay or fixed-delay
アダプティブスケジューラーに組み込まれた再起動メカニズム
注記
アダプティブスケジューラーの複合再起動メカニズムは、Amazon EMR 6.15.0 以降でサポートされています。
アダプティブスケジューラーは、使用可能なスロットに基づいてジョブの並列処理を調整できます。設定したジョブの並列処理を満たすだけの十分なスロットがない場合は、自動的に並列処理を減らします 新しいスロットが使用可能になると、ジョブは設定されたジョブの並列処理に合わせて再びスケールアップされます。適応型スケジューラーは、利用可能なリソースが十分になくなってもジョブのダウンタイムを回避します。これは Flink Autoscaler でサポートされているスケジューラーです。これらの理由から、Amazon EMR Flink を使用するアダプティブスケジューラーをお勧めします。ただし、アダプティブスケジューラーは、新しいリソースが追加されるたびに 1 回再起動するなど、短期間に複数の再起動を行う場合があります。これにより、ジョブのパフォーマンスが低下する可能性があります。
Amazon EMR 6.15.0 以降では、Flink のアダプティブスケジューラーに再起動メカニズムが組み合わされています。このメカニズムは、最初のリソースが追加されると再起動ウィンドウを開き、設定したウィンドウ間隔(デフォルトの 1 分)まで待機します。並列処理を設定してジョブを実行するのに十分なリソースがあるとき、または間隔がタイムアウトになったときに、1 回再起動します。
サンプルジョブを使用したベンチマークテストでは、アダプティブスケジューラーと Flink オートスケーラーを使用すると、この機能はデフォルトの動作よりも 10% 多くのレコードを処理することがわかりました。
複合再起動メカニズムを有効にするには、flink-conf.yaml
ファイルで以下の設定を行います。
jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m