Flink オートスケーラー - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Flink オートスケーラー

Amazon EMR リリース 6.15.0 以降では、Flink オートスケーラーがサポートされています。ジョブオートスケーラー機能は、実行中の Flink ストリーミングジョブからメトリクスを収集し、個々のジョブ頂点を自動的にスケーリングします。これにより、バックプレッシャーが軽減され、設定した使用率目標を達成できます。

詳細については、「Apache Flink Kubernetes Operator ドキュメント」の「オートスケーラー」セクションを参照してください。

  • Flink オートスケーラーは、Amazon EMR 6.15.0 以降でサポートされています。

  • Flink Autoscaler は、ストリーミングジョブでのみサポートされています。

  • アダプティブスケジューラーのみがサポートされます。デフォルトのスケジューラーはサポートされていません。

  • 動的なリソースプロビジョニングを可能にするために、クラスタースケーリングを有効化することを推奨します。メトリクスの評価は 5~10 秒ごとに行われるため、Amazon EMR Managed Scaling が推奨されます。この間隔では、クラスターは必要なクラスターリソースの変化により簡単に適応できます。

EC2 クラスターで Amazon EMR を作成するときに、次のステップを使用して Flink オートスケーラーを有効にします。

  1. Amazon EMR コンソールで、EMR クラスターを新規作成します。

    1. Amazon EMR リリース emr-6.15.0 以降を選択します。[Flink] アプリケーションバンドルを選択し、クラスターに含めるその他のアプリケーションを選択します。

      Application bundle options for Amazon EMRクラスター, with Flink highlighted and selected.
    2. [クラスターのスケーリングとプロビジョニング][EMR マネージドスケーリングを使用] を選択します。

      クラスター scaling options: manual, EMR-managed (selected), or custom automatic scaling.
  2. [ソフトウェア設定] セクションで、次の設定を入力して Flink オートスケーラーを有効にします。テストシナリオでは、検証を容易にするために、決定間隔、メトリクスウィンドウ間隔、および安定化間隔を低い値に設定し、ジョブがすぐにスケーリングを決定できるようにします。

    [ { "Classification": "flink-conf", "Properties": { "job.autoscaler.enabled": "true", "jobmanager.scheduler": "adaptive", "job.autoscaler.stabilization.interval": "60s", "job.autoscaler.metrics.window": "60s", "job.autoscaler.decision.interval": "10s", "job.autoscaler.debug.logs.interval": "60s" } } ]
  3. 必要に応じて他の設定を選択または構成し、Flink オートスケーラー対応クラスターを作成します。

このセクションでは、特定のニーズに基づいて変更できる構成のうちほとんどについて説明します。

注記

timeintervalwindow 設定のような時間ベースの構成では、単位が指定されていない場合のデフォルト単位はミリ秒です。そのため、サフィックスなしの 30 の値は 30 ミリ秒になります。他の時間単位には、秒には s、分には m、時間には h という適切なサフィックスを含めてください。

オートスケーラーは、設定可能な数回の時間間隔ごとにジョブ頂点レベルのメトリクスを取得してスケールアクションに変換し、新しいジョブ頂点の並列処理を推定して、ジョブスケジューラーに推奨します。メトリクスは、ジョブの再起動時間とクラスターの安定化間隔が過ぎた後にのみ収集されます。

設定キー デフォルト値 説明 値の例
job.autoscaler.enabled false Flink クラスターでオートスケーリングを有効にします。 true, false
job.autoscaler.decision.interval 60s オートスケーラーの決定間隔。 30 (デフォルト単位はミリ秒)、5m1h
job.autoscaler.restart.time 3m オペレーターが履歴から確実に再起動を判断できるようになるまでの予想される再起動時間。 30 (デフォルト単位はミリ秒)、5m1h
job.autoscaler.stabilization.interval 300s 新しいスケーリングが実行されない安定化期間。 30 (デフォルト単位はミリ秒)、5m1h
job.autoscaler.debug.logs.interval 300s オートスケーラーのデバッグログの間隔。 30 (デフォルト単位はミリ秒)、5m1h

