

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink をトラブルシューティングする
<a name="troubleshooting"></a>

次のトピックは、Amazon Managed Service for Apache Flink で発生する可能性のある問題のトラブルシューティングに役立ちます。

適切なトピックを選択して、ソリューションを確認します。

**Topics**
+ [開発におけるトラブルシューティング](troubleshooting-development.md)
+ [ランタイムのトラブルシューティング](troubleshooting-runtime.md)

# 開発におけるトラブルシューティング
<a name="troubleshooting-development"></a>

このセクションには、Managed Service for Apache Flink アプリケーションにおける開発時の問題の診断と修正に関する情報を提供します。

**Topics**
+ [システムロールバックのベストプラクティス](troubleshooting-system-rollback.md)
+ [Hudi 設定のベストプラクティス](troubleshooting-hudi.md)
+ [Apache Flink Flame Graphs](troubleshooting-update-flamegraphs.md)
+ [EFO コネクタ 1.15.2 における認証情報プロバイダーの問題](troubleshooting-credential-provider.md)
+ [サポートされていない Kinesis コネクタを使用するアプリケーション](troubleshooting-unsupported-kinesis-connectors.md)
+ [コンパイルエラー:「プロジェクトの依存関係を解決できませんでした」](troubleshooting-compile.md)
+ [無効な選択肢:「kinesisanalyticsv2」](troubleshooting-cli-update.md)
+ [UpdateApplication アクションがアプリケーションコードをリロードしない](troubleshooting-update.md)
+ [S3 ストリーミングファイルシンク:ファイルが見つかりません (例外)](troubleshooting-s3sink.md)
+ [FlinkKafka: セーブポイントによる停止に関するコンシューマの問題](troubleshooting-FlinkKafkaConsumer.md)
+ [Flink 1.15 非同期シンクデッドロック](troubleshooting-async-deadlock.md)
+ [Amazon Kinesis Data Streams のソース処理がリシャーディング中に順序通りに処理されない](troubleshooting-kinesis-data-streams-processing-out-of-order.md)
+ [リアルタイムベクトル埋め込みブループリントに関するよくある質問とトラブルシューティング](troubleshooting-blueprints.md)

# システムロールバックのベストプラクティス
<a name="troubleshooting-system-rollback"></a>

Amazon Managed Service for Apache Flink の自動システムロールバックとオペレーションの可視化機能により、アプリケーションの問題を特定して解決できます。

## システムロールバック
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

