

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# ランタイムのトラブルシューティング
<a name="troubleshooting-runtime"></a>

このセクションには、Apache Flink アプリケーション用 Managed Serviceの実行時問題の診断と修正に関する情報が含まれています。

**Topics**
+ [トラブルシューティングツール](#troubleshooting-tools)
+ [アプリケーションに関する問題](troubleshooting-symptoms.md)
+ [アプリケーションが再起動している](troubleshooting-rt-restarts.md)
+ [スループットが遅すぎる](troubleshooting-rt-throughput.md)
+ [際限のない状態の成長](troubleshooting-rt-stateleaks.md)
+ [I/O バウンドオペレーター](troubleshooting-io-bound-operators.md)
+ [Kinesis データストリームからのアップストリームまたはソーススロットリング](troubleshooting-source-throttling.md)
+ [チェックポイント](troubleshooting-checkpoints.md)
+ [チェックポイントがタイムアウトしています。](troubleshooting-chk-timeout.md)
+ [Apache Beam アプリケーションのチェックポイント障害](troubleshooting-chk-failure-beam.md)
+ [バックプレッシャー](troubleshooting-backpressure.md)
+ [データスキュー機能](troubleshooting-data-skew.md)
+ [ステートスキュー機能](troubleshooting-state-skew.md)
+ [異なるリージョンのリソースと統合する](troubleshooting-resources-in-different-regions.md)

## トラブルシューティングツール
<a name="troubleshooting-tools"></a>

アプリケーションの問題を検出するための主要なツールは CloudWatch アラームです。CloudWatch アラームを使用すると、アプリケーションのエラーまたはボトルネック状態を示す CloudWatch メトリクスのしきい値を設定できます。推奨されるCloudWatchアラームについては、「[Amazon Managed Service for Apache Flink で CloudWatch アラームを使用する](monitoring-metrics-alarms.md)」を参照してください。

# アプリケーションに関する問題
<a name="troubleshooting-symptoms"></a>

このセクションには、Apache Flink アプリケーション用 Managed Serviceで発生する可能性のあるエラー状態に対する解決策が含まれています。

**Topics**
+ [アプリケーションが一時的な状態で停止している](#troubleshooting-rt-stuck)
+ [スナップショットの作成に失敗する](#troubleshooting-rt-snapshots)
+ [VPC 内のリソースにアクセスできない](#troubleshooting-rt-vpc)
+ [Amazon S3 バケットの書き込み時にデータが失われる](#troubleshooting-rt-s3)
+ [アプリケーションは RUNNING ステータスにありますが、データを処理していない](#troubleshooting-rt-processing)
+ [スナップショット、アプリケーション更新、またはアプリケーション停止エラー: InvalidApplicationConfigurationException](#troubleshooting-rt-appconfigexception)
+ [java.nio.File.noSuchFileException: /usr/local/openjdk-8/lib/security/cacerts](#troubleshooting-rt-fnf)

## アプリケーションが一時的な状態で停止している
<a name="troubleshooting-rt-stuck"></a>

アプリケーションが一時的なステータス (`STARTING`、`UPDATING`、`STOPPING` または `AUTOSCALING`) のままの場合は、「[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)」アクションを使用して `Force` パラメータを`true`に設定してアプリケーションを停止できます。この `DELETING` ステータスのアプリケーションを強制停止することはできません。または、アプリケーションが `UPDATING` または `AUTOSCALING` ステータスの場合は、実行中の以前のバージョンにロールバックできます。アプリケーションをロールバックすると、前回成功したスナップショットの状態データがロードされます。アプリケーションにスナップショットがない場合、Apache Flink 用 Managed Serviceはロールバックリクエストを拒否します。アプリケーションのロールバックについて詳しくは、「[rollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)」アクションを参照してください。

**注記**  
アプリケーションを強制停止すると、データが失われたり重複したりする可能性があります。アプリケーションの再起動時にデータが失われたり、データが重複して処理されたりするのを防ぐため、アプリケーションのスナップショットを頻繁に撮ることをお勧めします。

アプリケーションが停止する原因には次のようなものがあります。
+ 「**アプリケーションの状態が大きすぎる:**」アプリケーションの状態が大きすぎたり永続的すぎたりすると、チェックポイントまたはスナップショット操作中にアプリケーションが停止する可能性があります。アプリケーションの `lastCheckpointDuration` と `lastCheckpointSize` メトリックをチェックして、値が着実に増加しているか、または異常に高い値がないかを確認してください。
+ 「**アプリケーションコードが大きすぎる:**」アプリケーションの JAR ファイルが 512 MB 未満であることを確認してください。512 MB を超える JAR ファイルはサポートされていません。
+ 「**アプリケーションスナップショットの作成に失敗する:**」Managed Service for Apache Flinkが [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) または [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StopApplication.html) リクエスト中にアプリケーションのスナップショットを取得します。その後、サービスはこのスナップショット状態を使用し、更新されたアプリケーション設定を使用してアプリケーションを復元し、「1 回のみ」の処理セマンティクスを実現します。自動スナップショット作成が失敗した場合は、 [スナップショットの作成に失敗する](#troubleshooting-rt-snapshots) 以下を参照してください。
+ 「**スナップショットからの復元が失敗する:**」アプリケーションの更新でオペレータを削除または変更した後に、スナップショットから復元しようとした場合、スナップショットに欠落しているオペレータの状態データが含まれていると、復元はデフォルトで失敗します。さらに、アプリケーションは `STOPPED` または `UPDATING` ステータスのままになります。この動作を変更して復元を正常に行うには、アプリケーションの「[FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)」の「AllowNonRestoredState」パラメータを `true` に変更します。これにより、新しいプログラムにマッピングできない状態データを再開操作がスキップできます。
+ 「**アプリケーションの初期化に時間がかかる:**」Apache Flink 用 Managed Service fは、Flink ジョブの開始を待つ間、5 分間の内部タイムアウト (ソフト設定) を使用します。このタイムアウト内にジョブが開始されない場合、次のような CloudWatch ログが表示されます。

  ```
  Flink job did not start within a total timeout of 5 minutes for application: %s under account: %s
  ```

   上記のエラーが発生した場合、Flink ジョブの `main` メソッドで定義されている操作に 5 分以上かかっているため、Apache Flink のマネージドサービス側で Flink ジョブの作成がタイムアウトになります。Flink「**JobManager**」のログとアプリケーションコードをチェックして、 `main` メソッドにこのような遅延が予想されるかどうかを確認することをお勧めします。そうでない場合は、5 分以内に完了するように問題に対処する必要があります。

「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html)」または「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html)」のアクションを使用して申請状況を確認できます。

## スナップショットの作成に失敗する
<a name="troubleshooting-rt-snapshots"></a>

Apache Flink 用 Managed Serviceは、以下の状況ではスナップショットを作成できません。
+ アプリケーションがスナップショットの制限を超えました。スナップショットの上限は 1,000 件です。詳細については、「[スナップショットを使用してアプリケーションバックアップを管理する](how-snapshots.md)」を参照してください。
+ アプリケーションには、ソースまたはシンクにアクセスするアクセス許可がありません。
+ アプリケーションコードが正しく機能していない。
+ アプリケーションには他の構成上の問題も発生しています。

アプリケーションの更新中にスナップショットを作成しているとき、またはアプリケーションを停止しているときに例外が発生した場合は、アプリケーションの「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)」の `SnapshotsEnabled` プロパティを `false` に設定して、リクエストを再試行してください。

アプリケーションのオペレータが適切にプロビジョニングされていないと、スナップショットが失敗する可能性があります。オペレータのパフォーマンスのチューニングについては、 [オペレータースケーリング](performance-improving.md#performance-improving-scaling-op) を参照してください。

アプリケーションが正常な状態に戻ったら、アプリケーションの `SnapshotsEnabled` プロパティを `true` に設定することをお勧めします。

## VPC 内のリソースにアクセスできない
<a name="troubleshooting-rt-vpc"></a>

アプリケーションが Amazon VPC 上で実行されている VPC を使用している場合は、以下を実行して、アプリケーションがそのリソースにアクセスできることを確認します。
+ CloudWatch ログをチェックして、次のエラーがないかどうかを確認します。このエラーは、アプリケーションが VPC 内のリソースにアクセスできないことを示しています。

  ```
  org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  ```

  このエラーが表示された場合は、ルートテーブルが正しく設定されていることと、コネクタの接続設定が正しいことを確認してください。

  CloudWatch ログのセットアップと分析については、 [Amazon Managed Service for Apache Flink でのロギングとモニタリング](monitoring-overview.md) を参照してください。

## Amazon S3 バケットの書き込み時にデータが失われる
<a name="troubleshooting-rt-s3"></a>

Apache Flink バージョン 1.6.2 を使用して Amazon S3 バケットに出力を書き込むと、一部のデータが失われる可能性があります。Amazon S3 を直接出力に使用する場合は、サポートされている最新バージョンの Apache Flink を使用することをお勧めします。Apache Flink 1.6.2 を使用して Amazon S3 バケットに書き込むには、Firehose を使用することをお勧めします。Managed Service for Apache Flink で Firehose を使用する方法の詳細については、「[Firehose シンク](earlier.md#get-started-exercise-fh)」を参照してください。

## アプリケーションは RUNNING ステータスにありますが、データを処理していない
<a name="troubleshooting-rt-processing"></a>

アプリケーションのステータスは、「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html)」または「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html)」のアクションを使用して確認できます。アプリケーションが `RUNNING` ステータスに入ったがシンクにデータを書き込んでいない場合は、Amazon CloudWatch ログストリームをアプリケーションに追加することで問題をトラブルシューティングできます。詳細については、「[アプリケーション CloudWatch ロギングオプションを使用する](cloudwatch-logs.md#adding_cloudwatch)」を参照してください。ログストリームには、アプリケーションの問題のトラブルシューティングに使用できるメッセージが含まれています。

## スナップショット、アプリケーション更新、またはアプリケーション停止エラー: InvalidApplicationConfigurationException
<a name="troubleshooting-rt-appconfigexception"></a>

スナップショット操作中、またはアプリケーションの更新や停止など、スナップショットを作成する操作中に、次のようなエラーが発生することがあります。

```
An error occurred (InvalidApplicationConfigurationException) when calling the UpdateApplication operation: 

Failed to take snapshot for the application xxxx at this moment. The application is currently experiencing downtime. 
Please check the application's CloudWatch metrics or CloudWatch logs for any possible errors and retry the request. 
You can also retry the request after disabling the snapshots in the Managed Service for Apache Flink console or by updating 
the ApplicationSnapshotConfiguration through the AWS SDK
```

このエラーは、アプリケーションがスナップショットを作成できない場合に発生します。

スナップショット操作またはスナップショットを作成する操作中にこのエラーが発生した場合は、次の操作を実行してください。
+ アプリケーションのスナップショットを無効にします。これは、Apache Flink のマネージドサービスコンソールで行うことも、「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」アクションの `SnapshotsEnabledUpdate` パラメータを使用して行うこともできます。
+ スナップショットを作成できない理由を調べてください。詳細については、「[アプリケーションが一時的な状態で停止している](#troubleshooting-rt-stuck)」を参照してください。
+ アプリケーションが正常な状態に戻ったら、スナップショットを再度有効にします。

## java.nio.File.noSuchFileException: /usr/local/openjdk-8/lib/security/cacerts
<a name="troubleshooting-rt-fnf"></a>

SSL トラストストアの場所は以前のデプロイで更新されました。代わりに、以下の値を `ssl.truststore.location` パラメータに使用します

```
/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
```

# アプリケーションが再起動している
<a name="troubleshooting-rt-restarts"></a>

アプリケーションが正常でない場合、その Apache Flink ジョブは繰り返し失敗して再起動します。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-rt-restarts-symptoms"></a>

この状態では、次の症状が発生する可能性があります。
+ `FullRestarts` 指標はゼロではない。このメトリクスは、アプリケーションを起動してからアプリケーションのジョブが再開された回数を表します。
+ `Downtime` 指標はゼロではない。このメトリクスは、アプリケーションが `FAILING` または `RESTARTING` のステータスにあるミリ秒数を表します。
+ アプリケーションログには、`RESTARTING` または `FAILED` へのステータス変更が含まれます。以下の CloudWatch Logs Insights のクエリを使用して、これらのステータス変更についてアプリケーションログをクエリできます: [エラーの分析: アプリケーションタスク関連の障害](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps)。

## 原因と解決策
<a name="troubleshooting-rt-restarts-causes"></a>

次のような状況になると、アプリケーションが不安定になり、再起動を繰り返す可能性があります。
+ **オペレータが例外をスローしている:** アプリケーション内のオペレータの例外が処理されない場合、アプリケーションは (オペレータがその失敗を処理できないと解釈して) フェイルオーバーします。「一度だけ」の処理セマンティクスを維持するため、アプリケーションは最新のチェックポイントから再起動します。そのため、この再起動中は `Downtime` は 0 ではありません。これを防ぐには、アプリケーションコード内の再試行可能な例外をすべて処理することをお勧めします。

  この状態の原因は、アプリケーションの状態が `RUNNING` から `FAILED` に変更されていないかアプリケーションのログをクエリして調べることができます。詳細については、「[エラーの分析: アプリケーションタスク関連の障害](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps)」を参照してください。
+ **Kinesis Data Streams が適切にプロビジョニングされていない:** アプリケーションのソースまたはシンクが Kinesis データストリームの場合は、ストリームの [メトリクス](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html) に `ReadProvisionedThroughputExceeded` または `WriteProvisionedThroughputExceeded` エラーがないか確認してください。

  これらのエラーが表示される場合は、ストリームのシャード数を増やすことで Kinesis ストリームの利用可能なスループットを増やすことができます。詳細については、「[Kinesis データストリームで開いているシャードの数を変更するにはどうすればよいですか](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/)」を参照してください。
+ **他のソースまたはシンクが適切にプロビジョニングされていないか、使用できない:** アプリケーションがソースとシンクを正しくプロビジョニングしていることを確認してください。アプリケーションで使用されるソースまたはシンク (他の AWS サービス、外部ソースまたは送信先など) が適切にプロビジョニングされているか、読み取りまたは書き込みスロットリングが発生していないか、または定期的に使用できないことを確認します。

  依存するサービスでスループット関連の問題が発生している場合は、それらのサービスが利用できるリソースを増やすか、エラーや利用不能の原因を調査してください。
+ **オペレータが適切にプロビジョニングされていない:** アプリケーション内のいずれかのオペレータのスレッドのワークロードが正しく分散されていないと、オペレータが過負荷になり、アプリケーションがクラッシュする可能性があります。オペレータの並列処理のチューニングについては、「[オペレータースケーリングの適切な管理](performance-improving.md#performance-improving-scaling-op)」を参照してください。
+ **アプリケーションが DaemonException で失敗する:** 1.11 より前のバージョンの Apache Flink を使用している場合、このエラーはアプリケーションログに表示されます。0.14 以降の KPL バージョンを使用するには、Apache Flink の新しいバージョンへのアップグレードが必要な場合があります。
+ **TimeoutException、FlinkException、または RemoteTransportException でアプリケーションが失敗する:** これらのエラーは、タスクマネージャがクラッシュした場合にアプリケーションログに表示されることがあります。アプリケーションが過負荷になると、タスクマネージャーに CPU やメモリのリソースが圧迫され、タスクマネージャーが機能しなくなる可能性があります。

  これらのエラーは次のようになります。
  + `java.util.concurrent.TimeoutException: The heartbeat of JobManager with id xxx timed out`
  + `org.apache.flink.util.FlinkException: The assigned slot xxx was removed`
  + `org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager`

  この問題のトラブルシューティングを行うには、以下を確認します。
  + CloudWatch メトリクスをチェックして、CPU やメモリの使用量が異常に急増していないかを確認してください。
  + アプリケーションにスループットの問題がないかチェックしてください。詳細については、「[パフォーマンスの問題をトラブルシューティングする](performance-troubleshooting.md)」を参照してください。
  + アプリケーションログを調べて、アプリケーションコードで発生させている未処理の例外がないか調べてください。
+ **JaxBanNotationModule Not Found エラーでアプリケーションが失敗する:** このエラーは、アプリケーションが Apache Beam を使用しているが、正しい依存関係または依存バージョンがない場合に発生します。Apache Beam を使用する Apache Flink 用 Managed Serviceアプリケーションでは、以下のバージョンの依存関係を使用する必要があります。

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```

  `jackson-module-jaxb-annotations` の正しいバージョンを明示的な依存関係として指定しないと、アプリケーションはそれを環境の依存関係から読み込み、バージョンが一致しないため、実行時にアプリケーションがクラッシュします。

  Apache Beam 向けの Apache Flink 用 Managed Service の使用に関する詳細については、[CloudFormation を使用するApache Beam を使用してアプリケーションを作成する](examples-beam.md) を参照してください。
+ 「**アプリケーションが java.io.IOException: ネットワークバッファの数が不十分で失敗する**」

  アプリケーションに十分なメモリがネットワークバッファに割り当てられていない場合に発生します。ネットワークバッファはサブタスク間の通信を容易にします。ネットワーク経由で送信する前にレコードを保存したり、受信データをレコードに分解してサブタスクに渡す前に保存したりするために使用されます。必要なネットワークバッファの数は、ジョブグラフの並列処理と複雑さに直接影響します。この問題を軽減する方法はいくつかあります。
  + `parallelismPerKpu` を低く設定することで、サブタスクやネットワーク・バッファごとに割り当てられるメモリーを増やすことができます。`parallelismPerKpu`を下げると KPU が増加し、したがってコストも増加することに注意してください。これを避けるには、並列処理を同じ係数だけ下げることで、同じ量の KPU を維持できます。
  + オペレータの数を減らすか、オペレータをチェーン化して必要なバッファの数を減らすことで、ジョブグラフを簡略化できます。
  + それ以外の場合は、https://aws.amazon.com/premiumsupport/ に連絡して、カスタムネットワークバッファー構成を依頼することもできます。

# スループットが遅すぎる
<a name="troubleshooting-rt-throughput"></a>

アプリケーションが受信ストリーミングデータを十分な速さで処理しないと、パフォーマンスが低下し、不安定になります。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-rt-throughput-symptoms"></a>

この状態では、次の症状が発生する可能性があります。
+ アプリケーションのデータソースが Kinesis ストリームの場合、ストリームの `millisbehindLatest` メトリクスは継続的に増加します。
+ アプリケーションのデータソースが Amazon MSK クラスターの場合、クラスターのコンシューマーラグメトリクスは増え続けます。詳細については、「[Amazon MSK 開発者ガイド](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)」の「[コンシューマーラグモニタリング](https://docs.aws.amazon.com/msk/latest/developerguide/consumer-lag.html)」を参照してください。
+ アプリケーションのデータソースが別のサービスまたはソースである場合は、入手可能なコンシューマーラグのメトリクスまたはデータを確認してください。

## 原因と解決策
<a name="troubleshooting-rt-throughput-causes"></a>

アプリケーションのスループットが遅くなる原因は多数あります。アプリケーションが入力に追いついていない場合は、次の点を確認してください。
+ スループットラグが急上昇し、その後次第に減少する場合は、アプリケーションが再起動しているかどうかを確認してください。アプリケーションは再起動中に入力の処理を停止するため、遅延が急増します。アプリケーションの障害については、「[アプリケーションが再起動している](troubleshooting-rt-restarts.md)」を参照してください。
+ スループットの遅延がずっと続く場合は、アプリケーションのパフォーマンスが最適化されているかどうかを確認してください。アプリケーションのパフォーマンスを最適化する方法については、 [パフォーマンスの問題をトラブルシューティングする](performance-troubleshooting.md) を参照してください。
+ スループットラグが急上昇しているのではなく継続的に増加していて、アプリケーションのパフォーマンスが最適化されている場合は、アプリケーションリソースを増やす必要があります。アプリケーションリソースを増やす方法については、 [アプリケーションスケーリングを実装する](how-scaling.md) を参照してください。
+ アプリケーションが別のリージョンの Kafka クラスターから読み取りを行ったり、コンシューマーラグが大きいにもかかわらず `FlinkKafkaConsumer` または `KafkaSource` がほとんどアイドル状態 (高い `idleTimeMsPerSecond` または低い `CPUUtilization`) になっている場合は、2097152 など `receive.buffer.byte` の値を増やすことができます。詳細については、「[カスタム MSK 設定](https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html)」の高レイテンシー環境セクションを参照してください。

アプリケーションソースのスループットが低下したり、コンシューマーラグが増加したりする場合のトラブルシューティング手順については、 [パフォーマンスの問題をトラブルシューティングする](performance-troubleshooting.md) を参照してください。

# 際限のない状態の成長
<a name="troubleshooting-rt-stateleaks"></a>

アプリケーションが古い状態情報を適切に処理しないと、情報が継続的に蓄積され、アプリケーションのパフォーマンスや安定性の問題が発生します。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-rt-stateleaks-symptoms"></a>

この状態では、次の症状が発生する可能性があります。
+ `lastCheckpointDuration` 指標は徐々に増加しているか、急上昇しています。
+ `lastCheckpointSize` 指標は徐々に増加しているか、急上昇しています。

## 原因と解決策
<a name="troubleshooting-rt-stateleaks-causes"></a>

次のような状況では、アプリケーションに状態データが蓄積される可能性があります。
+ アプリケーションが必要以上に長く状態データを保持している。
+ アプリケーションがウィンドウクエリを使用していて、時間が長すぎる。
+ ステートデータに TTL を設定していません。詳細については、「Apache Flink ドキュメント」の「[State Time-To-Live (TTL)](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl)」を参照してください。
+ Apache Beam バージョン 2.25.0 以降に依存するアプリケーションを実行している。主要な実験と値 `use_deprecated_read` で「[BeamApplicationPropertiesを拡張](https://docs.aws.amazon.com/managed-flink/latest/java/examples-beam.html#examples-beam-configure)」することにより、新バージョンの読み取り変換を拒否することができます。詳細については、[Apache Beam Documentation](https://beam.apache.org/blog/beam-2.25.0/#highlights) を参照してください。

アプリケーションがステートサイズの拡大に直面することがありますが、これは長期的には持続不可能です（結局、Flink アプリケーションは無期限に実行されます）。場合によっては、アプリケーションがデータをそのままの状態で保存していて、古い情報を適切にエージングアウトしていないことが原因であることもあります。しかし、Flink が提供できることに対して、単に理不尽な期待が寄せられることもあります。アプリケーションでは、数日から数週間に及ぶ長い時間枠にわたってアグリゲーションを使用することがあります。インクリメンタルな集計が可能な「[AggregateFunctions](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/#aggregatefunction)」を使用しない限り、Flink はウィンドウ全体のイベントをそのままの状態に保つ必要があります。

さらに、プロセス関数を使用してカスタムオペレータを実装する場合、アプリケーションはビジネスロジックで不要になったデータを状態から削除する必要があります。その場合は、「[ステートの有効期間](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl)」を利用して、処理時間に基づいてデータを自動的にエージングアウトできます。Apache Flink のマネージドサービスはインクリメンタルチェックポイントを使用しているため、ステート ttl は 「[RocksDB コンパクション](https://github.com/facebook/rocksdb/wiki/Compaction)」に基づいています。ステートサイズ (チェックポイントサイズで示される) が実際に減少するのを確認できるのは、コンパクション操作が行われた後だけです。特に 200 MB 未満のチェックポイントサイズでは、ステートの有効期限が切れることによってチェックポイントのサイズが減少することはほとんどありません。ただし、セーブポイントは古いデータを含まない状態のクリーンコピーに基づいているため、Apache Flink 用 Managed Serviceでスナップショットをトリガーして、古い状態を強制的に削除できます。

デバッグ目的では、チェックポイントのサイズが実際に減少または安定することをより迅速に検証する (そして ROCKSBS でのコンパクションの影響を避ける) ために、インクリメンタルチェックポイントを無効にするのが理にかなっています。ただし、これにはサービスチームへのチケットが必要です。

# I/O バウンドオペレーター
<a name="troubleshooting-io-bound-operators"></a>

データパス上の外部システムへの依存を避けた方がいいです。個々のイベントを充実させるために外部システムに問い合わせるよりも、参照データセットを状態にしておく方がはるかにパフォーマンスが高くなることが多いです。ただし、Amazon Sagemaker でホストされている機械学習モデルでイベントを充実させたい場合など、状態に簡単に移行できない依存関係がある場合もあります。

ネットワークを介して外部システムとやり取りするオペレーターはボトルネックになり、バックプレッシャーの原因となる可能性があります。機能の実装には「[AsyncIO](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/)」を使用することを強くお勧めします。これにより、個々の呼び出しの待ち時間を短縮し、アプリケーション全体の処理速度が低下するのを防ぐことができます。

さらに、I/O バウンドオペレーターを使用するアプリケーションでは、Apache Flink アプリケーション用 Managed Service の「[ParallelismPerKPU](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)」設定を増やすことも意味があります。このコンフィギュレーションは、アプリケーションがKPU（Kinesis Processing Unit）ごとに実行できる並列サブタスクの数を記述します。この値をデフォルトの 1 からたとえば 4 に増やすと、アプリケーションは同じリソース (同じコスト) を利用しますが、並列度を 4 倍に拡張できます。これは I/O バインドアプリケーションには有効ですが、I/O バインドでないアプリケーションにはさらなるオーバーヘッドを引き起こします。

# Kinesis データストリームからのアップストリームまたはソーススロットリング
<a name="troubleshooting-source-throttling"></a>

**症状**: アプリケーションがアップストリームのソース Kinesis データストリームから `LimitExceededExceptions` を受信しています。

**考えられる原因**: Apache Flink ライブラリ Kinesis コネクタのデフォルト設定は、Kinesis データストリームソースから読み込むように設定されており、`GetRecords` 呼び出しごとにフェッチされるレコードの最大数は非常にアグレッシブなデフォルト設定になっています。Apache Flink はデフォルトで、`GetRecords` 呼び出しごとに 10,000 レコードを取得するように設定されています (この呼び出しはデフォルトで 200 ミリ秒ごとに行われます)。ただし、シャードごとの上限は 1,000 レコードのみです。

このデフォルトの動作により、Kinesis データストリームからデータを使用しようとするとスロットリングが発生することがあり、アプリケーションのパフォーマンスと安定性に影響が及びます。

これは、CloudWatch `ReadProvisionedThroughputExceeded` メトリクスをチェックし、このメトリックスがゼロより大きい期間持続していることで確認できます。

これは、Amazon Managed Service for Apache Flink アプリケーションの CloudWatch Logs でも、`LimitExceededException` エラーが続いているのを把握することで確認できます。

**解決策**: このシナリオを解決するために、次の 2 つの方法のいずれかを実行できます。
+ `GetRecords` 呼び出しごとに取得されるレコード数のデフォルト制限を下げる
+ Amazon Managed Service for Apache Flink アプリケーションでアダプティブリードを有効にします。アダプティブリード機能の詳細については、「[SHARD\$1USE\$1ADAPTIVE\$1READS](https://nightlies.apache.org/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_USE_ADAPTIVE_READS)」を参照してください。

# チェックポイント
<a name="troubleshooting-checkpoints"></a>

チェックポイントは、アプリケーションの状態をフォールトトレラントに保つための Flink のメカニズムです。このメカニズムにより、ジョブが失敗した場合に Flink はオペレーターの状態を回復でき、アプリケーションには障害のない実行と同じセマンティクスが与えられます。Apache Flink 用 Managed Serviceでは、アプリケーションの状態は RocksDB に保存されます。RocksDB は組み込みのキー/バリューストアで、動作状態をディスク上に保持します。チェックポイントを取得すると、その状態は Amazon S3 にもアップロードされるため、ディスクが失われた場合でも、チェックポイントを使用してアプリケーションの状態を復元できます。

詳細については、「[状態スナップショットの仕組み](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work)」を参照してください。

## チェックポイントステージ
<a name="troubleshooting-checkpointing-stages"></a>

Flink のチェックポインティングオペレーターサブタスクには、主に 5 つのステージがあります。
+ Waiting 「**Start Delay**」— Flink はストリームに挿入されたチェックポイントバリアを使用するため、このステージの時間はオペレータがチェックポイントバリアに到達するのを待つ時間です。
+ アライメント「**Alignment Duration**」 — この段階では、サブタスクは1つのバリアに達しましたが、他の入力ストリームからのバリアを待っています。
+ 同期チェックポイント 「**同期時間**」 — この段階は、サブタスクが実際にオペレータの状態のスナップショットを撮り、サブタスク上の他のすべてのアクティビティをブロックする段階です。
+ 非同期チェックポイント 「**非同期時間**」 — この段階の大部分は、Amazon S3 に状態をアップロードするサブタスクです。この段階では、サブタスクはブロックされなくなり、レコードを処理できるようになります。
+ 確認 — 通常は短い段階で、サブタスクがJobManager に承認を送信し、コミットメッセージ (Kafkaシンクなど) を実行するだけです。

 これらの各段階（確認は除く）は、Flink WebUIから入手できるチェックポイントの期間メトリックに対応しており、チェックポイントが長くなる原因の特定に役立ちます。

チェックポイントで利用できる各メトリックの正確な定義を確認するには、「[履歴タブ](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/#history-tab)」を参照してください。

## 調査中
<a name="troubleshooting-checkpoints-investigating"></a>

長いチェックポイント期間を調査する場合、決定すべき最も重要なのはチェックポイントのボトルネック、つまりどのオペレーターとサブタスクがチェックポイントに最も時間がかかっているのか、そのサブタスクのどの段階に長時間かかっているのかを判断することです。これは、ジョブチェックポイントタスクの Flink WebUI を使用して確認できます。Flink の Web インターフェースには、チェックポイントの問題の調査に役立つデータや情報が表示されます。詳細については、「[チェックポイントの監視](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/)」を参照してください。

 まず、Job グラフ内の各オペレータの「**エンドツーエンド期間**」を確認して、どのオペレータがチェックポイントに時間がかかっているかを判断し、さらに調査する必要があります。Flink のドキュメントによると、所要時間の定義は次のとおりです。

「トリガーのタイムスタンプから最新の確認応答までの期間（確認応答をまだ受け取っていない場合はn/a）。」 チェックポイントが完了するまでのこの終了までの期間は、チェックポイントを確認した最後のサブタスクによって決まります。「通常、この時間は 1 つのサブタスクが実際に状態をチェックポイントするのに必要な時間よりも長くなります。」

チェックポイントの他の時間でも、その時間がどこに費やされているかについて、より詳細な情報が得られます。

「**Sync Duration**」の値が大きい場合は、スナップショット作成中に何かが発生していることを示しています。この段階では `snapshotState()` がSnapshotState インターフェースを実装するクラスが呼び出されます。これはユーザーコードである可能性があるため、スレッドダンプはこれを調査するのに役立ちます。

「**非同期時間**」が長い場合は、Amazon S3 への状態のアップロードに多くの時間が費やされていると考えられます。これは、状態が大きい場合や、アップロードされる状態ファイルが多数ある場合に発生する可能性があります。このような場合は、アプリケーションがどのように状態を使用しているかを調べ、可能な限り Flink のネイティブデータ構造が使用されていることを確認する必要があります (「[Using Keyed State](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)」)。Apache Flink 用 Managed Service では、Amazon S3 の呼び出し回数を最小限に抑え、時間がかかりすぎないように Flink を設定します。以下は、オペレーターのチェックポイント統計の例です。前述のオペレータチェックポイント統計に比べて、「**非同期時間**」が比較的長いことがわかります。

![\[チェックポイント機能の調査\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint.png)


「**Start Delay**」の値が大きい場合は、チェックポイントの障壁がオペレータに到達するのを待つ時間の大半が費やされていることがわかります。これは、アプリケーションがレコードを処理するのに時間がかかっていることを示しています。つまり、バリアがジョブグラフ内をゆっくりと流れているということです。これは通常、Job にバックプレッシャーがかかっている場合や、オペレーターが常に忙しい場合に発生します。以下は、2 番目の KeyedProcess オペレータがビジー状態になっている JobGraph の例です。

![\[チェックポイント機能の調査\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint2.png)


Flink フレームグラフまたは TaskManager スレッドダンプを使用して、何がそんなに時間がかかっているのかを調べることができます。ボトルネックが特定されたら、フレームグラフまたはスレッドダンプを使用してさらに調査できます。

## スレッドダンプ
<a name="troubleshooting-checkpoints-investigating-thread-dumps"></a>

スレッドダンプは、フレームグラフよりもレベルが少し低いもう 1 つのデバッグツールです。スレッドダンプは、ある時点でのすべてのスレッドの実行状態を出力します。Flink は JVM スレッドダンプを受け取ります。これは Flink プロセス内のすべてのスレッドの実行状態です。スレッドの状態は、スレッドのスタックトレースといくつかの追加情報によって示されます。フレームグラフは、実際には複数のスタックトレースを短時間で連続して取得して構築されます。グラフはこれらのトレースを視覚化したもので、一般的なコードパスを簡単に識別できます。

```
"KeyedProcess (1/3)#0" prio=5 Id=1423 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:154)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>>19)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

上の図は、Flink UI から取得した単一スレッドのスレッドダンプのスニペットです。1 行目には、このスレッドに関する次のような一般的な情報が含まれています。
+ スレッド名「KeyedProcess (1/3) \$10」
+ スレッドの優先度「prio=5」
+ 一意のスレッド「Id=1423」
+ スレッド状態:「実行可能」

 通常、スレッドの名前からそのスレッドの一般的な目的に関する情報が得られます。オペレータースレッドはオペレータと同じ名前を持ち、それがどのサブタスクに関連しているかを示すので、オペレータースレッドは名前で識別できます。たとえば、「KeyedProcess (1/3) \$10」スレッドは「KeyedProcess」オペレータからのもので、1 番目 (3 つのうち) のサブタスクからのものです。

スレッドは、次に示す状態のいずれかになります。
+ NEW — スレッドは作成されましたが、まだ処理されていません。
+ RUNNABLE — スレッドは CPU 上で実行されています
+ BLOCKED — スレッドは別のスレッドがロックを解放するのを待っている
+ WAITING — スレッドは`wait()`、`join()`、または `park()` メソッドを使用して待機している
+ TIMED\$1WAITING — スレッドはスリープ、ウェイト、ジョイン、パークの各メソッドを使用して待機していますが、待機時間は最大です。

**注記**  
Flink 1.13 では、スレッドダンプ内の 1 つのスタックトレースの最大深度は 8 に制限されています。

**注記**  
スレッドダンプは読み取りが難しく、複数のサンプルを採取して手動で分析する必要があるため、Flink アプリケーションのパフォーマンス問題をデバッグする最後の手段はスレッドダンプです。できる限り、フレームグラフを使用するのが望ましいです。

### Flink のスレッドダンプです。
<a name="troubleshooting-checkpoints-investigating-thread-dumps-flink"></a>

Flink では、Flink UI の左側のナビゲーションバーで「**タスクマネージャ**」 オプションを選択し、特定のタスクマネージャーを選択して [スレッドダンプ] タブに移動すると、「**スレッドダンプ**」を実行できます。スレッドダンプは、ダウンロードしたり、お気に入りのテキストエディター（またはスレッドダンプアナライザー）にコピーしたり、Flink Web UIのテキストビュー内で直接分析したりできます（ただし、この最後のオプションは少し扱いにくい場合があります）。

どのタスクマネージャーを使用するかを判断するには、特定のオペレータを選択したときに「**TaskManagers**」タブのスレッドダンプを使用できます。これは、オペレータがオペレータのさまざまなサブタスクで実行されており、異なるタスクマネージャーでも実行できることを示しています。

![\[スレッドダンプを使用する\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint4.png)


ダンプは複数のスタックトレースで構成されます。ただし、ダンプを調べるときには、オペレータに関連するものが最も重要です。オペレータースレッドにはオペレータと同じ名前があり、どのサブタスクに関連しているかがわかるので、これらは簡単に見つかります。たとえば、次のスタックトレースは「KeyedProcess」オペレータからのもので、最初のサブタスクです。

```
"KeyedProcess (1/3)#0" prio=5 Id=595 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

同じ名前のオペレータが複数あると混乱するかもしれませんが、オペレータに名前を付けることでこの問題を回避できます。例えば、次のようになります。

```
....
.process(new ExpensiveFunction).name("Expensive function")
```

## 「[フレームグラフ](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/)」
<a name="troubleshooting-checkpoints-investigating-flame-graphs"></a>

フレームグラフは、ターゲットコードのスタックトレースを視覚化する便利なデバッグツールです。これにより、最も頻繁に使用されるコードパスを特定できます。スタックトレースを何度もサンプリングして作成されます。フレームグラフの X 軸にはさまざまなスタックプロファイルが表示され、Y 軸にはスタックの深さとスタックトレースの呼び出しが表示されます。フレームグラフの 1 つの長方形はスタックフレームを表し、フレームの幅はスタック内での出現頻度を示します。フレームグラフとその使用方法の詳細については、「[フレームグラフ](https://www.brendangregg.com/flamegraphs.html)」を参照してください。

Flink では、オペレータを選択して FlameGraph タブを選択すると、Web UI からオペレータの「**フレームグラフ**」にアクセスできます。十分な数のサンプルが収集されると、フレームグラフが表示されます。以下は、チェックポイントまで時間がかかっていた ProcessFunction のフレームグラフです。

![\[フレームグラフを使用する\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint3.png)


これは非常に単純なフレームグラフで、すべてのCPU時間が `processElement` ExpensiveFunctionオペレータ内の各ルックに費やされていることを示しています。また、コード内のどこで実行されているかを判断するのに役立つ行番号も表示されます。

# チェックポイントがタイムアウトしています。
<a name="troubleshooting-chk-timeout"></a>

アプリケーションが最適化されていなかったり、適切にプロビジョニングされていなかったりすると、チェックポイントが失敗する可能性があります。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-chk-timeout-symptoms"></a>

アプリケーションのチェックポイントに障害が発生すると、`numberOfFailedCheckpoints` が 0 より大きくなります。

チェックポイントが失敗するのは、アプリケーションエラーなどの直接的な障害でも、アプリケーションリソース不足などの一時的な障害でもかまいません。アプリケーションログとメトリクスをチェックして、次の症状がないか調べてください。
+ コード内のエラー。
+ アプリケーションの依存サービスへのアクセス中にエラーが発生しました。
+ データのシリアル化中にエラーが発生しました。デフォルトのシリアライザーがアプリケーションデータをシリアル化できない場合、アプリケーションは失敗します。アプリケーションでカスタムシリアライザーを使用する方法については、「Apache Flink ドキュメント」の「[Data Types and Serialization](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)」を参照してください。
+ メモリ不足のエラー
+ 以下の指標が急上昇または着実に増加しています。
  + `heapMemoryUtilization`
  + `oldGenerationGCTime`
  + `oldGenerationGCCount`
  + `lastCheckpointSize`
  + `lastCheckpointDuration`

チェックポイントの監視の詳細については、「Apache Flink ドキュメント」の「[Monitoring Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/monitoring/checkpoint_monitoring/)」を参照してください。

## 原因と解決策
<a name="troubleshooting-chk-timeout-causes"></a>

アプリケーションログのエラーメッセージには、直接的な障害の原因が示されます。一時的な障害には以下の原因が考えられます。
+ アプリケーションの KPU プロビジョニングが不十分。アプリケーションのプロビジョニングを引き上げる方法については、 [アプリケーションスケーリングを実装する](how-scaling.md) を参照してください。
+ アプリケーションの状態サイズが大きすぎる。`lastCheckpointSize` メトリクスを使用してアプリケーションの状態サイズを監視できます。
+ アプリケーションの状態データはキー間で不均等に分散されます。アプリケーションで `KeyBy` オペレータを使用する場合は、受信データがキー間で均等に分割されていることを確認してください。ほとんどのデータが 1 つのキーに割り当てられていると、障害の原因となるボトルネックになります。
+ アプリケーションにメモリやガベージコレクションのバックプレッシャが発生しています。アプリケーションの`heapMemoryUtilization`、`oldGenerationGCTime`、`oldGenerationGCCount` の値が急上昇していないか、または着実に増加していないかを監視します。

# Apache Beam アプリケーションのチェックポイント障害
<a name="troubleshooting-chk-failure-beam"></a>

Beam アプリケーションが「[ShutdownSourcesAfterIdlems](https://beam.apache.org/documentation/runners/flink/#:~:text=shutdownSourcesAfterIdleMs)」を 0 ミリ秒に設定して構成されている場合、タスクが「FINISHED」状態になっているためにチェックポイントがトリガーされないことがあります。このセクションでは、この状態の症状と解決策について説明します。

## 症状
<a name="troubleshooting-chk-failure-beam-symptoms"></a>

Apache Flink アプリケーション用 Managed Service の CloudWatch logs に移動し、次のログメッセージが記録されているかどうかを確認します。次のログメッセージは、一部のタスクが完了したためにチェックポイントがトリガーされなかったことを示しています。

```
                {
                "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)",
                "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
                "message": "Failed to trigger checkpoint for job your job ID since some tasks of job your job ID has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.",
                "threadName": "Checkpoint Timer",
                "applicationARN": your application ARN,
                "applicationVersionId": "5",
                "messageSchemaVersion": "1",
                "messageType": "INFO"
                }
```

一部のタスクが「FINISHED」状態になり、チェックポイントを設定できなくなった Flink ダッシュボードでも確認できます。

![\[「FINISHED」状態のタスク\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/beam_checkpoint_failure.png)


## 原因
<a name="troubleshooting-chk-failure-beam-causes"></a>

ShutdownSourcesAfterIdlems は、設定したミリ秒の間アイドル状態だったソースをシャットダウンするビーム設定変数です。ソースがシャットダウンされると、チェックポイント設定はできなくなります。これにより、「[チェックポイント障害](https://issues.apache.org/jira/browse/FLINK-2491)」が発生する可能性があります。

タスクが「FINISED」状態になる原因の 1 つは、ShutdownSourcesAfterIdlems が 0 ミリ秒に設定されている場合です。つまり、アイドル状態のタスクはすぐにシャットダウンされます。

## ソリューション
<a name="troubleshooting-chk-failure-beam-solution"></a>

タスクがすぐに「FINISHED」状態にならないようにするには、ShutdownSourcesAfterIdlems を Long.max\$1Value に設定します。これには 2 つの方法で実行できます。
+ オプション 1: Apache Flink 用 Managed Service のアプリケーション設定ページでビーム設定が設定されている場合は、新しいキー値のペアを追加して ShutDpwnSourcesAfterIdlems を次のように設定できます。  
![\[IdleMS 後のシャットダウンソースを Long.MAX_VALUE に設定します\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/beam_checkpoint_failure_solution.png)
+ オプション 2: JAR ファイルでビーム構成が設定されている場合は、ShutdownSourcesAfterIdlems を次のように設定できます。

  ```
                          FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object
  
                          options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE
                          options.setRunner(FlinkRunner.class);
  
                          Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline
  ```

# バックプレッシャー
<a name="troubleshooting-backpressure"></a>

Flink はバックプレッシャーを使用して個々のオペレーターの処理速度を調整します。

オペレーターは、さまざまな理由から、受信したメッセージ量の処理を続けるのに苦労することがあります。操作には、オペレータが使用できる量よりも多くの CPU リソースが必要となる場合があります。オペレータは I/O 操作が完了するまで待つ場合があります。オペレータがイベントを十分な速さで処理できない場合、処理速度が遅いオペレータに供給する上流のオペレータにバックプレッシャーがかかります。これにより、アップストリームのオペレーターの速度が低下し、バックプレッシャーがソースにさらに伝わり、ソースも速度を落とすことでアプリケーション全体のスループットに適応するようになります。バックプレッシャーとその仕組みについて詳しくは、「[Apache Flink™ がバックプレッシャーを処理する方法](https://www.ververica.com/blog/how-flink-handles-backpressure)」を参照してください。

アプリケーション内のどのオペレータが遅いのかを知ることで、アプリケーションのパフォーマンス問題の根本原因を理解するための重要な情報を得ることができます。バックプレッシャ情報は「[Flink ダッシュボードを通じて公開されます](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/monitoring/back_pressure/)」。処理速度が遅いオペレータを特定するには、シンクに最も近い背圧値が高いオペレータ (次の例ではオペレータ B) を探します。その場合、速度低下の原因となっているオペレータは、ダウンストリームのオペレータの 1 つ (この例ではオペレータ C) です。B はイベントをより速く処理できますが、実際の処理速度が遅いオペレータ C に出力を転送できないため、バックプレッシャーがかかっています。

```
A (backpressured 93%) -> B (backpressured 85%) -> C (backpressured 11%) -> D (backpressured 0%)
```

処理が遅いオペレータを特定したら、そのオペレータが遅い理由を理解するように努めてください。理由は無数にあるかもしれませんが、何が問題なのかが明らかではなく、解決までに何日ものデバッグとプロファイリングが必要な場合もあります。以下に、明らかで一般的な理由をいくつか挙げます。その一部を以下で詳しく説明します。
+ オペレータが、ネットワークコールなどの低速な I/O を実行している (代わりに AsyncIO の使用を検討してください)。
+ データに偏りがあり、1 人のオペレーターが他のオペレーターよりも多くのイベントを受信しています (Flink ダッシュボードの個々のサブタスク (つまり、同じオペレーターのインスタンス) の送受信メッセージ数を確認して確認してください。
+ リソースを大量に消費する（データ・スキューがない場合、CPU/メモリ・バウンドの作業ではスケールアウトを、I/Oバウンドの作業では `ParallelismPerKPU` を増やすことを検討する）。
+ オペレータへの広範囲なロギング (実稼働アプリケーションではロギングを最小限に抑えるか、代わりにデバッグ出力をデータストリームに送信することを検討してください)。

## 廃棄シンクによるスループットのテスト
<a name="troubleshooting-testing-throughput"></a>

「[Discarding Sink](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.html)」は、アプリケーションの実行中に受信したすべてのイベントを単に無視します (シンクがないアプリケーションは実行に失敗します)。これは、スループットのテスト、プロファイリング、およびアプリケーションが適切にスケーリングされているかどうかの検証に非常に役立ちます。また、シンクがバックプレッシャーの原因になっているのか、それともアプリケーションなのかを検証するための、非常に実用的なサニティチェックでもあります (ただし、バックプレッシャのメトリクスをチェックするだけでも簡単でわかりやすい場合が多いです)。

アプリケーションのすべてのシンクを廃棄シンクに置き換え、本番データに似たデータを生成するモックソースを作成することで、特定の並列度設定におけるアプリケーションの最大スループットを測定できます。さらに、並列度を増やして、アプリケーションが適切にスケーリングされ、スループットが高くなると (データスキューなどにより) 発生するボトルネックがないことを確認できます。

# データスキュー機能
<a name="troubleshooting-data-skew"></a>

Flink アプリケーションはクラスター上で分散的に実行されます。Flink は複数のノードにスケールアウトするために、キー付きストリームの概念を採用しています。つまり、ストリームのイベントは、顧客 ID などの特定のキーに従って分割され、Flink はノードごとに異なるパーティションを処理できるということです。その後、「[キー付きウィンドウ](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)」、「[プロセス関数](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function/)」、「[非同期 I/O](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/)」など、多くの Flink オペレータがこれらのパーティションに基づいて評価されます。

パーティションキーの選択は、ビジネスロジックによって決まることがよくあります。同時に、「[DynamoDB](https://aws.amazon.com/dynamodb/)」や Spark などのベストプラクティスの多くが Flink にも同様に適用されます。たとえば、次のようなものがあります。
+ パーティションキーのカーディナリティを高く保つこと
+ パーティション間のイベントボリュームの偏りを回避

 Flink ダッシュボードでサブタスク (つまり、同じオペレータのインスタンス) の送受信レコードを比較することで、パーティション内のスキューを特定できます。さらに、Apache Flink 用 Managed Service モニタリングでは、`numRecordsIn/Out` と `numRecordsInPerSecond/OutPerSecond` のメトリクスをサブタスク・レベルでも公開するように設定できます。

# ステートスキュー機能
<a name="troubleshooting-state-skew"></a>

ステートフルオペレータ、つまりウィンドウなどのビジネスロジックの状態を維持するオペレータの場合、データスキューは常にステートスキューにつながります。サブタスクの中には、データに偏りがあるために他のサブタスクよりも多くのイベントを受け取り、そのため状態を維持するデータも多くなるものがあります。ただし、パーティションのバランスが均等なアプリケーションでも、その状態で保持されるデータの量には偏りがある可能性があります。たとえば、セッションウィンドウでは、一部のユーザーとセッションがそれぞれ他のユーザーよりもずっと長くなることがあります。長いセッションが同じパーティションに属していると、同じオペレーターの異なるサブタスクが保持するステートサイズのバランスが崩れてしまう可能性があります。

 ステートスキューは、個々のサブタスクに必要なメモリとディスクリソースを増やすだけでなく、アプリケーション全体のパフォーマンスを低下させる可能性もあります。アプリケーションがチェックポイントまたはセーブポイントを取得しているとき、オペレータの状態は Amazon S3 に保持され、ノードまたはクラスターの障害から状態を保護します。このプロセスの間 (特に Apache Flink 用 Managed Serviceでデフォルトで有効になっている 1 回限りのセマンティクスの場合)、チェックポイント/セーブポイントが完了するまで、外部から処理が停止します。データに偏りがある場合、操作を完了するまでの時間は、特に大量の状態を蓄積した 1 つのサブタスクによって制限される可能性があります。極端なケースでは、1 つのサブタスクが状態を維持できないことが原因で、チェックポイントやセーブポイントの取得に失敗することがあります。

 データスキューと同様に、ステートスキューはアプリケーションの処理速度を大幅に低下させる可能性があります。

 ステートスキューを特定するには、Flink ダッシュボードを活用できます。最近のチェックポイントまたはセーブポイントを見つけて、詳細内の個々のサブタスクに保存されているデータ量を比較します。

# 異なるリージョンのリソースと統合する
<a name="troubleshooting-resources-in-different-regions"></a>

Flink 設定のクロスリージョンレプリケーションに必要な設定により、Apache Flink アプリケーション用 Managed Serviceとは異なるリージョンの Amazon S3 バケットへの書き込みに `StreamingFileSink` を使用できるようになります。そのためには、「[AWS サポート Center](https://console.aws.amazon.com/support/home#/)」にサポートチケットを提出してください。