オートスケーラーはメトリクスを取得し、時間ベースのスライディングウィンドウに沿って集計し、評価してスケーリングを決定します。各ジョブ頂点のスケーリング決定履歴は、新しい並列処理の見積もりに利用されます。これらには、時間ベースの有効期限と履歴サイズ (少なくとも 1) の両方があります。

設定キー デフォルト値 説明 値の例
job.autoscaler.metrics.window 600s Scaling metrics aggregation window size. 30 (デフォルト単位はミリ秒)、5m1h
job.autoscaler.history.max.count 3 頂点ごとに保持できる過去のスケーリング決定の最大数。 1Integer.MAX_VALUE
job.autoscaler.history.max.age 24h 頂点ごとに保持する過去のスケーリング決定の最小数。 30 (デフォルト単位はミリ秒)、5m1h

各ジョブの頂点の並列処理は、ターゲットの使用率に基づいて変更され、最小/最大並列処理の制限によって制限されます。目標使用率を 100% に近い値 (つまり 1) に設定することは推奨されず、使用率境界は中間の負荷変動を処理するバッファーの役割を果たします。

設定キー デフォルト値 説明 値の例
job.autoscaler.target.utilization 0.7 目標とする頂点使用率。 0 - 1
job.autoscaler.target.utilization.boundary 0.4 目標とする頂点利用率境界。現在の処理速度が [target_rate / (target_utilization - boundary) および (target_rate / (target_utilization + boundary)] 以内の場合、スケーリングは実行されません。 0 - 1
job.autoscaler.vertex.min-parallelism 1 オートスケーラーが使用できる最低限の並列処理。 0 - 200
job.autoscaler.vertex.max-parallelism 200 オートスケーラーが使用できる最大並列処理。この制限は、Flink 構成または各オペレーターで直接構成された最大並列処理よりも高い場合、無視されることに注意してください。 0 - 200

ジョブ頂点には、スケール操作期間中に蓄積される保留中のイベント、つまりバックログを処理するための追加リソースが必要です。これは catch-up 期間とも呼ばれます。バックログの処理時間が設定された lag -threshold 値を超えると、ジョブ頂点ターゲットの使用率は最大レベルまで増加します。これにより、バックログの処理中に不要なスケーリング操作が行われるのを防ぐことができます。

設定キー デフォルト値 説明 値の例
job.autoscaler.backlog-processing.lag-threshold 5m ラグのしきい値。ラグの原因となる保留中のメッセージを削除しながら、不要なスケーリングを防止します。 30 (デフォルト単位はミリ秒)、5m1h
job.autoscaler.catch-up.duration 15m スケーリング操作後にバックログを完全に処理するまでの目標時間。0 に設定すると、バックログベースのスケーリングが無効になります。 30 (デフォルト単位はミリ秒)、5m1h

オートスケーラーは、猶予期間内のスケールアップ操作の直後には、スケールダウン操作を実行しません。これにより、一時的な負荷変動によって発生する、スケールアップ、スケールダウンが繰り返し発生する不要なサイクルを防ぐことができます。

スケールダウン操作比率を利用して並列処理を徐々に減らし、一時的な負荷の急上昇に対応するためにリソースを解放することができます。また、大規模なスケールダウンの後に不要なマイナーなスケールアップ操作が行われるのを防ぐのにも役立ちます。

過去のジョブ頂点スケーリング決定履歴に基づいて無効なスケールアップ操作を検出し、さらなる並列処理の変更を防ぐことができます。

設定キー デフォルト値 説明 値の例
job.autoscaler.scale-up.grace-period 1h 頂点をスケールアップした後に、その頂点をスケールダウンできない期間。 30 (デフォルト単位はミリ秒)、5m1h
job.autoscaler.scale-down.max-factor 0.6 最大スケールダウン係数。1 の値は、スケールダウンに制限がなく、0.6 はジョブを元の並列処理の 60% でのみスケールダウンできることを意味します。 0 - 1
job.autoscaler.scale-up.max-factor 100000. 最大スケールアップ率。2.0 の値は、現在の並列処理の 200% でのみジョブをスケールアップできることを意味します。 0 - Integer.MAX_VALUE
job.autoscaler.scaling.effectiveness.detection.enabled false 効果のないスケーリング操作の検出を有効にし、オートスケーラーが今後のスケールアップをブロックできるようにするかどうか。 true, false