コードのバグやアクセス許可の問題など、顧客側のエラーが原因でアプリケーションの更新またはスケーリング操作が失敗した場合、この機能にオプトインしていれば、Amazon Managed Service for Apache Flink は自動で以前の稼働バージョンへロールバックを試みます。詳細については、「[Managed Service for Apache Flink アプリケーションのシステムロールバックを有効にする](how-system-rollbacks.md)」を参照してください。この自動ロールバックが失敗した場合、またはオプトインまたはオプトアウトしていない場合、アプリケーションは `READY` 状態になります。アプリケーションを更新するには、次のステップを実行します。   Amazon Managed Service for Apache Flink のコンソールをチェックするか、`DescribeApplicationOperation` API を利用して、エラーの説明を表示し、操作が失敗した理由を確認します。   完全なエラースタックの場合は、[CloudWatch logs](https://docs.aws.amazon.com/managed-flink/latest/java/logging.html) を使用します。   一般的な問題には、アクセス許可の不足、互換性のないコード変更、インフラストラクチャの設定ミスがあります。根本的な問題を解決します。   `UpdateApplicaton` API を使用して、新しいアプリケーションバージョンを再デプロイします。   

## 手動ロールバック
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

アプリケーションが進行しておらず、長時間一時的な状態である場合、またはアプリケーションが正常に `Running` に移行したが、正常に更新された Flink アプリケーションで処理エラーなどのダウンストリームの問題が表示される場合は、`RollbackApplication` API を使用して手動でロールバックできます。

1. `RollbackApplication` の呼び出し- これにより、以前の稼働バージョンに戻り、以前の状態が復元されます。

1. `DescribeApplicationOperation` API を使用してロールバック操作をモニタリングします。

1. ロールバックが失敗した場合は、前のシステムロールバックステップを使用します。

## 操作の可視性
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

`ListApplicationOperations` API には、アプリケーションのすべての顧客とシステムオペレーションの履歴が表示されます。

1. 失敗した操作の *operationId* をリストから取得します。

1. `DescribeApplicationOperation` を呼び出し、ステータスと *statusDescription* を確認します。

1. もし操作が失敗した場合、その説明には調査対象となる可能性のあるエラーが記載されます。

**一般的なエラーコードのバグ:** ロールバック機能を使用して、最後の動作バージョンに戻ります。バグを解決し、更新を再試行します。

**アクセス許可の問題:** `DescribeApplicationOperation` を使用して、必要なアクセス許可を確認します。アプリケーションのアクセス許可を更新して再試行します。

**Amazon Managed Service for Apache Flink サービスの問題:** を確認する AWS Health Dashboard か、サポートケースを開きます。

# Hudi 設定のベストプラクティス
<a name="troubleshooting-hudi"></a>

Managed Service for Apache Flink で Hudi コネクタを実行するには、次の設定変更をお勧めします。

`hoodie.embed.timeline.server` の無効化

Flink の Hudi コネクタは、Flink ジョブマネージャー (JM) に埋め込みタイムライン (TM) サーバーをセットアップし、メタデータをキャッシュすることで、ジョブの並列度が高いときのパフォーマンスを向上させます。JM と TM 間の非 Flink 通信を無効にしているため、Managed Service for Apache Flink でこの埋め込みサーバーを無効にすることを推奨します。

このサーバーが有効になっている場合、Hudi 書き込みはまず JM の埋め込みサーバーへの接続を試み、Amazon S3 からのメタデータの読み取りにフォールバックします。Hudi が接続タイムアウトを発生させることで書き込み処理が遅れ、その結果 Managed Service for Apache Flink のパフォーマンスに影響を及ぼします。

# Apache Flink Flame Graphs
<a name="troubleshooting-update-flamegraphs"></a>

フレームグラフをサポートする Apache Flink バージョンのマネージドサービスのアプリケーションでは、フレームグラフがデフォルトで有効になっています。「[Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15//docs/ops/debugging/flame_graphs/)」に記載されているように、フレームグラフを開いたままにしておくと、アプリケーションのパフォーマンスに影響する可能性があります。

 アプリケーションの Flame Graph を無効にする場合は、ケースを作成してアプリケーション ARN で無効化するようリクエストしてください。詳細については、この「[AWS サポートセンター](https://console.aws.amazon.com/support/home#/)」を参照してください。

# EFO コネクタ 1.15.2 における認証情報プロバイダーの問題
<a name="troubleshooting-credential-provider"></a>

1.15.2 以前のバージョンの Kinesis Data Streams EFO コネクタには、 `FlinkKinesisConsumer` が `Credential Provider` 設定を考慮しないという「[既知の問題](https://issues.apache.org/jira/browse/FLINK-29205)」があります。この問題により有効な構成が無視され、「`AUTO`」認証情報プロバイダーが使用されることになります。これにより、EFO コネクタを使用した Kinesis へのクロスアカウントアクセスで問題が発生する可能性があります。

このエラーを解決するには、EFO コネクタバージョン 1.15.3 以降を使用してください。

# サポートされていない Kinesis コネクタを使用するアプリケーション
<a name="troubleshooting-unsupported-kinesis-connectors"></a>

Apache Flink バージョン 1.15 以降向けの Managed Service for Apache Flink は、アプリケーション JAR またはアーカイブ (ZIP) にバンドルされているサポートされていない Kinesis Connector バージョン (バージョン 1.15.2 以前) を使用している場合、[アプリケーションの起動または更新を自動的に拒否](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html) します。

## 拒否エラー
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

アプリケーションの作成/更新コールを送信すると、次のようなエラーが表示されます：

```
An error occurred (InvalidArgumentException) when calling the CreateApplication operation: An unsupported Kinesis connector version has been detected in the application. Please update flink-connector-kinesis to any version equal to or newer than 1.15.2.
For more information refer to connector fix: https://issues.apache.org/jira/browse/FLINK-23528
```

## 修正手順
<a name="troubleshooting-unsupported-kinesis-connectors-steps-to-remediate"></a>
+ アプリケーションの `flink-connector-kinesis` への依存関係を更新します。Maven をプロジェクトのビルド・ツールとして使用している場合は、 [Maven の依存関係を更新してください。](#troubleshooting-unsupported-kinesis-connectors-update-maven-dependency) に従ってください。Gradle を使用している場合は、 [Gradle の依存関係を更新してください。](#troubleshooting-unsupported-kinesis-connectors-update-gradle-dependency) に従ってください。
+ アプリケーションをリパッケージする。
+ Amazon S3 バケットにアップロードします。
+ Amazon S3 バケットにアップロードしたばかりの改訂アプリケーションを使用して、アプリケーションの作成/更新リクエストを再送信します。
+ 同じエラーメッセージが引き続き表示される場合は、アプリケーションの依存関係を再確認してください。問題が解決しない場合は、サポートチケットを作成してください。

### Maven の依存関係を更新してください。
<a name="troubleshooting-unsupported-kinesis-connectors-update-maven-dependency"></a>

1. プロジェクトの `pom.xml` を開きます。

1. プロジェクトの依存関係を検索します。それらは以下のようになります。

   ```
   <project>
   
       ...
   
       <dependencies>
   
           ...
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis</artifactId>
           </dependency>
   
           ...
   
       </dependencies>
   
       ...
   
   </project>
   ```

1. `flink-connector-kinesis` を1.15.2と同じバージョンまたはそれより新しいバージョンに更新します。例:

   ```
   <project>
   
       ...
   
       <dependencies>
   
           ...
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis</artifactId>
               <version>1.15.2</version>
           </dependency>
   
           ...
   
       </dependencies>
   
       ...
   
   </project>
   ```

### Gradle の依存関係を更新してください。
<a name="troubleshooting-unsupported-kinesis-connectors-update-gradle-dependency"></a>

1. プロジェクト `build.gradle` (または Kotlin アプリケーションの場合は `build.gradle.kts` ) を開きます。

1. プロジェクトの依存関係を検索します。それらは以下のようになります。

   ```
   ...
   
   dependencies {
   
       ...
   
       implementation("org.apache.flink:flink-connector-kinesis")
   
       ...
   
   }
   
   ...
   ```

1. `flink-connector-kinesis` を1.15.2と同じバージョンまたはそれより新しいバージョンに更新します。例:

   ```
   ...
   
   dependencies {
   
       ...
   
       implementation("org.apache.flink:flink-connector-kinesis:1.15.2")
   
       ...
   
   }
   
   ...
   ```

# コンパイルエラー:「プロジェクトの依存関係を解決できませんでした」
<a name="troubleshooting-compile"></a>

Apache Flink 用マネージドサービスのサンプルアプリケーションをコンパイルするには、まず Apache Flink Kinesis コネクタをダウンロードしてコンパイルし、ローカルの Maven リポジトリに追加する必要があります。コネクタがリポジトリに追加されていない場合、次のようなコンパイルエラーが表示されます。

```
Could not resolve dependencies for project your project name: Failure to find org.apache.flink:flink-connector-kinesis_2.11:jar:1.8.2 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
```

このエラーを解決するには、コネクタの Apache Flink ソースコード (「[https://flink.apache.org/downloads.html](https://flink.apache.org/downloads.html)」からバージョン 1.8.2) をダウンロードする必要があります。Apache Flink ソースコードのダウンロード、コンパイル、インストールの方法については、 [Apache Flink Kinesis Streams コネクタを以前の Apache Flink バージョンで使用する](earlier.md#how-creating-apps-building-kinesis) を参照してください。

# 無効な選択肢:「kinesisanalyticsv2」
<a name="troubleshooting-cli-update"></a>

Apache Flink API 用 Managed Service の v2 を使用するには、「 AWS Command Line Interface 」(「AWS CLI」)の最新バージョンが必要です。

のアップグレードの詳細については AWS CLI、 *AWS Command Line Interface ユーザーガイド*[の「 AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html)のインストール」を参照してください。

# UpdateApplication アクションがアプリケーションコードをリロードしない
<a name="troubleshooting-update"></a>

「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」アクションは、S3 オブジェクトのバージョンが指定されていない場合、同じファイル名のアプリケーションコードをリロードしません。同じファイル名でアプリケーションコードをリロードするには、S3 バケットでバージョニングを有効にし、 `ObjectVersionUpdate` パラメータを使用して新しいオブジェクトバージョンを指定します。S3バケットでオブジェクトのバージョニングを有効にする方法の詳細については、「[バージョニングの有効化または無効化](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/enable-versioning.html)」を参照してください。

# S3 ストリーミングファイルシンク:ファイルが見つかりません (例外)
<a name="troubleshooting-s3sink"></a>

Apache Flink アプリケーション用 Managed Serviceでは、セーブポイントによって参照される進行中のパーツファイルが見つからない場合、スナップショットから開始すると、処理中のパーツファイル `FileNotFoundException` に遭遇する可能性があります。この障害モードが発生した場合、Apache Flink アプリケーション用 Managed Serviceのオペレータ状態は通常回復不能になり、 `SKIP_RESTORE_FROM_SNAPSHOT` を使用してスナップショットなしで再起動する必要があります。以下のスタックトレースの例を参照してください。

```
java.io.FileNotFoundException: No such file or directory: s3://amzn-s3-demo-bucket/pathj/INSERT/2023/4/19/7/_part-2-1234_tmp_12345678-1234-1234-1234-123456789012
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2231)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
        at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:98)
        at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
...
```

Flink `StreamingFileSink` は [File Systems](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/filesystems/overview/) がサポートするファイルシステムにレコードを書き込みます。受信ストリームは無制限であるため、データは有限サイズの部分ファイルに編成されて、データが書き込まれると新しいファイルが追加されます。パーツのライフサイクルとロールオーバーポリシーによって、パーツファイルのタイミング、サイズ、名前が決まります。

チェックポイントとセーブポイント (スナップショット) 中に、保留中のすべてのファイルの名前が変更され、コミットされます。ただし、処理中のパーツファイルはコミットされずに名前が変更され、その参照はチェックポイントまたはセーブポイントのメタデータ内に保持され、ジョブの復元時に使用されます。これらの処理中のパーツファイルは、最終的に Pending にロールオーバーされ、名前が変更され、後続のチェックポイントまたはセーブポイントによってコミットされます。

処理中のパーツファイルが見つからない場合の根本原因と緩和策は次のとおりです。
+ Apache Flink 用 Managed Service の起動に使用される古いスナップショット — Amazon S3 StreamingFileSink で Apache Flink 用 Managed Service を開始するには、アプリケーションの停止または更新時に作成された最新のシステムスナップショットのみを使用できます。このような障害を回避するには、最新のシステムスナップショットを使用してください。
  + たとえば、停止中または更新中に、システム・トリガーによるスナップショットではなく、 `CreateSnapshot` を使用して作成されたスナップショットを選択した場合に発生します。古いスナップショットのセーブポイントは、後続のチェックポイントまたはセーブポイントによって名前が変更されコミットされた、進行中のパーツファイルへの古い参照を保持します。
  + これは、システムがトリガーした最新の Stop/Update イベント以外のスナップショットが選択された場合にも発生する可能性があります。例えば、システム・スナップショットが無効になっているが、 `RESTORE_FROM_LATEST_SNAPSHOT` が設定されているアプリケーションです。一般的に、Amazon S3 StreamingFileSink を使用する Apache Flink アプリケーション用 Managed Service では、常にシステムスナップショットを有効にして `RESTORE_FROM_LATEST_SNAPSHOT` を設定する必要があります。
+ 進行中の部分ファイルの削除 — 処理中の部分ファイルは S3 バケットにあるため、バケットにアクセスできる他のコンポーネントやアクターによって削除される可能性があります。
  + これは、アプリを長時間停止していて、アプリのセーブポイントで参照されている進行中のパーツファイルが「[S3 バケットの MultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html)」ライフサイクルポリシーによって削除された場合に発生する可能性があります。このような障害を回避するには、S3 Bucket MPU ライフサイクルポリシーがユースケースに十分に対応した期間を対象としていることを確認してください。
  + これは、処理中のパーツファイルが手動で削除された場合や、システムの別のコンポーネントによって削除された場合にも発生する可能性があります。このような不具合を回避するには、処理中のパーツファイルが他のアクターやコンポーネントによって削除されないようにしてください。
+ セーブポイントの後に自動チェックポイントがトリガーされる競合状態 — これは 1.13 以前の Apache Flink 用 Managed Service バージョンに影響します。この問題は Managed Service for Apache Flink バージョン 1.15 で修正されています。Managed Service for Apache Flink の最新バージョンにアプリケーションを移行して、再発を防ぎます。また、StreamingFileSink から「[FileSink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/filesystem/#file-sink)」に移行することもお勧めします。
  + アプリケーションが停止または更新されると、Apache Flink 用 Managed Service はセーブポイントをトリガーし、2 つのステップでアプリケーションを停止します。2 つのステップの間に自動チェックポイントがトリガーされると、処理中のパーツファイルの名前が変更され、コミットされる可能性があるため、セーブポイントは使用できなくなります。

# FlinkKafka: セーブポイントによる停止に関するコンシューマの問題
<a name="troubleshooting-FlinkKafkaConsumer"></a>

レガシー FlinkKafkaConsumer を使用している場合、システムスナップショットを有効にしていると、アプリケーションが更新、停止、またはスケーリングで動かなくなる可能性があります。この 「[問題](https://issues.apache.org/jira/browse/FLINK-28758)」 に対する修正は公開されていないため、この問題を軽減するために新しい 「[KafkaSource](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-source)」 にアップグレードすることをお勧めします。

スナップショットを有効にして `FlinkKafkaConsumer` を使用している場合、Flink ジョブが savepoint API リクエストで STOP を処理すると、 `ClosedException` がランタイムエラーで報告されて `FlinkKafkaConsumer` が失敗する可能性があります。このような状況では、Flink アプリケーションが停止し、チェックポイント失敗と表示されます。

# Flink 1.15 非同期シンクデッドロック
<a name="troubleshooting-async-deadlock"></a>

AsyncSink インターフェイスを実装する Apache Flink の AWS コネクタには[既知の問題](https://issues.apache.org/jira/browse/FLINK-32230)があります。これは、以下のコネクターで Flink 1.15 を使用するアプリケーションに影響します。
+ Java アプリケーションの場合:
  + KinesisStreamsSink — `org.apache.flink:flink-connector-kinesis`
  + KinesisStreamsSink — `org.apache.flink:flink-connector-aws-kinesis-streams`
  + KinesisFirehoseSink — `org.apache.flink:flink-connector-aws-kinesis-firehose`
  + DynamoDbSink — `org.apache.flink:flink-connector-dynamodb`
+ Flink SQL/テーブルAPI/Python アプリケーション:
  + Kinesis – `org.apache.flink:flink-sql-connector-kinesis`
  + Kinesis – `org.apache.flink:flink-sql-connector-aws-kinesis-streams`
  + firehose - `org.apache.flink:flink-sql-connector-aws-kinesis-firehose`
  + DynamoDB - `org.apache.flink:flink-sql-connector-dynamodb`

影響を受けるアプリケーションには次の症状があります。
+ Flink ジョブの `RUNNING` 状態は変わりませんが、データは処理されていません。
+ ジョブは再起動されません。
+ チェックポイントがタイムアウトしています。

この問題は、 AWS SDK の[バグ](https://github.com/aws/aws-sdk-java-v2/issues/4354)が原因で、非同期 HTTP クライアントの使用時に発信者に特定のエラーが発生しません。その結果、シンクはチェックポイントフラッシュ操作中に処理中のリクエストが完了するまで無期限に待機することになります。

この問題は、バージョン **2.20.144** 以降の AWS SDK で修正されました。

以下は、影響を受けるコネクタを更新してアプリケーションで新しいバージョンの AWS SDK を使用する方法の手順です。

**Topics**
+ [Java アプリケーションを更新する](troubleshooting-async-deadlock-update-java-apps.md)
+ [Python アプリケーションを更新する](troubleshooting-async-deadlock-update-python-apps.md)

# Java アプリケーションを更新する
<a name="troubleshooting-async-deadlock-update-java-apps"></a>

Java アプリケーションを更新するには、以下の手順に従います。

## フリンク・コネクター・キネシス
<a name="troubleshooting-async-deadlock-update-java-apps-flink-connector-kinesis"></a>

このアプリケーションは `flink-connector-kinesis` を使用します。

Kinesis コネクタはシェーディングを使用して、 AWS SDK を含む一部の依存関係をコネクタ jar にパッケージ化します。 AWS SDK バージョンを更新するには、次の手順を使用して、これらのシェードされたクラスを置き換えます。

------
#### [ Maven ]

1. プロジェクトの依存関係として Kinesis コネクタと必要な AWS SDK モジュールを追加します。

1. `maven-shade-plugin` を設定します。

   1. Kinesis コネクタ jar のコンテンツをコピーするときに、シェーディングされた AWS SDK クラスを除外するフィルターを追加します。

   1. Kinesis コネクタで予想されるように、更新された AWS SDK クラスをパッケージに移動する再配置ルールを追加します。

   **pom.xml** 

   ```
   <project>
       ...    
       <dependencies>
           ...
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis</artifactId>
               <version>1.15.4</version>
           </dependency>
           
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>kinesis</artifactId>
               <version>2.20.144</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>netty-nio-client</artifactId>
               <version>2.20.144</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>sts</artifactId>
               <version>2.20.144</version>
           </dependency>
           ...
       </dependencies>
       ...
       <build>
           ...
           <plugins>
               ...
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-shade-plugin</artifactId>
                   <version>3.1.1</version>
                   <executions>
                       <execution>
                           <phase>package</phase>
                           <goals>
                               <goal>shade</goal>
                           </goals>
                           <configuration>
                               ...
                               <filters>
                                   ...
                                   <filter>
                                       <artifact>org.apache.flink:flink-connector-kinesis</artifact>
                                       <excludes>
                                           <exclude>org/apache/flink/kinesis/shaded/software/amazon/awssdk/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/org/reactivestreams/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/io/netty/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/com/typesafe/netty/**</exclude>
                                       </excludes>
                                   </filter>
                                   ...
                               </filters>
                               <relocations>
                                   ...
                                   <relocation>
                                       <pattern>software.amazon.awssdk</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.software.amazon.awssdk</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>org.reactivestreams</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.org.reactivestreams</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>io.netty</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.io.netty</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>com.typesafe.netty</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.com.typesafe.netty</shadedPattern>
                                   </relocation>
                                   ...
                               </relocations>
                              ...
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
               ...
           </plugins>
           ... 
       </build>
   </project>
   ```

------
#### [ Gradle ]

1. プロジェクトの依存関係として Kinesis コネクタと必要な AWS SDK モジュールを追加します。

1. ShadowJar の設定を調整します。

   1. Kinesis コネクタ jar のコンテンツをコピーするときは、シェーディングされた AWS SDK クラスを除外します。

   1. 更新された AWS SDK クラスを Kinesis コネクタで想定されるパッケージに再配置します。

   「**グラドルをビルドする**」

   ```
   ...
   dependencies {
       ...
       flinkShadowJar("org.apache.flink:flink-connector-kinesis:1.15.4")
       
       flinkShadowJar("software.amazon.awssdk:kinesis:2.20.144")
       flinkShadowJar("software.amazon.awssdk:sts:2.20.144")
       flinkShadowJar("software.amazon.awssdk:netty-nio-client:2.20.144")
       ...
   }
   ...
   shadowJar {
       configurations = [project.configurations.flinkShadowJar]
   
       exclude("software/amazon/kinesis/shaded/software/amazon/awssdk/**/*")
       exclude("org/apache/flink/kinesis/shaded/org/reactivestreams/**/*.class")
       exclude("org/apache/flink/kinesis/shaded/io/netty/**/*.class")
       exclude("org/apache/flink/kinesis/shaded/com/typesafe/netty/**/*.class")
       
       relocate("software.amazon.awssdk", "org.apache.flink.kinesis.shaded.software.amazon.awssdk")
       relocate("org.reactivestreams", "org.apache.flink.kinesis.shaded.org.reactivestreams")
       relocate("io.netty", "org.apache.flink.kinesis.shaded.io.netty")
       relocate("com.typesafe.netty", "org.apache.flink.kinesis.shaded.com.typesafe.netty")
   }
   ...
   ```

------

## 影響を受けるその他のコネクター
<a name="troubleshooting-async-deadlock-update-java-apps-flink-another-connector"></a>

影響を受ける別のコネクタをアプリケーションで使用する場合:

 AWS SDK バージョンを更新するには、プロジェクトビルド設定で SDK バージョンを適用する必要があります。

------
#### [ Maven ]

 AWS SDK 部品表 (BOM) を`pom.xml`ファイルの依存関係管理セクションに追加して、プロジェクトの SDK バージョンを適用します。

**pom.xml**

```
<project>
    ...    
    <dependencyManagement>
        <dependencies>
            ...
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.20.144</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
            ...
        </dependencies>
    </dependencyManagement>
    ...
</project>
```

------
#### [ Gradle ]

 AWS SDK 部品表 (BOM) にプラットフォームの依存関係を追加して、プロジェクトの SDK バージョンを適用します。これには Gradle 5.0 以降が必要です。

「**グラドルをビルドする**」

```
...
dependencies {
    ...
    flinkShadowJar(platform("software.amazon.awssdk:bom:2.20.144"))
    ...
}
...
```

------

# Python アプリケーションを更新する
<a name="troubleshooting-async-deadlock-update-python-apps"></a>

Python アプリケーションでは、コネクタと他の Java 依存関係を単一の uber-jar の一部としてパッケージ化する方法と、コネクタ jar を直接使用する方法の 2 つの方法でコネクタを使用できます。Async Sink デッドロックの影響を受けるアプリケーションを修正するには:
+ アプリケーションが uber jar を使用している場合は、 [Java アプリケーションを更新する](troubleshooting-async-deadlock-update-java-apps.md) の指示に従ってください。
+ コネクタ JAR をソースから再構築するには、以下の手順に従います。

「**ソースからコネクタを構築:**」

「[Flink のビルド要件と同様の前提条件:](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/flinkdev/building/#build-flink)」
+ Java 11
+ Maven 3.2.5

## Flink-sql-コネクタ-キネシス
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-kinesis"></a>

1. Flink 1.15.4 のソースコードのダウンロード:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. ソースコードの解凍:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Kinesis コネクタディレクトリに移動します。

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-kinesis/
   ```

1. コネクタ jar をコンパイルしてインストールし、必要な AWS SDK バージョンを指定します。`-DskipTests` ビルド時間を短縮するには、 `-Dfast` テスト実行をスキップして追加のソースコードチェックをスキップします。

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdkv2.version=2.20.144
   ```

1. Kinesis コネクタディレクトリに移動します。

   ```
   cd ../flink-sql-connector-kinesis
   ```

1. SQL コネクタ jar をコンパイルしてインストールします。

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. 作成された jar は次の場所で入手できます。

   ```
   target/flink-sql-connector-kinesis-1.15.4.jar
   ```

## フリンク-sql-コネクタ-aws-kinesis-ストリーム
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-aws-kinesis-streams"></a>

1. Flink 1.15.4 のソースコードのダウンロード:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. ソースコードの解凍:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Kinesis コネクタディレクトリに移動します。

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-aws-kinesis-streams/
   ```

1. コネクタ jar をコンパイルしてインストールし、必要な AWS SDK バージョンを指定します。`-DskipTests` ビルド時間を短縮するには、 `-Dfast` テスト実行をスキップして追加のソースコードチェックをスキップします。

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdk.version=2.20.144
   ```

1. Kinesis コネクタディレクトリに移動します。

   ```
   cd ../flink-sql-connector-aws-kinesis-streams
   ```

1. SQL コネクタ jar をコンパイルしてインストールします。

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. 作成された jar は次の場所で入手できます。

   ```
   target/flink-sql-connector-aws-kinesis-streams-1.15.4.jar
   ```

## フリンク-sql-コネクタ-aws-kinesis-firehose
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-kinesis-firehose"></a>

1. Flink 1.15.4 のソースコードのダウンロード:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. ソースコードの解凍:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. コネクタディレクトリに移動

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-aws-kinesis-firehose/
   ```

1. コネクタ jar をコンパイルしてインストールし、必要な AWS SDK バージョンを指定します。`-DskipTests` ビルド時間を短縮するには、 `-Dfast` テスト実行をスキップして追加のソースコードチェックをスキップします。

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdk.version=2.20.144
   ```

1. SQL コネクタディレクトリに移動します。

   ```
   cd ../flink-sql-connector-aws-kinesis-firehose
   ```

1. SQL コネクタ jar をコンパイルしてインストールします。

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. 作成された jar は次の場所で入手できます。

   ```
   target/flink-sql-connector-aws-kinesis-firehose-1.15.4.jar
   ```

## flink-sql-connector-dynamodb
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-dynamodb"></a>

1. Flink 1.15.4 のソースコードのダウンロード:

   ```
   wget https://archive.apache.org/dist/flink/flink-connector-aws-3.0.0/flink-connector-aws-3.0.0-src.tgz
   ```

1. ソースコードの解凍:

   ```
   tar -xvf flink-connector-aws-3.0.0-src.tgz
   ```

1. コネクタディレクトリに移動

   ```
   cd flink-connector-aws-3.0.0
   ```

1. コネクタ jar をコンパイルしてインストールし、必要な AWS SDK バージョンを指定します。`-DskipTests` ビルド時間を短縮するには、 `-Dfast` テスト実行をスキップして追加のソースコードチェックをスキップします。

   ```
   mvn clean install -DskipTests -Dfast -Dflink.version=1.15.4 -Daws.sdk.version=2.20.144
   ```

1. 作成された jar は次の場所で入手できます。

   ```
   flink-sql-connector-dynamodb/target/flink-sql-connector-dynamodb-3.0.0.jar
   ```

# Amazon Kinesis Data Streams のソース処理がリシャーディング中に順序通りに処理されない
<a name="troubleshooting-kinesis-data-streams-processing-out-of-order"></a>

現在の FlinkKinesisConsumer 実装では、Kinesis シャード間の強力な順序保証は提供されていません。これにより、Kinesis Stream のリシャーディング時、特に処理遅延が発生している Flink アプリケーションでは、処理の順序が狂う可能性があります。たとえば、イベント時間に基づくウィンドウオペレーターのような状況下では、遅延が原因でイベントが破棄される可能性があります。

![\[Diagram showing shards and shard consumers with time progression and trim horizon.\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/flink-ts.png)


これはオープンソースの Flink の[既知の問題](https://issues.apache.org/jira/browse/FLINK-6349)です。コネクタの修正が利用可能になるまで、Flink アプリケーションが Kinesis データストリームのリパーティショニング中に遅れないようにすることが重要です。Flink アプリケーションが処理遅延を許容できるようにすることで、処理順序が狂った場合の影響とデータ損失のリスクを最小限に抑えることができます。

# リアルタイムベクトル埋め込みブループリントに関するよくある質問とトラブルシューティング
<a name="troubleshooting-blueprints"></a>

次のよくある質問とトラブルシューティングセクションを確認して、リアルタイムのベクトル埋め込みブループリントの問題をトラブルシューティングします。リアルタイムベクトル埋め込みブループリントの詳細については、「[Real-time vector embedding blueprints](https://docs.aws.amazon.com/msk/latest/developerguide/ai-vector-embedding-integration-learn-more.html)」を参照してください。

Managed Service for Apache Flink アプリケーションの一般的なトラブルシューティングについては、「[https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-runtime.html](https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-runtime.html)」を参照してください。

**Topics**
+ [リアルタイムベクトル埋め込みブループリント - よくある質問](troubleshooting-blueprints-FAQ.md)
+ [リアルタイムベクトル埋め込みブループリント - トラブルシューティング](troubleshooting-blueprints-TS.md)

# リアルタイムベクトル埋め込みブループリント - よくある質問
<a name="troubleshooting-blueprints-FAQ"></a>

リアルタイムベクトル埋め込みブループリントに関する次のよくある質問を確認します。リアルタイムベクトル埋め込みブループリントの詳細については、「[Real-time vector embedding blueprints](https://docs.aws.amazon.com/msk/latest/developerguide/ai-vector-embedding-integration-learn-more.html)」を参照してください。

**Topics**
+ [このブループリントはどのような AWS リソースを作成しますか?](#troubleshooting-blueprints-1)
+ [AWS CloudFormation スタックのデプロイが完了した後のアクションは何ですか?](#troubleshooting-blueprints-2)
+ [ソース Amazon MSK トピック内のデータ構造はどのようにすべきですか?](#troubleshooting-blueprints-3)
+ [埋め込むメッセージの一部を指定できますか?](#troubleshooting-blueprints-4)
+ [複数の Amazon MSK トピックからデータを読み取ることはできますか?](#troubleshooting-blueprints-5)
+ [正規表現を使用して Amazon MSK トピック名を設定できますか?](#troubleshooting-blueprints-6)
+ [Amazon MSK トピックから読み取ることができるメッセージの最大サイズを教えてください。](#troubleshooting-blueprints-7)
+ [どのタイプの OpenSearch がサポートされていますか?](#troubleshooting-blueprints-8)
+ [ベクトル検索コレクション、ベクトルインデックスを使用し、OpenSearch Serverless コレクションにベクトルフィールドを追加する必要があるのはなぜですか?](#troubleshooting-blueprints-9)
+ [ベクトルフィールドのディメンションとして設定すべきものは何ですか?](#troubleshooting-blueprints-10)
+ [設定された OpenSearch インデックスの出力はどのようなものですか?](#troubleshooting-blueprints-11)
+ [OpenSearch インデックスに保存されているドキュメントに追加するメタデータフィールドを指定できますか?](#troubleshooting-blueprints-12)
+ [OpenSearch インデックスに重複するエントリを想定すべきですか?](#troubleshooting-blueprints-13)
+ [複数の OpenSearch インデックスにデータを送信できますか?](#troubleshooting-blueprints-14)
+ [複数のリアルタイムベクトル埋め込みアプリケーションを 1 つの AWS アカウントにデプロイできますか?](#troubleshooting-blueprints-15)
+ [複数のリアルタイムベクトル埋め込みアプリケーションが同じデータソースまたはシンクを使用できますか?](#troubleshooting-blueprints-16)
+ [アプリケーションはクロスアカウント接続をサポートしていますか?](#troubleshooting-blueprints-17)
+ [アプリケーションはクロスリージョン接続をサポートしていますか?](#troubleshooting-blueprints-18)
+ [Amazon MSK クラスターと OpenSearch コレクションを異なる VPC またはサブネットに配置することはできますか?](#troubleshooting-blueprints-19)
+ [アプリケーションではどのような埋め込みモデルがサポートされていますか?](#troubleshooting-blueprints-20)
+ [ワークロードに基づいてアプリケーションのパフォーマンスをファインチューニングできますか?](#troubleshooting-blueprints-21)
+ [サポートされている Amazon MSK 認証タイプは何ですか?](#troubleshooting-blueprints-22)
+ [`sink.os.bulkFlushIntervalMillis` とは何ですか? どのように設定すればよいですか?](#troubleshooting-blueprints-23)
+ [Managed Service for Apache Flink アプリケーションをデプロイすると、Amazon MSK トピックのどの時点からメッセージの読み取りが開始されますか?](#troubleshooting-blueprints-24)
+ [`source.msk.starting.offset` の使用方法は?](#troubleshooting-blueprints-25)
+ [どのようなチャンク化戦略がサポートされていますか?](#troubleshooting-blueprints-26)
+ [ベクトルデータストアのレコードを読み取るにはどうすればよいですか?](#troubleshooting-blueprints-27)
+ [ソースコードの新しい更新はどこで確認できますか?](#troubleshooting-blueprints-28)
+ [AWS CloudFormation テンプレートを変更して Managed Service for Apache Flink アプリケーションを更新できますか?](#troubleshooting-blueprints-29)
+ [私の代わりにアプリケーションを AWS モニタリングおよび保守しますか?](#troubleshooting-blueprints-30)
+ [このアプリケーションはデータを AWS アカウントの外部に移動させますか?](#troubleshooting-blueprints-31)

## このブループリントはどのような AWS リソースを作成しますか?
<a name="troubleshooting-blueprints-1"></a>

アカウントにデプロイされたリソースを検索するには、 AWS CloudFormation コンソールに移動し、Managed Service for Apache Flink アプリケーションに指定した名前で始まるスタック名を特定します。**[リソース]** タブを選択して、スタックの一部として作成されたリソースを確認します。以下は、スタックが作成するキーリソースです。
+ リアルタイムベクトル埋め込みの Managed Service for Apache Flink アプリケーション
+ リアルタイムベクトル埋め込みアプリケーションのソースコードを保持するための Amazon S3 バケット
+ ログを保存するための CloudWatch のロググループとログストリーム
+ リソースをフェッチおよび作成するための Lambda 関数
+ Lambda 用の IAM ロールとポリシー、Managed Service for Apache Flink アプリケーション、Amazon Bedrock と Amazon OpenSearch Service へのアクセス
+ Amazon OpenSearch Service のデータアクセスポリシー
+ Amazon Bedrock と Amazon OpenSearch Service にアクセスするための VPC エンドポイント

## AWS CloudFormation スタックのデプロイが完了した後のアクションは何ですか?
<a name="troubleshooting-blueprints-2"></a>

 AWS CloudFormation スタックのデプロイが完了したら、 Managed Service for Apache Flink コンソールにアクセスし、設計図 Managed Service for Apache Flink アプリケーションを見つけます。**[構成]** タブを選択し、すべてのランタイムプロパティが正しく設定されていることを確認します。次のページにオーバーフローする可能性があります。設定に自信が持てたら、**[実行]** を選択します。アプリケーションはトピックからのメッセージの取り込みを開始します。

新しいリリースを確認するには、「[https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases](https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases)」を参照してください。

## ソース Amazon MSK トピック内のデータ構造はどのようにすべきですか?
<a name="troubleshooting-blueprints-3"></a>

現在、構造化ソースデータと非構造化ソースデータをサポートしています。
+ 非構造化データは、`source.msk.data.type` の `STRING` によって示されます。データは受信メッセージからそのまま読み込まれます。
+ 現在、`source.msk.data.type` で `JSON` が示す構造化 JSON データをサポートしています。データは常に JSON 形式である必要があります。アプリケーションが不正な形式の JSON を受信すると、アプリケーションは失敗します。
+ ソースデータ型として JSON を使用する場合は、すべてのソーストピックのすべてのメッセージが有効な JSON であることを確認します。この設定で JSON オブジェクトを含まないトピックを 1 つ以上サブスクライブすると、アプリケーションは失敗します。1 つ以上のトピックに構造化データと非構造化データが混在している場合は、Managed Service for Apache Flink アプリケーションでソースデータを非構造化として設定することをお勧めします。

## 埋め込むメッセージの一部を指定できますか?
<a name="troubleshooting-blueprints-4"></a>
+ `source.msk.data.type` が `STRING` である非構造化入力データの場合、アプリケーションは常にメッセージ全体を埋め込み、そのメッセージ全体を設定された OpenSearch インデックスに保存します。
+ `source.msk.data.type` が `JSON` である構造化入力データの場合、埋め込み用に JSON オブジェクトのどのフィールドを選択するかを指定するように `embed.input.config.json.fieldsToEmbed` を設定できます。これは最上位の JSON フィールドでのみ機能し、ネストされた JSON や JSON 配列を含むメッセージでは機能しません。「.\$1」を使用して JSON 全体を埋め込みます。

## 複数の Amazon MSK トピックからデータを読み取ることはできますか?
<a name="troubleshooting-blueprints-5"></a>

はい、このアプリケーションで複数の Amazon MSK トピックからデータを読み取ることができます。すべてのトピックのデータは同じタイプ (STRING または JSON) である必要があります。そうしないと、アプリケーションが失敗する可能性があります。すべてのトピックのデータは、常に 1 つの OpenSearch インデックスに保存されます。

## 正規表現を使用して Amazon MSK トピック名を設定できますか?
<a name="troubleshooting-blueprints-6"></a>

`source.msk.topic.names` は正規表現のリストをサポートしていません。トピック名のカンマ区切りリスト、またはすべてのトピックを含めるための `.*` 正規表現のいずれかをサポートしています。

## Amazon MSK トピックから読み取ることができるメッセージの最大サイズを教えてください。
<a name="troubleshooting-blueprints-7"></a>

処理できるメッセージの最大サイズは、Amazon Bedrock InvokeModel 本文制限によって制限されており、現在は 25,000,000 に設定されています。詳細については、「[InvokeModel](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html#API_runtime_InvokeModel_RequestBody)」を参照してください。

## どのタイプの OpenSearch がサポートされていますか?
<a name="troubleshooting-blueprints-8"></a>

OpenSearch ドメインとコレクションの両方がサポートされています。OpenSearch コレクションを使用している場合は、ベクトルコレクションを使用し、このアプリケーションに使用するベクトルインデックスを作成してください。これにより、OpenSearch ベクトルデータベース機能を使用してデータをクエリできます。詳細については、[「Amazon OpenSearch Service のベクトルデータベース機能の説明](https://aws.amazon.com/blogs/big-data/amazon-opensearch-services-vector-database-capabilities-explained/)」を参照してください。

## ベクトル検索コレクション、ベクトルインデックスを使用し、OpenSearch Serverless コレクションにベクトルフィールドを追加する必要があるのはなぜですか?
<a name="troubleshooting-blueprints-9"></a>

OpenSearch Serverless の*ベクトル検索*コレクションタイプは、スケーラブルで高性能な類似検索機能を提供します。最新の機械学習 (ML) を活用した検索エクスペリエンスや生成 AI アプリケーションの構築が効率化されます。詳細については、「[Working with vector search collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-vector-search.html?icmpid=docs_console_unmapped)」を参照してください。

## ベクトルフィールドのディメンションとして設定すべきものは何ですか?
<a name="troubleshooting-blueprints-10"></a>

使用する埋め込みモデルに基づいてベクトルフィールドのディメンションを設定します。次の表を参照して、それぞれのドキュメントからこれらの値を確認します。


**ベクトルフィールドディメンション**  

| Amazon Bedrock ベクトル埋め込みモデル名 | モデルが提供する出力ディメンションのサポート | 
| --- | --- | 
|  Amazon Titan Text Embeddings V1  | 1,536 | 
|  Amazon Titan Text Embeddings V2  | 1,024 (デフォルト)、384、256 | 
|  Amazon Titan Multimodal Embeddings G1  | 1,024 (デフォルト)、384、256 | 
|  Cohere Embed English  | 1,024 | 
|  Cohere Embed Multilingual  | 1,024 | 

## 設定された OpenSearch インデックスの出力はどのようなものですか?
<a name="troubleshooting-blueprints-11"></a>

OpenSearch インデックス内のすべてのドキュメントには、次のフィールドが含まれます。
+ **original\$1data**: 埋め込みの生成に使用されたデータ。STRING タイプの場合、それはメッセージ全体です。JSON オブジェクトの場合、埋め込みに使用された JSON オブジェクトです。メッセージ内の JSON 全体でも、JSON 内の指定されたフィールドでもかまいません。例えば、受信メッセージから埋め込まれるように名前を選択した場合、出力は次のようになります。

  ```
  "original_data": "{\"name\":\"John Doe\"}"
  ```
+ **embedded\$1data**: Amazon Bedrock によって生成された埋め込みのベクトル浮動小数点配列
+ **date**: ドキュメントが OpenSearch に保存された UTC タイムスタンプ

## OpenSearch インデックスに保存されているドキュメントに追加するメタデータフィールドを指定できますか?
<a name="troubleshooting-blueprints-12"></a>

いいえ。現在、OpenSearch インデックスに保存されている最終ドキュメントへのフィールドの追加はサポートされていません。

## OpenSearch インデックスに重複するエントリを想定すべきですか?
<a name="troubleshooting-blueprints-13"></a>

アプリケーションの設定方法によっては、インデックスに重複するメッセージが表示される場合があります。一般的な理由の 1 つは、アプリケーションの再起動です。デフォルトでは、アプリケーションはソーストピックの最も古いメッセージからの読み取りを開始するように設定されています。構成を変更すると、アプリケーションは再起動し、トピック内のすべてのメッセージを再処理します。再処理を回避するには、source.msk.starting.offset の使用方法に関するドキュメントを参照して、アプリケーションの開始オフセットを正しく設定します。

## 複数の OpenSearch インデックスにデータを送信できますか?
<a name="troubleshooting-blueprints-14"></a>

いいえ。このアプリケーションは、単一の OpenSearch インデックスへのデータの保存をサポートしています。複数のインデックスにベクトル化出力を設定するには、個別の Managed Service for Apache Flink アプリケーションをデプロイする必要があります。

## 複数のリアルタイムベクトル埋め込みアプリケーションを 1 つの AWS アカウントにデプロイできますか?
<a name="troubleshooting-blueprints-15"></a>

はい。すべてのアプリケーションに一意の名前がある場合、複数のリアルタイムベクトル埋め込み Managed Service for Apache Flink アプリケーションを 1 つの AWS アカウント にデプロイできます。

## 複数のリアルタイムベクトル埋め込みアプリケーションが同じデータソースまたはシンクを使用できますか?
<a name="troubleshooting-blueprints-16"></a>

はい。同じトピックからデータを読み取るか、同じインデックスにデータを保存する複数のリアルタイムベクトル埋め込み Managed Service for Apache Flink アプリケーションを作成できます。

## アプリケーションはクロスアカウント接続をサポートしていますか?
<a name="troubleshooting-blueprints-17"></a>

いいえ。アプリケーションが正常に実行されるには、Amazon MSK クラスターと OpenSearch コレクションが、Managed Service for Apache Flink アプリケーションをセットアップしようとしている AWS アカウント のと同じ にある必要があります。

## アプリケーションはクロスリージョン接続をサポートしていますか?
<a name="troubleshooting-blueprints-18"></a>

いいえ。アプリケーションでは、Amazon MSK クラスターと OpenSearch コレクションを使用して、Managed Service for Apache Flink アプリケーションの同じリージョンにのみ Managed Service for Apache Flink アプリケーションをデプロイできます。

## Amazon MSK クラスターと OpenSearch コレクションを異なる VPC またはサブネットに配置することはできますか?
<a name="troubleshooting-blueprints-19"></a>

はい。異なる VPC やサブネットにある Amazon MSK クラスターと OpenSearch コレクションでも、同じ AWS アカウント内であればサポートしています。セットアップが正しいことを確認するには、(一般的な MSF のトラブルシューティング) を参照してください。

## アプリケーションではどのような埋め込みモデルがサポートされていますか?
<a name="troubleshooting-blueprints-20"></a>

現在、アプリケーションは Bedrock でサポートされているすべてのモデルをサポートしています。具体的には次のとおりです。
+ Amazon Titan Embeddings G1 - Text
+  Amazon Titan Text Embeddings V2
+  Amazon Titan Multimodal Embeddings G1 
+  Cohere Embed English 
+  Cohere Embed Multilingual 

## ワークロードに基づいてアプリケーションのパフォーマンスをファインチューニングできますか?
<a name="troubleshooting-blueprints-21"></a>

はい。アプリケーションのスループットは、さまざまな要因によって異なります。これらはすべてお客様が制御できます。

1. **AWS MSF KPUs**: アプリケーションは、デフォルトの並列処理係数 2 と KPU 1 あたりの並列処理でデプロイされ、自動スケーリングが有効になっています。ただし、ワークロードに応じて Managed Service for Apache Flink アプリケーションのスケーリングを設定することをお勧めします。詳細については、「[Review Managed Service for Apache Flink application resources](https://docs.aws.amazon.com/managed-flink/latest/java/how-resources.html)」を参照してください。

1. **Amazon Bedrock**: 選択した Amazon Bedrock オンデマンドモデルに基づいて、異なるクォータが適用される場合があります。Bedrock のサービスクォータを確認して、サービスが処理できるワークロードを把握します。詳細については、「[Quotas for Amazon Bedrock](https://docs.aws.amazon.com/bedrock/latest/userguide/quotas.html)」を参照してください。

1. **Amazon OpenSearch Service**: さらに、状況によっては、OpenSearch がパイプラインのボトルネックであることに気付く場合があります。スケーリングの詳細については、OpenSearch のスケーリングの「[Sizing Amazon OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/sizing-domains.html)」を参照してください。

## サポートされている Amazon MSK 認証タイプは何ですか?
<a name="troubleshooting-blueprints-22"></a>

IAM MSK 認証タイプのみをサポートしています。

## `sink.os.bulkFlushIntervalMillis` とは何ですか? どのように設定すればよいですか?
<a name="troubleshooting-blueprints-23"></a>

Amazon OpenSearch Service にデータを送信するとき、一括フラッシュ間隔は、アクションの数やリクエストのサイズに関係なく、一括リクエストが実行される間隔を指します。デフォルト値は 1 ミリ秒に設定されています。

フラッシュ間隔を設定すると、データが適時にインデックス作成されるようにできますが、設定が低すぎるとオーバーヘッドが増加する可能性もあります。フラッシュ間隔を選択するときは、ユースケースとタイムリーなインデックス作成の重要性を考慮してください。

## Managed Service for Apache Flink アプリケーションをデプロイすると、Amazon MSK トピックのどの時点からメッセージの読み取りが開始されますか?
<a name="troubleshooting-blueprints-24"></a>

アプリケーションは、アプリケーションのランタイム構成で設定された `source.msk.starting.offset` 設定で指定されたオフセットで Amazon MSK トピックからのメッセージの読み取りを開始します。`source.msk.starting.offset` が明示的に設定されていない場合、アプリケーションのデフォルト動作は、トピック内で最も古い利用可能なメッセージから読み取りを開始します。

## `source.msk.starting.offset` の使用方法は?
<a name="troubleshooting-blueprints-25"></a>

目的の動作に基づいて、`ource.msk.starting.offset` を次のいずれかの値に明示的に設定します。


+  EARLIEST: デフォルト設定で、パーティション内の最も古いオフセットから読み取ります。これは、特に以下の場合に適しています。
  +  新しく作成した Amazon MSK トピックとコンシューマーアプリケーション。
  +  状態を構築または再構築するには、データを再生する必要があります。これは、イベントソーシングパターンを実装する場合や、データ履歴の完全なビューを必要とする新しいサービスを初期化する場合に当てはまります。
+ LATEST: Managed Service for Apache Flink アプリケーションは、パーティションの末尾からメッセージを読み込みます。このオプションは、新しく生成されるメッセージだけを扱い、過去のデータを処理する必要がない場合にお勧めします。この設定では、コンシューマーは既存のメッセージを無視し、アップストリームプロデューサーによって発行された新しいメッセージのみを読み込みます。
+ COMMITTED: Managed Service for Apache Flink アプリケーションは、コンシューマーグループのコミット済みオフセットからメッセージの消費を開始します。コミットされたオフセットが存在しない場合、EARLIEST リセット戦略が使用されます。

## どのようなチャンク化戦略がサポートされていますか?
<a name="troubleshooting-blueprints-26"></a>

[langchain](https://js.langchain.com/v0.1/docs/get_started/introduction/) ライブラリを使用して入力をチャンク化しています。チャンク化は、入力の長さが選択した `maxSegmentSizeInChars` より大きい場合にのみ適用されます。次の 5 つのチャンク化タイプがサポートされています。
+ `SPLIT_BY_CHARACTER`: 各チャンクには可能な限り文字を納めますが、チャンク長は maxSegmentSizeInChars を上限とします。空白文字を認識しないため、単語が途中で切れてしまうことがあります。
+ `SPLIT_BY_WORD`: 空白文字を見つけて、それを基準にチャンク化します。単語が途中で切れることはありません。
+ `SPLIT_BY_SENTENCE`: 文の境界は、Apache OpenNLP ライブラリの英語文モデルを用いて検出されます。
+ `SPLIT_BY_LINE`: 改行文字を検出し、それを基準にチャンク化します。
+ `SPLIT_BY_PARAGRAPH`: 連続する改行文字を検出し、それを基準にチャンク化します。

分割戦略は前の順序に従ってフォールバックし、`SPLIT_BY_PARAGRAPH` のようなより大きなチャンク化戦略は `SPLIT_BY_CHARACTER` にフォールバックします。例えば、`SPLIT_BY_LINE` を使用する場合、行が長すぎると、行が長すぎると、その行は文ごとにサブチャンク化され、各チャンクには可能な限り多くの文が収められます。長い文がある場合は、単語レベルでチャンク化されます。単語が長すぎると、文字単位で分割されます。

## ベクトルデータストアのレコードを読み取るにはどうすればよいですか?
<a name="troubleshooting-blueprints-27"></a>

1. `source.msk.data.type` が `STRING` の場合
   + **original\$1data**: Amazon MSK メッセージからの元の文字列全体。
   + **embedded\$1data**: 空でない場合 (チャンク化が適用された場合)、`chunk_data` から埋め込みベクトルが作成され、チャンク化が適用されていない場合は、`original_data` から埋め込みベクトルが作成されます。
   + **chunk\$1data**: 元のデータがチャンク化された場合にのみ存在します。`embedded_data` での埋め込みの作成に使用された元のメッセージのチャンクが含まれます。

1. `source.msk.data.type` が `JSON` の場合
   + **original\$1data**: JSON キーフィルタリングを適用した*後*の、Amazon MSK メッセージからの元の JSON 全体。
   + **embedded\$1data**: 空でない場合 (チャンク化が適用された場合)、`chunk_data` から埋め込みベクトルが作成され、チャンク化が適用されていない場合は、`original_data` から埋め込みベクトルが作成されます。
   + **chunk\$1key**: 元のデータがチャンク化された場合にのみ存在します。チャンクが `original_data` にある JSON キーが含まれます。例えば、`original_data` の例では、ネストされたキーまたは*メタデータ*の `jsonKey1.nestedJsonKeyA` ようになります。
   + **chunk\$1data**: 元のデータがチャンク化された場合にのみ存在します。`embedded_data` での埋め込みの作成に使用された元のメッセージのチャンクが含まれます。

はい、このアプリケーションで複数の Amazon MSK トピックからデータを読み取ることができます。すべてのトピックのデータは同じタイプ (STRING または JSON) である必要があります。そうしないと、アプリケーションが失敗する可能性があります。すべてのトピックのデータは、常に 1 つの OpenSearch インデックスに保存されます。

## ソースコードの新しい更新はどこで確認できますか?
<a name="troubleshooting-blueprints-28"></a>

「[https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases](https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases)」にアクセスして、新しいリリースを確認します。

## AWS CloudFormation テンプレートを変更して Managed Service for Apache Flink アプリケーションを更新できますか?
<a name="troubleshooting-blueprints-29"></a>

いいえ。 AWS CloudFormation テンプレートを変更しても、Managed Service for Apache Flink アプリケーションは更新されません。の新しい変更は AWS CloudFormation 、新しいスタックをデプロイする必要があることを意味します。

## 私の代わりにアプリケーションを AWS モニタリングおよび保守しますか?
<a name="troubleshooting-blueprints-30"></a>

いいえ。ユーザーに代わってこのアプリケーションをモニタリング、スケーリング、更新、またはパッチ AWS 適用しません。

## このアプリケーションはデータを AWS アカウントの外部に移動させますか?
<a name="troubleshooting-blueprints-31"></a>

Managed Service for Apache Flink アプリケーションによって読み取りおよび保存されるすべてのデータは、 内にとどまり AWS アカウント 、 アカウントを離れることはありません。

# リアルタイムベクトル埋め込みブループリント - トラブルシューティング
<a name="troubleshooting-blueprints-TS"></a>

リアルタイムベクトル埋め込みブループリントに関する以下のトラブルシューティングトピックを確認してください。リアルタイムベクトル埋め込みブループリントの詳細については、「[Real-time vector embedding blueprints](https://docs.aws.amazon.com/msk/latest/developerguide/ai-vector-embedding-integration-learn-more.html)」を参照してください。

**Topics**
+ [CloudFormation スタックのデプロイが失敗またはロールバックされました。修正するにはどうすればよいですか?](#troubleshooting-blueprints-deployment)
+ [アプリケーションが Amazon MSK トピックの先頭からメッセージの読み取りを開始しないようにします。何をすればよいですか?](#troubleshooting-blueprints-beginning)
+ [Managed Service for Apache Flink アプリケーションに問題があるかどうかを確認する方法と、それをデバッグする方法を教えてください。](#troubleshooting-blueprints-debug)
+ [Managed Service for Apache Flink アプリケーションでモニタリングする必要がある主要なメトリクスは何ですか?](#troubleshooting-blueprints-metrics)

## CloudFormation スタックのデプロイが失敗またはロールバックされました。修正するにはどうすればよいですか?
<a name="troubleshooting-blueprints-deployment"></a>
+ CFN スタックに移動して、スタック失敗の原因を確認します。これは、アクセス許可の欠落、 AWS リソース名の衝突などに関連している可能性があります。デプロイ失敗の根本原因を修正します。詳細については、「[CloudWatch トラブルシューティングガイド](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/troubleshooting.html#basic-ts-guide)」を参照してください。
+  [オプション] 1 つの VPC につきサービスごとに 1 つの VPC エンドポイントのみを使用できます。複数のリアルタイムベクトル埋め込みブループリントをデプロイして、同じ VPC 内の Amazon OpenSearch Service コレクションに書き込む場合、VPC エンドポイントを共有している可能性があります。これらは、VPC のアカウント内に既に存在している場合もあります。存在しない場合は、最初のリアルタイムベクトル埋め込みブループリントのスタックが Amazon Bedrock と Amazon OpenSearch Service の VPC エンドポイントを作成し、アカウントにデプロイされる他のスタックで利用されます。スタックが失敗した場合、そのスタックが Amazon Bedrock と Amazon OpenSearch Service の VPC エンドポイントを作成したかどうかを確認し、アカウント内の他の場所で使用されていない場合は削除します。VPC エンドポイントを削除する手順については、アプリケーションを安全に削除する方法に関するドキュメントを参照してください。
+ アカウント内の他のサービスやアプリケーションが、その VPC エンドポイントを使用している可能性があります。削除すると、他のサービスのネットワークが中断される可能性があります。これらのエンドポイントの削除には注意してください。

## アプリケーションが Amazon MSK トピックの先頭からメッセージの読み取りを開始しないようにします。何をすればよいですか?
<a name="troubleshooting-blueprints-beginning"></a>

目的の動作に応じて、次のいずれかの値に明示的に `source.msk.starting.offset` を設定する必要があります。
+ **最も古いオフセット**: パーティション内の最も古いオフセット。
+ **最新のオフセット**: コンシューマーはパーティションの末尾からメッセージを読み込みます。
+ **コミットオフセット**: コンシューマーがパーティション内で処理した最後のメッセージから読み取ります。

## Managed Service for Apache Flink アプリケーションに問題があるかどうかを確認する方法と、それをデバッグする方法を教えてください。
<a name="troubleshooting-blueprints-debug"></a>

「[Managed Service for Apache Flink トラブルシューティングガイド](https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-runtime.html)」を使用して、アプリケーションに関する Managed Service for Apache Flink 関連の問題をデバッグします。

## Managed Service for Apache Flink アプリケーションでモニタリングする必要がある主要なメトリクスは何ですか?
<a name="troubleshooting-blueprints-metrics"></a>
+ 通常の Managed Service for Apache Flink アプリケーションで使用可能なすべてのメトリクスは、アプリケーションをモニタリングするのに役立ちます。詳細については、「[Metrics and dimensions in Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html)」を参照してください。
+ Amazon Bedrock メトリクスをモニタリングするには、「[Amazon CloudWatch metrics for Amazon Bedrock](https://docs.aws.amazon.com/bedrock/latest/userguide/monitoring.html#runtime-cloudwatch-metrics)」を参照してください。
+ 埋め込みを生成するパフォーマンスを監視するために、新しいメトリクスを 2 つ追加しました。CloudWatch の `EmbeddingGeneration` オペレーション名で見つけられます。2 つのメトリクスは次のとおりです。
  + **BedrockTitanEmbeddingTokenCount**: Amazon Bedrock への 1 回のリクエストに存在するトークンの数。
  + **BedrockEmbeddingGenerationLatencyMs**: Amazon Bedrock からの埋め込み生成レスポンスの送受信にかかる時間をミリ秒単位でレポートします。
+ Amazon OpenSearch Service のサーバーレスコレクションでは、`IngestionDataRate` や `IngestionDocumentErrors` などのメトリクスを利用できます。詳細については、「[Monitoring OpenSearch Serverless with Amazon CloudWatch](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/monitoring-cloudwatch.html)」を参照してください。
+ OpenSearch でプロビジョニングされたメトリクスについては、「[Monitoring OpenSearch cluster metrics with Amazon CloudWatch](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-cloudwatchmetrics.html)」を参照してください。

# ランタイムのトラブルシューティング
<a name="troubleshooting-runtime"></a>

このセクションには、Apache Flink アプリケーション用 Managed Serviceの実行時問題の診断と修正に関する情報が含まれています。

**Topics**
+ [トラブルシューティングツール](#troubleshooting-tools)
+ [アプリケーションに関する問題](troubleshooting-symptoms.md)
+ [アプリケーションが再起動している](troubleshooting-rt-restarts.md)
+ [スループットが遅すぎる](troubleshooting-rt-throughput.md)
+ [際限のない状態の成長](troubleshooting-rt-stateleaks.md)
+ [I/O バウンドオペレーター](troubleshooting-io-bound-operators.md)
+ [Kinesis データストリームからのアップストリームまたはソーススロットリング](troubleshooting-source-throttling.md)
+ [チェックポイント](troubleshooting-checkpoints.md)
+ [チェックポイントがタイムアウトしています。](troubleshooting-chk-timeout.md)
+ [Apache Beam アプリケーションのチェックポイント障害](troubleshooting-chk-failure-beam.md)
+ [バックプレッシャー](troubleshooting-backpressure.md)
+ [データスキュー機能](troubleshooting-data-skew.md)
+ [ステートスキュー機能](troubleshooting-state-skew.md)
+ [異なるリージョンのリソースと統合する](troubleshooting-resources-in-different-regions.md)

## トラブルシューティングツール
<a name="troubleshooting-tools"></a>

アプリケーションの問題を検出するための主要なツールは CloudWatch アラームです。CloudWatch アラームを使用すると、アプリケーションのエラーまたはボトルネック状態を示す CloudWatch メトリクスのしきい値を設定できます。推奨されるCloudWatchアラームについては、「[Amazon Managed Service for Apache Flink で CloudWatch アラームを使用する](monitoring-metrics-alarms.md)」を参照してください。

# アプリケーションに関する問題
<a name="troubleshooting-symptoms"></a>

このセクションには、Apache Flink アプリケーション用 Managed Serviceで発生する可能性のあるエラー状態に対する解決策が含まれています。

**Topics**
+ [アプリケーションが一時的な状態で停止している](#troubleshooting-rt-stuck)
+ [スナップショットの作成に失敗する](#troubleshooting-rt-snapshots)
+ [VPC 内のリソースにアクセスできない](#troubleshooting-rt-vpc)
+ [Amazon S3 バケットの書き込み時にデータが失われる](#troubleshooting-rt-s3)
+ [アプリケーションは RUNNING ステータスにありますが、データを処理していない](#troubleshooting-rt-processing)
+ [スナップショット、アプリケーション更新、またはアプリケーション停止エラー: InvalidApplicationConfigurationException](#troubleshooting-rt-appconfigexception)
+ [java.nio.File.noSuchFileException: /usr/local/openjdk-8/lib/security/cacerts](#troubleshooting-rt-fnf)

## アプリケーションが一時的な状態で停止している
<a name="troubleshooting-rt-stuck"></a>

アプリケーションが一時的なステータス (`STARTING`、`UPDATING`、`STOPPING` または `AUTOSCALING`) のままの場合は、「[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)」アクションを使用して `Force` パラメータを`true`に設定してアプリケーションを停止できます。この `DELETING` ステータスのアプリケーションを強制停止することはできません。または、アプリケーションが `UPDATING` または `AUTOSCALING` ステータスの場合は、実行中の以前のバージョンにロールバックできます。アプリケーションをロールバックすると、前回成功したスナップショットの状態データがロードされます。アプリケーションにスナップショットがない場合、Apache Flink 用 Managed Serviceはロールバックリクエストを拒否します。アプリケーションのロールバックについて詳しくは、「[rollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)」アクションを参照してください。

**注記**  
アプリケーションを強制停止すると、データが失われたり重複したりする可能性があります。アプリケーションの再起動時にデータが失われたり、データが重複して処理されたりするのを防ぐため、アプリケーションのスナップショットを頻繁に撮ることをお勧めします。

アプリケーションが停止する原因には次のようなものがあります。
+ 「**アプリケーションの状態が大きすぎる:**」アプリケーションの状態が大きすぎたり永続的すぎたりすると、チェックポイントまたはスナップショット操作中にアプリケーションが停止する可能性があります。アプリケーションの `lastCheckpointDuration` と `lastCheckpointSize` メトリックをチェックして、値が着実に増加しているか、または異常に高い値がないかを確認してください。
+ 「**アプリケーションコードが大きすぎる:**」アプリケーションの JAR ファイルが 512 MB 未満であることを確認してください。512 MB を超える JAR ファイルはサポートされていません。
+ 「**アプリケーションスナップショットの作成に失敗する:**」Managed Service for Apache Flinkが [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) または [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StopApplication.html) リクエスト中にアプリケーションのスナップショットを取得します。その後、サービスはこのスナップショット状態を使用し、更新されたアプリケーション設定を使用してアプリケーションを復元し、「1 回のみ」の処理セマンティクスを実現します。自動スナップショット作成が失敗した場合は、 [スナップショットの作成に失敗する](#troubleshooting-rt-snapshots) 以下を参照してください。
+ 「**スナップショットからの復元が失敗する:**」アプリケーションの更新でオペレータを削除または変更した後に、スナップショットから復元しようとした場合、スナップショットに欠落しているオペレータの状態データが含まれていると、復元はデフォルトで失敗します。さらに、アプリケーションは `STOPPED` または `UPDATING` ステータスのままになります。この動作を変更して復元を正常に行うには、アプリケーションの「[FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)」の「AllowNonRestoredState」パラメータを `true` に変更します。これにより、新しいプログラムにマッピングできない状態データを再開操作がスキップできます。
+ 「**アプリケーションの初期化に時間がかかる:**」Apache Flink 用 Managed Service fは、Flink ジョブの開始を待つ間、5 分間の内部タイムアウト (ソフト設定) を使用します。このタイムアウト内にジョブが開始されない場合、次のような CloudWatch ログが表示されます。

  ```
  Flink job did not start within a total timeout of 5 minutes for application: %s under account: %s
  ```

   上記のエラーが発生した場合、Flink ジョブの `main` メソッドで定義されている操作に 5 分以上かかっているため、Apache Flink のマネージドサービス側で Flink ジョブの作成がタイムアウトになります。Flink「**JobManager**」のログとアプリケーションコードをチェックして、 `main` メソッドにこのような遅延が予想されるかどうかを確認することをお勧めします。そうでない場合は、5 分以内に完了するように問題に対処する必要があります。

「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html)」または「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html)」のアクションを使用して申請状況を確認できます。

## スナップショットの作成に失敗する
<a name="troubleshooting-rt-snapshots"></a>

Apache Flink 用 Managed Serviceは、以下の状況ではスナップショットを作成できません。
+ アプリケーションがスナップショットの制限を超えました。スナップショットの上限は 1,000 件です。詳細については、「[スナップショットを使用してアプリケーションバックアップを管理する](how-snapshots.md)」を参照してください。
+ アプリケーションには、ソースまたはシンクにアクセスするアクセス許可がありません。
+ アプリケーションコードが正しく機能していない。
+ アプリケーションには他の構成上の問題も発生しています。

アプリケーションの更新中にスナップショットを作成しているとき、またはアプリケーションを停止しているときに例外が発生した場合は、アプリケーションの「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)」の `SnapshotsEnabled` プロパティを `false` に設定して、リクエストを再試行してください。

アプリケーションのオペレータが適切にプロビジョニングされていないと、スナップショットが失敗する可能性があります。オペレータのパフォーマンスのチューニングについては、 [オペレータースケーリング](performance-improving.md#performance-improving-scaling-op) を参照してください。

アプリケーションが正常な状態に戻ったら、アプリケーションの `SnapshotsEnabled` プロパティを `true` に設定することをお勧めします。

## VPC 内のリソースにアクセスできない
<a name="troubleshooting-rt-vpc"></a>

アプリケーションが Amazon VPC 上で実行されている VPC を使用している場合は、以下を実行して、アプリケーションがそのリソースにアクセスできることを確認します。
+ CloudWatch ログをチェックして、次のエラーがないかどうかを確認します。このエラーは、アプリケーションが VPC 内のリソースにアクセスできないことを示しています。

  ```
  org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  ```

  このエラーが表示された場合は、ルートテーブルが正しく設定されていることと、コネクタの接続設定が正しいことを確認してください。

  CloudWatch ログのセットアップと分析については、 [Amazon Managed Service for Apache Flink でのロギングとモニタリング](monitoring-overview.md) を参照してください。

## Amazon S3 バケットの書き込み時にデータが失われる
<a name="troubleshooting-rt-s3"></a>

Apache Flink バージョン 1.6.2 を使用して Amazon S3 バケットに出力を書き込むと、一部のデータが失われる可能性があります。Amazon S3 を直接出力に使用する場合は、サポートされている最新バージョンの Apache Flink を使用することをお勧めします。Apache Flink 1.6.2 を使用して Amazon S3 バケットに書き込むには、Firehose を使用することをお勧めします。Managed Service for Apache Flink で Firehose を使用する方法の詳細については、「[Firehose シンク](earlier.md#get-started-exercise-fh)」を参照してください。

## アプリケーションは RUNNING ステータスにありますが、データを処理していない
<a name="troubleshooting-rt-processing"></a>

アプリケーションのステータスは、「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html)」または「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html)」のアクションを使用して確認できます。アプリケーションが `RUNNING` ステータスに入ったがシンクにデータを書き込んでいない場合は、Amazon CloudWatch ログストリームをアプリケーションに追加することで問題をトラブルシューティングできます。詳細については、「[アプリケーション CloudWatch ロギングオプションを使用する](cloudwatch-logs.md#adding_cloudwatch)」を参照してください。ログストリームには、アプリケーションの問題のトラブルシューティングに使用できるメッセージが含まれています。

## スナップショット、アプリケーション更新、またはアプリケーション停止エラー: InvalidApplicationConfigurationException
<a name="troubleshooting-rt-appconfigexception"></a>

スナップショット操作中、またはアプリケーションの更新や停止など、スナップショットを作成する操作中に、次のようなエラーが発生することがあります。

```
An error occurred (InvalidApplicationConfigurationException) when calling the UpdateApplication operation: 

Failed to take snapshot for the application xxxx at this moment. The application is currently experiencing downtime. 
Please check the application's CloudWatch metrics or CloudWatch logs for any possible errors and retry the request. 
You can also retry the request after disabling the snapshots in the Managed Service for Apache Flink console or by updating 
the ApplicationSnapshotConfiguration through the AWS SDK
```

このエラーは、アプリケーションがスナップショットを作成できない場合に発生します。

スナップショット操作またはスナップショットを作成する操作中にこのエラーが発生した場合は、次の操作を実行してください。
+ アプリケーションのスナップショットを無効にします。これは、Apache Flink のマネージドサービスコンソールで行うことも、「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」アクションの `SnapshotsEnabledUpdate` パラメータを使用して行うこともできます。
+ スナップショットを作成できない理由を調べてください。詳細については、「[アプリケーションが一時的な状態で停止している](#troubleshooting-rt-stuck)」を参照してください。
+ アプリケーションが正常な状態に戻ったら、スナップショットを再度有効にします。

## java.nio.File.noSuchFileException: /usr/local/openjdk-8/lib/security/cacerts
<a name="troubleshooting-rt-fnf"></a>

SSL トラストストアの場所は以前のデプロイで更新されました。代わりに、以下の値を `ssl.truststore.location` パラメータに使用します

```
/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
```

# アプリケーションが再起動している
<a name="troubleshooting-rt-restarts"></a>

アプリケーションが正常でない場合、その Apache Flink ジョブは繰り返し失敗して再起動します。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-rt-restarts-symptoms"></a>

この状態では、次の症状が発生する可能性があります。
+ `FullRestarts` 指標はゼロではない。このメトリクスは、アプリケーションを起動してからアプリケーションのジョブが再開された回数を表します。
+ `Downtime` 指標はゼロではない。このメトリクスは、アプリケーションが `FAILING` または `RESTARTING` のステータスにあるミリ秒数を表します。
+ アプリケーションログには、`RESTARTING` または `FAILED` へのステータス変更が含まれます。以下の CloudWatch Logs Insights のクエリを使用して、これらのステータス変更についてアプリケーションログをクエリできます: [エラーの分析: アプリケーションタスク関連の障害](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps)。

## 原因と解決策
<a name="troubleshooting-rt-restarts-causes"></a>

次のような状況になると、アプリケーションが不安定になり、再起動を繰り返す可能性があります。
+ **オペレータが例外をスローしている:** アプリケーション内のオペレータの例外が処理されない場合、アプリケーションは (オペレータがその失敗を処理できないと解釈して) フェイルオーバーします。「一度だけ」の処理セマンティクスを維持するため、アプリケーションは最新のチェックポイントから再起動します。そのため、この再起動中は `Downtime` は 0 ではありません。これを防ぐには、アプリケーションコード内の再試行可能な例外をすべて処理することをお勧めします。

  この状態の原因は、アプリケーションの状態が `RUNNING` から `FAILED` に変更されていないかアプリケーションのログをクエリして調べることができます。詳細については、「[エラーの分析: アプリケーションタスク関連の障害](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps)」を参照してください。
+ **Kinesis Data Streams が適切にプロビジョニングされていない:** アプリケーションのソースまたはシンクが Kinesis データストリームの場合は、ストリームの [メトリクス](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html) に `ReadProvisionedThroughputExceeded` または `WriteProvisionedThroughputExceeded` エラーがないか確認してください。

  これらのエラーが表示される場合は、ストリームのシャード数を増やすことで Kinesis ストリームの利用可能なスループットを増やすことができます。詳細については、「[Kinesis データストリームで開いているシャードの数を変更するにはどうすればよいですか](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/)」を参照してください。
+ **他のソースまたはシンクが適切にプロビジョニングされていないか、使用できない:** アプリケーションがソースとシンクを正しくプロビジョニングしていることを確認してください。アプリケーションで使用されるソースまたはシンク (他の AWS サービス、外部ソースまたは送信先など) が適切にプロビジョニングされているか、読み取りまたは書き込みスロットリングが発生していないか、または定期的に使用できないことを確認します。

  依存するサービスでスループット関連の問題が発生している場合は、それらのサービスが利用できるリソースを増やすか、エラーや利用不能の原因を調査してください。
+ **オペレータが適切にプロビジョニングされていない:** アプリケーション内のいずれかのオペレータのスレッドのワークロードが正しく分散されていないと、オペレータが過負荷になり、アプリケーションがクラッシュする可能性があります。オペレータの並列処理のチューニングについては、「[オペレータースケーリングの適切な管理](performance-improving.md#performance-improving-scaling-op)」を参照してください。
+ **アプリケーションが DaemonException で失敗する:** 1.11 より前のバージョンの Apache Flink を使用している場合、このエラーはアプリケーションログに表示されます。0.14 以降の KPL バージョンを使用するには、Apache Flink の新しいバージョンへのアップグレードが必要な場合があります。
+ **TimeoutException、FlinkException、または RemoteTransportException でアプリケーションが失敗する:** これらのエラーは、タスクマネージャがクラッシュした場合にアプリケーションログに表示されることがあります。アプリケーションが過負荷になると、タスクマネージャーに CPU やメモリのリソースが圧迫され、タスクマネージャーが機能しなくなる可能性があります。

  これらのエラーは次のようになります。
  + `java.util.concurrent.TimeoutException: The heartbeat of JobManager with id xxx timed out`
  + `org.apache.flink.util.FlinkException: The assigned slot xxx was removed`
  + `org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager`

  この問題のトラブルシューティングを行うには、以下を確認します。
  + CloudWatch メトリクスをチェックして、CPU やメモリの使用量が異常に急増していないかを確認してください。
  + アプリケーションにスループットの問題がないかチェックしてください。詳細については、「[パフォーマンスの問題をトラブルシューティングする](performance-troubleshooting.md)」を参照してください。
  + アプリケーションログを調べて、アプリケーションコードで発生させている未処理の例外がないか調べてください。
+ **JaxBanNotationModule Not Found エラーでアプリケーションが失敗する:** このエラーは、アプリケーションが Apache Beam を使用しているが、正しい依存関係または依存バージョンがない場合に発生します。Apache Beam を使用する Apache Flink 用 Managed Serviceアプリケーションでは、以下のバージョンの依存関係を使用する必要があります。

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```

  `jackson-module-jaxb-annotations` の正しいバージョンを明示的な依存関係として指定しないと、アプリケーションはそれを環境の依存関係から読み込み、バージョンが一致しないため、実行時にアプリケーションがクラッシュします。

  Apache Beam 向けの Apache Flink 用 Managed Service の使用に関する詳細については、[CloudFormation を使用するApache Beam を使用してアプリケーションを作成する](examples-beam.md) を参照してください。
+ 「**アプリケーションが java.io.IOException: ネットワークバッファの数が不十分で失敗する**」

  アプリケーションに十分なメモリがネットワークバッファに割り当てられていない場合に発生します。ネットワークバッファはサブタスク間の通信を容易にします。ネットワーク経由で送信する前にレコードを保存したり、受信データをレコードに分解してサブタスクに渡す前に保存したりするために使用されます。必要なネットワークバッファの数は、ジョブグラフの並列処理と複雑さに直接影響します。この問題を軽減する方法はいくつかあります。
  + `parallelismPerKpu` を低く設定することで、サブタスクやネットワーク・バッファごとに割り当てられるメモリーを増やすことができます。`parallelismPerKpu`を下げると KPU が増加し、したがってコストも増加することに注意してください。これを避けるには、並列処理を同じ係数だけ下げることで、同じ量の KPU を維持できます。
  + オペレータの数を減らすか、オペレータをチェーン化して必要なバッファの数を減らすことで、ジョブグラフを簡略化できます。
  + それ以外の場合は、https://aws.amazon.com/premiumsupport/ に連絡して、カスタムネットワークバッファー構成を依頼することもできます。

# スループットが遅すぎる
<a name="troubleshooting-rt-throughput"></a>

アプリケーションが受信ストリーミングデータを十分な速さで処理しないと、パフォーマンスが低下し、不安定になります。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-rt-throughput-symptoms"></a>

この状態では、次の症状が発生する可能性があります。
+ アプリケーションのデータソースが Kinesis ストリームの場合、ストリームの `millisbehindLatest` メトリクスは継続的に増加します。
+ アプリケーションのデータソースが Amazon MSK クラスターの場合、クラスターのコンシューマーラグメトリクスは増え続けます。詳細については、「[Amazon MSK 開発者ガイド](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html)」の「[コンシューマーラグモニタリング](https://docs.aws.amazon.com/msk/latest/developerguide/consumer-lag.html)」を参照してください。
+ アプリケーションのデータソースが別のサービスまたはソースである場合は、入手可能なコンシューマーラグのメトリクスまたはデータを確認してください。

## 原因と解決策
<a name="troubleshooting-rt-throughput-causes"></a>

アプリケーションのスループットが遅くなる原因は多数あります。アプリケーションが入力に追いついていない場合は、次の点を確認してください。
+ スループットラグが急上昇し、その後次第に減少する場合は、アプリケーションが再起動しているかどうかを確認してください。アプリケーションは再起動中に入力の処理を停止するため、遅延が急増します。アプリケーションの障害については、「[アプリケーションが再起動している](troubleshooting-rt-restarts.md)」を参照してください。
+ スループットの遅延がずっと続く場合は、アプリケーションのパフォーマンスが最適化されているかどうかを確認してください。アプリケーションのパフォーマンスを最適化する方法については、 [パフォーマンスの問題をトラブルシューティングする](performance-troubleshooting.md) を参照してください。
+ スループットラグが急上昇しているのではなく継続的に増加していて、アプリケーションのパフォーマンスが最適化されている場合は、アプリケーションリソースを増やす必要があります。アプリケーションリソースを増やす方法については、 [アプリケーションスケーリングを実装する](how-scaling.md) を参照してください。
+ アプリケーションが別のリージョンの Kafka クラスターから読み取りを行ったり、コンシューマーラグが大きいにもかかわらず `FlinkKafkaConsumer` または `KafkaSource` がほとんどアイドル状態 (高い `idleTimeMsPerSecond` または低い `CPUUtilization`) になっている場合は、2097152 など `receive.buffer.byte` の値を増やすことができます。詳細については、「[カスタム MSK 設定](https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html)」の高レイテンシー環境セクションを参照してください。

アプリケーションソースのスループットが低下したり、コンシューマーラグが増加したりする場合のトラブルシューティング手順については、 [パフォーマンスの問題をトラブルシューティングする](performance-troubleshooting.md) を参照してください。

# 際限のない状態の成長
<a name="troubleshooting-rt-stateleaks"></a>

アプリケーションが古い状態情報を適切に処理しないと、情報が継続的に蓄積され、アプリケーションのパフォーマンスや安定性の問題が発生します。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-rt-stateleaks-symptoms"></a>

この状態では、次の症状が発生する可能性があります。
+ `lastCheckpointDuration` 指標は徐々に増加しているか、急上昇しています。
+ `lastCheckpointSize` 指標は徐々に増加しているか、急上昇しています。

## 原因と解決策
<a name="troubleshooting-rt-stateleaks-causes"></a>

次のような状況では、アプリケーションに状態データが蓄積される可能性があります。
+ アプリケーションが必要以上に長く状態データを保持している。
+ アプリケーションがウィンドウクエリを使用していて、時間が長すぎる。
+ ステートデータに TTL を設定していません。詳細については、「Apache Flink ドキュメント」の「[State Time-To-Live (TTL)](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl)」を参照してください。
+ Apache Beam バージョン 2.25.0 以降に依存するアプリケーションを実行している。主要な実験と値 `use_deprecated_read` で「[BeamApplicationPropertiesを拡張](https://docs.aws.amazon.com/managed-flink/latest/java/examples-beam.html#examples-beam-configure)」することにより、新バージョンの読み取り変換を拒否することができます。詳細については、[Apache Beam Documentation](https://beam.apache.org/blog/beam-2.25.0/#highlights) を参照してください。

アプリケーションがステートサイズの拡大に直面することがありますが、これは長期的には持続不可能です（結局、Flink アプリケーションは無期限に実行されます）。場合によっては、アプリケーションがデータをそのままの状態で保存していて、古い情報を適切にエージングアウトしていないことが原因であることもあります。しかし、Flink が提供できることに対して、単に理不尽な期待が寄せられることもあります。アプリケーションでは、数日から数週間に及ぶ長い時間枠にわたってアグリゲーションを使用することがあります。インクリメンタルな集計が可能な「[AggregateFunctions](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/#aggregatefunction)」を使用しない限り、Flink はウィンドウ全体のイベントをそのままの状態に保つ必要があります。

さらに、プロセス関数を使用してカスタムオペレータを実装する場合、アプリケーションはビジネスロジックで不要になったデータを状態から削除する必要があります。その場合は、「[ステートの有効期間](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl)」を利用して、処理時間に基づいてデータを自動的にエージングアウトできます。Apache Flink のマネージドサービスはインクリメンタルチェックポイントを使用しているため、ステート ttl は 「[RocksDB コンパクション](https://github.com/facebook/rocksdb/wiki/Compaction)」に基づいています。ステートサイズ (チェックポイントサイズで示される) が実際に減少するのを確認できるのは、コンパクション操作が行われた後だけです。特に 200 MB 未満のチェックポイントサイズでは、ステートの有効期限が切れることによってチェックポイントのサイズが減少することはほとんどありません。ただし、セーブポイントは古いデータを含まない状態のクリーンコピーに基づいているため、Apache Flink 用 Managed Serviceでスナップショットをトリガーして、古い状態を強制的に削除できます。

デバッグ目的では、チェックポイントのサイズが実際に減少または安定することをより迅速に検証する (そして ROCKSBS でのコンパクションの影響を避ける) ために、インクリメンタルチェックポイントを無効にするのが理にかなっています。ただし、これにはサービスチームへのチケットが必要です。

# I/O バウンドオペレーター
<a name="troubleshooting-io-bound-operators"></a>

データパス上の外部システムへの依存を避けた方がいいです。個々のイベントを充実させるために外部システムに問い合わせるよりも、参照データセットを状態にしておく方がはるかにパフォーマンスが高くなることが多いです。ただし、Amazon Sagemaker でホストされている機械学習モデルでイベントを充実させたい場合など、状態に簡単に移行できない依存関係がある場合もあります。

ネットワークを介して外部システムとやり取りするオペレーターはボトルネックになり、バックプレッシャーの原因となる可能性があります。機能の実装には「[AsyncIO](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/)」を使用することを強くお勧めします。これにより、個々の呼び出しの待ち時間を短縮し、アプリケーション全体の処理速度が低下するのを防ぐことができます。

さらに、I/O バウンドオペレーターを使用するアプリケーションでは、Apache Flink アプリケーション用 Managed Service の「[ParallelismPerKPU](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)」設定を増やすことも意味があります。このコンフィギュレーションは、アプリケーションがKPU（Kinesis Processing Unit）ごとに実行できる並列サブタスクの数を記述します。この値をデフォルトの 1 からたとえば 4 に増やすと、アプリケーションは同じリソース (同じコスト) を利用しますが、並列度を 4 倍に拡張できます。これは I/O バインドアプリケーションには有効ですが、I/O バインドでないアプリケーションにはさらなるオーバーヘッドを引き起こします。

# Kinesis データストリームからのアップストリームまたはソーススロットリング
<a name="troubleshooting-source-throttling"></a>

**症状**: アプリケーションがアップストリームのソース Kinesis データストリームから `LimitExceededExceptions` を受信しています。

**考えられる原因**: Apache Flink ライブラリ Kinesis コネクタのデフォルト設定は、Kinesis データストリームソースから読み込むように設定されており、`GetRecords` 呼び出しごとにフェッチされるレコードの最大数は非常にアグレッシブなデフォルト設定になっています。Apache Flink はデフォルトで、`GetRecords` 呼び出しごとに 10,000 レコードを取得するように設定されています (この呼び出しはデフォルトで 200 ミリ秒ごとに行われます)。ただし、シャードごとの上限は 1,000 レコードのみです。

このデフォルトの動作により、Kinesis データストリームからデータを使用しようとするとスロットリングが発生することがあり、アプリケーションのパフォーマンスと安定性に影響が及びます。

これは、CloudWatch `ReadProvisionedThroughputExceeded` メトリクスをチェックし、このメトリックスがゼロより大きい期間持続していることで確認できます。

これは、Amazon Managed Service for Apache Flink アプリケーションの CloudWatch Logs でも、`LimitExceededException` エラーが続いているのを把握することで確認できます。

**解決策**: このシナリオを解決するために、次の 2 つの方法のいずれかを実行できます。
+ `GetRecords` 呼び出しごとに取得されるレコード数のデフォルト制限を下げる
+ Amazon Managed Service for Apache Flink アプリケーションでアダプティブリードを有効にします。アダプティブリード機能の詳細については、「[SHARD\$1USE\$1ADAPTIVE\$1READS](https://nightlies.apache.org/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_USE_ADAPTIVE_READS)」を参照してください。

# チェックポイント
<a name="troubleshooting-checkpoints"></a>

チェックポイントは、アプリケーションの状態をフォールトトレラントに保つための Flink のメカニズムです。このメカニズムにより、ジョブが失敗した場合に Flink はオペレーターの状態を回復でき、アプリケーションには障害のない実行と同じセマンティクスが与えられます。Apache Flink 用 Managed Serviceでは、アプリケーションの状態は RocksDB に保存されます。RocksDB は組み込みのキー/バリューストアで、動作状態をディスク上に保持します。チェックポイントを取得すると、その状態は Amazon S3 にもアップロードされるため、ディスクが失われた場合でも、チェックポイントを使用してアプリケーションの状態を復元できます。

詳細については、「[状態スナップショットの仕組み](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work)」を参照してください。

## チェックポイントステージ
<a name="troubleshooting-checkpointing-stages"></a>

Flink のチェックポインティングオペレーターサブタスクには、主に 5 つのステージがあります。
+ Waiting 「**Start Delay**」— Flink はストリームに挿入されたチェックポイントバリアを使用するため、このステージの時間はオペレータがチェックポイントバリアに到達するのを待つ時間です。
+ アライメント「**Alignment Duration**」 — この段階では、サブタスクは1つのバリアに達しましたが、他の入力ストリームからのバリアを待っています。
+ 同期チェックポイント 「**同期時間**」 — この段階は、サブタスクが実際にオペレータの状態のスナップショットを撮り、サブタスク上の他のすべてのアクティビティをブロックする段階です。
+ 非同期チェックポイント 「**非同期時間**」 — この段階の大部分は、Amazon S3 に状態をアップロードするサブタスクです。この段階では、サブタスクはブロックされなくなり、レコードを処理できるようになります。
+ 確認 — 通常は短い段階で、サブタスクがJobManager に承認を送信し、コミットメッセージ (Kafkaシンクなど) を実行するだけです。

 これらの各段階（確認は除く）は、Flink WebUIから入手できるチェックポイントの期間メトリックに対応しており、チェックポイントが長くなる原因の特定に役立ちます。

チェックポイントで利用できる各メトリックの正確な定義を確認するには、「[履歴タブ](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/#history-tab)」を参照してください。

## 調査中
<a name="troubleshooting-checkpoints-investigating"></a>

長いチェックポイント期間を調査する場合、決定すべき最も重要なのはチェックポイントのボトルネック、つまりどのオペレーターとサブタスクがチェックポイントに最も時間がかかっているのか、そのサブタスクのどの段階に長時間かかっているのかを判断することです。これは、ジョブチェックポイントタスクの Flink WebUI を使用して確認できます。Flink の Web インターフェースには、チェックポイントの問題の調査に役立つデータや情報が表示されます。詳細については、「[チェックポイントの監視](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/)」を参照してください。

 まず、Job グラフ内の各オペレータの「**エンドツーエンド期間**」を確認して、どのオペレータがチェックポイントに時間がかかっているかを判断し、さらに調査する必要があります。Flink のドキュメントによると、所要時間の定義は次のとおりです。

「トリガーのタイムスタンプから最新の確認応答までの期間（確認応答をまだ受け取っていない場合はn/a）。」 チェックポイントが完了するまでのこの終了までの期間は、チェックポイントを確認した最後のサブタスクによって決まります。「通常、この時間は 1 つのサブタスクが実際に状態をチェックポイントするのに必要な時間よりも長くなります。」

チェックポイントの他の時間でも、その時間がどこに費やされているかについて、より詳細な情報が得られます。

「**Sync Duration**」の値が大きい場合は、スナップショット作成中に何かが発生していることを示しています。この段階では `snapshotState()` がSnapshotState インターフェースを実装するクラスが呼び出されます。これはユーザーコードである可能性があるため、スレッドダンプはこれを調査するのに役立ちます。

「**非同期時間**」が長い場合は、Amazon S3 への状態のアップロードに多くの時間が費やされていると考えられます。これは、状態が大きい場合や、アップロードされる状態ファイルが多数ある場合に発生する可能性があります。このような場合は、アプリケーションがどのように状態を使用しているかを調べ、可能な限り Flink のネイティブデータ構造が使用されていることを確認する必要があります (「[Using Keyed State](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)」)。Apache Flink 用 Managed Service では、Amazon S3 の呼び出し回数を最小限に抑え、時間がかかりすぎないように Flink を設定します。以下は、オペレーターのチェックポイント統計の例です。前述のオペレータチェックポイント統計に比べて、「**非同期時間**」が比較的長いことがわかります。

![\[チェックポイント機能の調査\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint.png)


「**Start Delay**」の値が大きい場合は、チェックポイントの障壁がオペレータに到達するのを待つ時間の大半が費やされていることがわかります。これは、アプリケーションがレコードを処理するのに時間がかかっていることを示しています。つまり、バリアがジョブグラフ内をゆっくりと流れているということです。これは通常、Job にバックプレッシャーがかかっている場合や、オペレーターが常に忙しい場合に発生します。以下は、2 番目の KeyedProcess オペレータがビジー状態になっている JobGraph の例です。

![\[チェックポイント機能の調査\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint2.png)


Flink フレームグラフまたは TaskManager スレッドダンプを使用して、何がそんなに時間がかかっているのかを調べることができます。ボトルネックが特定されたら、フレームグラフまたはスレッドダンプを使用してさらに調査できます。

## スレッドダンプ
<a name="troubleshooting-checkpoints-investigating-thread-dumps"></a>

スレッドダンプは、フレームグラフよりもレベルが少し低いもう 1 つのデバッグツールです。スレッドダンプは、ある時点でのすべてのスレッドの実行状態を出力します。Flink は JVM スレッドダンプを受け取ります。これは Flink プロセス内のすべてのスレッドの実行状態です。スレッドの状態は、スレッドのスタックトレースといくつかの追加情報によって示されます。フレームグラフは、実際には複数のスタックトレースを短時間で連続して取得して構築されます。グラフはこれらのトレースを視覚化したもので、一般的なコードパスを簡単に識別できます。

```
"KeyedProcess (1/3)#0" prio=5 Id=1423 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:154)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>>19)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

上の図は、Flink UI から取得した単一スレッドのスレッドダンプのスニペットです。1 行目には、このスレッドに関する次のような一般的な情報が含まれています。
+ スレッド名「KeyedProcess (1/3) \$10」
+ スレッドの優先度「prio=5」
+ 一意のスレッド「Id=1423」
+ スレッド状態:「実行可能」

 通常、スレッドの名前からそのスレッドの一般的な目的に関する情報が得られます。オペレータースレッドはオペレータと同じ名前を持ち、それがどのサブタスクに関連しているかを示すので、オペレータースレッドは名前で識別できます。たとえば、「KeyedProcess (1/3) \$10」スレッドは「KeyedProcess」オペレータからのもので、1 番目 (3 つのうち) のサブタスクからのものです。

スレッドは、次に示す状態のいずれかになります。
+ NEW — スレッドは作成されましたが、まだ処理されていません。
+ RUNNABLE — スレッドは CPU 上で実行されています
+ BLOCKED — スレッドは別のスレッドがロックを解放するのを待っている
+ WAITING — スレッドは`wait()`、`join()`、または `park()` メソッドを使用して待機している
+ TIMED\$1WAITING — スレッドはスリープ、ウェイト、ジョイン、パークの各メソッドを使用して待機していますが、待機時間は最大です。

**注記**  
Flink 1.13 では、スレッドダンプ内の 1 つのスタックトレースの最大深度は 8 に制限されています。

**注記**  
スレッドダンプは読み取りが難しく、複数のサンプルを採取して手動で分析する必要があるため、Flink アプリケーションのパフォーマンス問題をデバッグする最後の手段はスレッドダンプです。できる限り、フレームグラフを使用するのが望ましいです。

### Flink のスレッドダンプです。
<a name="troubleshooting-checkpoints-investigating-thread-dumps-flink"></a>

Flink では、Flink UI の左側のナビゲーションバーで「**タスクマネージャ**」 オプションを選択し、特定のタスクマネージャーを選択して [スレッドダンプ] タブに移動すると、「**スレッドダンプ**」を実行できます。スレッドダンプは、ダウンロードしたり、お気に入りのテキストエディター（またはスレッドダンプアナライザー）にコピーしたり、Flink Web UIのテキストビュー内で直接分析したりできます（ただし、この最後のオプションは少し扱いにくい場合があります）。

どのタスクマネージャーを使用するかを判断するには、特定のオペレータを選択したときに「**TaskManagers**」タブのスレッドダンプを使用できます。これは、オペレータがオペレータのさまざまなサブタスクで実行されており、異なるタスクマネージャーでも実行できることを示しています。

![\[スレッドダンプを使用する\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint4.png)


ダンプは複数のスタックトレースで構成されます。ただし、ダンプを調べるときには、オペレータに関連するものが最も重要です。オペレータースレッドにはオペレータと同じ名前があり、どのサブタスクに関連しているかがわかるので、これらは簡単に見つかります。たとえば、次のスタックトレースは「KeyedProcess」オペレータからのもので、最初のサブタスクです。

```
"KeyedProcess (1/3)#0" prio=5 Id=595 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

同じ名前のオペレータが複数あると混乱するかもしれませんが、オペレータに名前を付けることでこの問題を回避できます。例えば、次のようになります。

```
....
.process(new ExpensiveFunction).name("Expensive function")
```

## 「[フレームグラフ](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/)」
<a name="troubleshooting-checkpoints-investigating-flame-graphs"></a>

フレームグラフは、ターゲットコードのスタックトレースを視覚化する便利なデバッグツールです。これにより、最も頻繁に使用されるコードパスを特定できます。スタックトレースを何度もサンプリングして作成されます。フレームグラフの X 軸にはさまざまなスタックプロファイルが表示され、Y 軸にはスタックの深さとスタックトレースの呼び出しが表示されます。フレームグラフの 1 つの長方形はスタックフレームを表し、フレームの幅はスタック内での出現頻度を示します。フレームグラフとその使用方法の詳細については、「[フレームグラフ](https://www.brendangregg.com/flamegraphs.html)」を参照してください。

Flink では、オペレータを選択して FlameGraph タブを選択すると、Web UI からオペレータの「**フレームグラフ**」にアクセスできます。十分な数のサンプルが収集されると、フレームグラフが表示されます。以下は、チェックポイントまで時間がかかっていた ProcessFunction のフレームグラフです。

![\[フレームグラフを使用する\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/checkpoint3.png)


これは非常に単純なフレームグラフで、すべてのCPU時間が `processElement` ExpensiveFunctionオペレータ内の各ルックに費やされていることを示しています。また、コード内のどこで実行されているかを判断するのに役立つ行番号も表示されます。

# チェックポイントがタイムアウトしています。
<a name="troubleshooting-chk-timeout"></a>

アプリケーションが最適化されていなかったり、適切にプロビジョニングされていなかったりすると、チェックポイントが失敗する可能性があります。このセクションでは、この状態の症状とトラブルシューティングの手順について説明します。

## 症状
<a name="troubleshooting-chk-timeout-symptoms"></a>

アプリケーションのチェックポイントに障害が発生すると、`numberOfFailedCheckpoints` が 0 より大きくなります。

チェックポイントが失敗するのは、アプリケーションエラーなどの直接的な障害でも、アプリケーションリソース不足などの一時的な障害でもかまいません。アプリケーションログとメトリクスをチェックして、次の症状がないか調べてください。
+ コード内のエラー。
+ アプリケーションの依存サービスへのアクセス中にエラーが発生しました。
+ データのシリアル化中にエラーが発生しました。デフォルトのシリアライザーがアプリケーションデータをシリアル化できない場合、アプリケーションは失敗します。アプリケーションでカスタムシリアライザーを使用する方法については、「Apache Flink ドキュメント」の「[Data Types and Serialization](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)」を参照してください。
+ メモリ不足のエラー
+ 以下の指標が急上昇または着実に増加しています。
  + `heapMemoryUtilization`
  + `oldGenerationGCTime`
  + `oldGenerationGCCount`
  + `lastCheckpointSize`
  + `lastCheckpointDuration`

チェックポイントの監視の詳細については、「Apache Flink ドキュメント」の「[Monitoring Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/monitoring/checkpoint_monitoring/)」を参照してください。

## 原因と解決策
<a name="troubleshooting-chk-timeout-causes"></a>

アプリケーションログのエラーメッセージには、直接的な障害の原因が示されます。一時的な障害には以下の原因が考えられます。
+ アプリケーションの KPU プロビジョニングが不十分。アプリケーションのプロビジョニングを引き上げる方法については、 [アプリケーションスケーリングを実装する](how-scaling.md) を参照してください。
+ アプリケーションの状態サイズが大きすぎる。`lastCheckpointSize` メトリクスを使用してアプリケーションの状態サイズを監視できます。
+ アプリケーションの状態データはキー間で不均等に分散されます。アプリケーションで `KeyBy` オペレータを使用する場合は、受信データがキー間で均等に分割されていることを確認してください。ほとんどのデータが 1 つのキーに割り当てられていると、障害の原因となるボトルネックになります。
+ アプリケーションにメモリやガベージコレクションのバックプレッシャが発生しています。アプリケーションの`heapMemoryUtilization`、`oldGenerationGCTime`、`oldGenerationGCCount` の値が急上昇していないか、または着実に増加していないかを監視します。

# Apache Beam アプリケーションのチェックポイント障害
<a name="troubleshooting-chk-failure-beam"></a>

Beam アプリケーションが「[ShutdownSourcesAfterIdlems](https://beam.apache.org/documentation/runners/flink/#:~:text=shutdownSourcesAfterIdleMs)」を 0 ミリ秒に設定して構成されている場合、タスクが「FINISHED」状態になっているためにチェックポイントがトリガーされないことがあります。このセクションでは、この状態の症状と解決策について説明します。

## 症状
<a name="troubleshooting-chk-failure-beam-symptoms"></a>

Apache Flink アプリケーション用 Managed Service の CloudWatch logs に移動し、次のログメッセージが記録されているかどうかを確認します。次のログメッセージは、一部のタスクが完了したためにチェックポイントがトリガーされなかったことを示しています。

```
                {
                "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)",
                "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
                "message": "Failed to trigger checkpoint for job your job ID since some tasks of job your job ID has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.",
                "threadName": "Checkpoint Timer",
                "applicationARN": your application ARN,
                "applicationVersionId": "5",
                "messageSchemaVersion": "1",
                "messageType": "INFO"
                }
```

一部のタスクが「FINISHED」状態になり、チェックポイントを設定できなくなった Flink ダッシュボードでも確認できます。

![\[「FINISHED」状態のタスク\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/beam_checkpoint_failure.png)


## 原因
<a name="troubleshooting-chk-failure-beam-causes"></a>

ShutdownSourcesAfterIdlems は、設定したミリ秒の間アイドル状態だったソースをシャットダウンするビーム設定変数です。ソースがシャットダウンされると、チェックポイント設定はできなくなります。これにより、「[チェックポイント障害](https://issues.apache.org/jira/browse/FLINK-2491)」が発生する可能性があります。

タスクが「FINISED」状態になる原因の 1 つは、ShutdownSourcesAfterIdlems が 0 ミリ秒に設定されている場合です。つまり、アイドル状態のタスクはすぐにシャットダウンされます。

## ソリューション
<a name="troubleshooting-chk-failure-beam-solution"></a>

タスクがすぐに「FINISHED」状態にならないようにするには、ShutdownSourcesAfterIdlems を Long.max\$1Value に設定します。これには 2 つの方法で実行できます。
+ オプション 1: Apache Flink 用 Managed Service のアプリケーション設定ページでビーム設定が設定されている場合は、新しいキー値のペアを追加して ShutDpwnSourcesAfterIdlems を次のように設定できます。  
![\[IdleMS 後のシャットダウンソースを Long.MAX_VALUE に設定します\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/beam_checkpoint_failure_solution.png)
+ オプション 2: JAR ファイルでビーム構成が設定されている場合は、ShutdownSourcesAfterIdlems を次のように設定できます。

  ```
                          FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object
  
                          options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE
                          options.setRunner(FlinkRunner.class);
  
                          Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline
  ```

# バックプレッシャー
<a name="troubleshooting-backpressure"></a>

Flink はバックプレッシャーを使用して個々のオペレーターの処理速度を調整します。

オペレーターは、さまざまな理由から、受信したメッセージ量の処理を続けるのに苦労することがあります。操作には、オペレータが使用できる量よりも多くの CPU リソースが必要となる場合があります。オペレータは I/O 操作が完了するまで待つ場合があります。オペレータがイベントを十分な速さで処理できない場合、処理速度が遅いオペレータに供給する上流のオペレータにバックプレッシャーがかかります。これにより、アップストリームのオペレーターの速度が低下し、バックプレッシャーがソースにさらに伝わり、ソースも速度を落とすことでアプリケーション全体のスループットに適応するようになります。バックプレッシャーとその仕組みについて詳しくは、「[Apache Flink™ がバックプレッシャーを処理する方法](https://www.ververica.com/blog/how-flink-handles-backpressure)」を参照してください。

アプリケーション内のどのオペレータが遅いのかを知ることで、アプリケーションのパフォーマンス問題の根本原因を理解するための重要な情報を得ることができます。バックプレッシャ情報は「[Flink ダッシュボードを通じて公開されます](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/monitoring/back_pressure/)」。処理速度が遅いオペレータを特定するには、シンクに最も近い背圧値が高いオペレータ (次の例ではオペレータ B) を探します。その場合、速度低下の原因となっているオペレータは、ダウンストリームのオペレータの 1 つ (この例ではオペレータ C) です。B はイベントをより速く処理できますが、実際の処理速度が遅いオペレータ C に出力を転送できないため、バックプレッシャーがかかっています。

```
A (backpressured 93%) -> B (backpressured 85%) -> C (backpressured 11%) -> D (backpressured 0%)
```

処理が遅いオペレータを特定したら、そのオペレータが遅い理由を理解するように努めてください。理由は無数にあるかもしれませんが、何が問題なのかが明らかではなく、解決までに何日ものデバッグとプロファイリングが必要な場合もあります。以下に、明らかで一般的な理由をいくつか挙げます。その一部を以下で詳しく説明します。
+ オペレータが、ネットワークコールなどの低速な I/O を実行している (代わりに AsyncIO の使用を検討してください)。
+ データに偏りがあり、1 人のオペレーターが他のオペレーターよりも多くのイベントを受信しています (Flink ダッシュボードの個々のサブタスク (つまり、同じオペレーターのインスタンス) の送受信メッセージ数を確認して確認してください。
+ リソースを大量に消費する（データ・スキューがない場合、CPU/メモリ・バウンドの作業ではスケールアウトを、I/Oバウンドの作業では `ParallelismPerKPU` を増やすことを検討する）。
+ オペレータへの広範囲なロギング (実稼働アプリケーションではロギングを最小限に抑えるか、代わりにデバッグ出力をデータストリームに送信することを検討してください)。

## 廃棄シンクによるスループットのテスト
<a name="troubleshooting-testing-throughput"></a>

「[Discarding Sink](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.html)」は、アプリケーションの実行中に受信したすべてのイベントを単に無視します (シンクがないアプリケーションは実行に失敗します)。これは、スループットのテスト、プロファイリング、およびアプリケーションが適切にスケーリングされているかどうかの検証に非常に役立ちます。また、シンクがバックプレッシャーの原因になっているのか、それともアプリケーションなのかを検証するための、非常に実用的なサニティチェックでもあります (ただし、バックプレッシャのメトリクスをチェックするだけでも簡単でわかりやすい場合が多いです)。

アプリケーションのすべてのシンクを廃棄シンクに置き換え、本番データに似たデータを生成するモックソースを作成することで、特定の並列度設定におけるアプリケーションの最大スループットを測定できます。さらに、並列度を増やして、アプリケーションが適切にスケーリングされ、スループットが高くなると (データスキューなどにより) 発生するボトルネックがないことを確認できます。

# データスキュー機能
<a name="troubleshooting-data-skew"></a>

Flink アプリケーションはクラスター上で分散的に実行されます。Flink は複数のノードにスケールアウトするために、キー付きストリームの概念を採用しています。つまり、ストリームのイベントは、顧客 ID などの特定のキーに従って分割され、Flink はノードごとに異なるパーティションを処理できるということです。その後、「[キー付きウィンドウ](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)」、「[プロセス関数](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function/)」、「[非同期 I/O](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/)」など、多くの Flink オペレータがこれらのパーティションに基づいて評価されます。

パーティションキーの選択は、ビジネスロジックによって決まることがよくあります。同時に、「[DynamoDB](https://aws.amazon.com/dynamodb/)」や Spark などのベストプラクティスの多くが Flink にも同様に適用されます。たとえば、次のようなものがあります。
+ パーティションキーのカーディナリティを高く保つこと
+ パーティション間のイベントボリュームの偏りを回避

 Flink ダッシュボードでサブタスク (つまり、同じオペレータのインスタンス) の送受信レコードを比較することで、パーティション内のスキューを特定できます。さらに、Apache Flink 用 Managed Service モニタリングでは、`numRecordsIn/Out` と `numRecordsInPerSecond/OutPerSecond` のメトリクスをサブタスク・レベルでも公開するように設定できます。

# ステートスキュー機能
<a name="troubleshooting-state-skew"></a>

ステートフルオペレータ、つまりウィンドウなどのビジネスロジックの状態を維持するオペレータの場合、データスキューは常にステートスキューにつながります。サブタスクの中には、データに偏りがあるために他のサブタスクよりも多くのイベントを受け取り、そのため状態を維持するデータも多くなるものがあります。ただし、パーティションのバランスが均等なアプリケーションでも、その状態で保持されるデータの量には偏りがある可能性があります。たとえば、セッションウィンドウでは、一部のユーザーとセッションがそれぞれ他のユーザーよりもずっと長くなることがあります。長いセッションが同じパーティションに属していると、同じオペレーターの異なるサブタスクが保持するステートサイズのバランスが崩れてしまう可能性があります。

 ステートスキューは、個々のサブタスクに必要なメモリとディスクリソースを増やすだけでなく、アプリケーション全体のパフォーマンスを低下させる可能性もあります。アプリケーションがチェックポイントまたはセーブポイントを取得しているとき、オペレータの状態は Amazon S3 に保持され、ノードまたはクラスターの障害から状態を保護します。このプロセスの間 (特に Apache Flink 用 Managed Serviceでデフォルトで有効になっている 1 回限りのセマンティクスの場合)、チェックポイント/セーブポイントが完了するまで、外部から処理が停止します。データに偏りがある場合、操作を完了するまでの時間は、特に大量の状態を蓄積した 1 つのサブタスクによって制限される可能性があります。極端なケースでは、1 つのサブタスクが状態を維持できないことが原因で、チェックポイントやセーブポイントの取得に失敗することがあります。

 データスキューと同様に、ステートスキューはアプリケーションの処理速度を大幅に低下させる可能性があります。

 ステートスキューを特定するには、Flink ダッシュボードを活用できます。最近のチェックポイントまたはセーブポイントを見つけて、詳細内の個々のサブタスクに保存されているデータ量を比較します。

# 異なるリージョンのリソースと統合する
<a name="troubleshooting-resources-in-different-regions"></a>

Flink 設定のクロスリージョンレプリケーションに必要な設定により、Apache Flink アプリケーション用 Managed Serviceとは異なるリージョンの Amazon S3 バケットへの書き込みに `StreamingFileSink` を使用できるようになります。そのためには、「[AWS サポート Center](https://console.aws.amazon.com/support/home#/)」にサポートチケットを提出してください。