

# Runtime troubleshooting
<a name="troubleshooting-runtime"></a>

This section contains information about diagnosing and fixing runtime issues with your Managed Service for Apache Flink application.

**Topics**
+ [Troubleshooting tools](#troubleshooting-tools)
+ [Application issues](troubleshooting-symptoms.md)
+ [Application is restarting](troubleshooting-rt-restarts.md)
+ [Throughput is too slow](troubleshooting-rt-throughput.md)
+ [Unbounded state growth](troubleshooting-rt-stateleaks.md)
+ [I/O bound operators](troubleshooting-io-bound-operators.md)
+ [Upstream or source throttling from a Kinesis data stream](troubleshooting-source-throttling.md)
+ [Checkpoints](troubleshooting-checkpoints.md)
+ [Checkpointing is timing out](troubleshooting-chk-timeout.md)
+ [Checkpoint failure for Apache Beam application](troubleshooting-chk-failure-beam.md)
+ [Backpressure](troubleshooting-backpressure.md)
+ [Data skew](troubleshooting-data-skew.md)
+ [State skew](troubleshooting-state-skew.md)
+ [Integrate with resources in different Regions](troubleshooting-resources-in-different-regions.md)

## Troubleshooting tools
<a name="troubleshooting-tools"></a>

The primary tool for detecting application issues is CloudWatch alarms. Using CloudWatch alarms, you can set thresholds for CloudWatch metrics that indicate error or bottleneck conditions in your application. For information about recommended CloudWatch alarms, see [Use CloudWatch Alarms with Amazon Managed Service for Apache Flink](monitoring-metrics-alarms.md).

# Application issues
<a name="troubleshooting-symptoms"></a>

This section contains solutions for error conditions that you may encounter with your Managed Service for Apache Flink application.

**Topics**
+ [Application is stuck in a transient status](#troubleshooting-rt-stuck)
+ [Snapshot creation fails](#troubleshooting-rt-snapshots)
+ [Cannot access resources in a VPC](#troubleshooting-rt-vpc)
+ [Data is lost when writing to an Amazon S3 bucket](#troubleshooting-rt-s3)
+ [Application is in the RUNNING status but isn't processing data](#troubleshooting-rt-processing)
+ [Snapshot, application update, or application stop error: InvalidApplicationConfigurationException](#troubleshooting-rt-appconfigexception)
+ [java.nio.file.NoSuchFileException: /usr/local/openjdk-8/lib/security/cacerts](#troubleshooting-rt-fnf)

## Application is stuck in a transient status
<a name="troubleshooting-rt-stuck"></a>

If your application stays in a transient status (`STARTING`, `UPDATING`, `STOPPING`, or `AUTOSCALING`), you can stop your application by using the [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) action with the `Force` parameter set to `true`. You can't force stop an application in the `DELETING` status. Alternatively, if the application is in the `UPDATING` or `AUTOSCALING` status, you can roll it back to the previous running version. When you roll back an application, it loads state data from the last successful snapshot. If the application has no snapshots, Managed Service for Apache Flink rejects the rollback request. For more information about rolling back an application, see the [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) action.

**Note**  
Force-stopping your application may lead to data loss or duplication. To prevent data loss or duplicate processing of data during application restarts, we recommend that you take frequent snapshots of your application.

Causes for stuck applications include the following:
+ **Application state is too large:** Having an application state that is too large or too persistent can cause the application to become stuck during a checkpoint or snapshot operation. Check your application's `lastCheckpointDuration` and `lastCheckpointSize` metrics for steadily increasing values or abnormally high values.
+ **Application code is too large:** Verify that your application JAR file is smaller than 512 MB. JAR files larger than 512 MB are not supported.
+ **Application snapshot creation fails:** Managed Service for Apache Flink takes a snapshot of the application during an [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) or [StopApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StopApplication.html) request. The service then uses this snapshot state and restores the application using the updated application configuration to provide *exactly-once* processing semantics. If automatic snapshot creation fails, see [Snapshot creation fails](#troubleshooting-rt-snapshots) following.
+ **Restoring from a snapshot fails:** If you remove or change an operator in an application update and attempt to restore from a snapshot, the restore will fail by default if the snapshot contains state data for the missing operator. In addition, the application will be stuck in either the `STOPPED` or `UPDATING` status. To change this behavior and allow the restore to succeed, change the `AllowNonRestoredState` parameter of the application's [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html) to `true`. This allows the resume operation to skip state data that cannot be mapped to the new program.
+ **Application initialization is taking too long:** Managed Service for Apache Flink uses an internal timeout of 5 minutes (a soft setting) while waiting for a Flink job to start. If your job fails to start within this timeout, you will see a CloudWatch log entry like the following:

  ```
  Flink job did not start within a total timeout of 5 minutes for application: %s under account: %s
  ```

   If you encounter this error, it means that the operations defined in your Flink job's `main` method are taking more than 5 minutes, causing Flink job creation to time out on the Managed Service for Apache Flink side. We suggest that you check the Flink **JobManager** logs as well as your application code to see whether this delay in the `main` method is expected. If it isn't, take steps to address the issue so that the `main` method completes in under 5 minutes.
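
One way to find which part of the `main` method exceeds the budget is to log elapsed time around each initialization step. The following is a minimal, hypothetical sketch; the helper and step names are illustrative, not part of any Flink or AWS API:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

public class MainTimer {
    // Runs a named initialization step and logs how long it took, so slow
    // steps in main() become visible in the application's CloudWatch logs.
    public static <T> T timed(String name, Supplier<T> step) {
        Instant start = Instant.now();
        T result = step.get();
        long ms = Duration.between(start, Instant.now()).toMillis();
        System.out.println("step '" + name + "' took " + ms + " ms");
        return result;
    }

    public static void main(String[] args) {
        // Example: wrap each expensive setup call in main() like this.
        String plan = timed("buildJobGraph", () -> "job-graph");
        System.out.println(plan);
    }
}
```

Any step that the log shows taking minutes rather than seconds is a candidate for moving out of `main` or optimizing.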

You can check your application status using either the [ListApplications](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html) or the [DescribeApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html) action.

## Snapshot creation fails
<a name="troubleshooting-rt-snapshots"></a>

The Managed Service for Apache Flink service can't take a snapshot under the following circumstances:
+ The application exceeded the snapshot limit. The limit for snapshots is 1,000. For more information, see [Manage application backups using snapshots](how-snapshots.md).
+ The application doesn't have permissions to access its source or sink.
+ The application code isn't functioning properly.
+ The application is experiencing other configuration issues.

If you get an exception while taking a snapshot during an application update or while stopping the application, set the `SnapshotsEnabled` property of your application's [ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html) to `false` and retry the request.

Snapshots can fail if your application's operators are not properly provisioned. For information about tuning operator performance, see [Operator scaling](performance-improving.md#performance-improving-scaling-op).

After the application returns to a healthy state, we recommend that you set the application's `SnapshotsEnabled` property to `true`.

## Cannot access resources in a VPC
<a name="troubleshooting-rt-vpc"></a>

If your application uses an Amazon VPC configuration, do the following to verify that your application has access to its resources:
+ Check your CloudWatch logs for the following error. This error indicates that your application cannot access resources in your VPC:

  ```
  org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  ```

  If you see this error, verify that your route tables are set up correctly, and that your connectors have the correct connection settings.

  For information about setting up and analyzing CloudWatch logs, see [Logging and monitoring in Amazon Managed Service for Apache Flink](monitoring-overview.md).

## Data is lost when writing to an Amazon S3 bucket
<a name="troubleshooting-rt-s3"></a>

Some data loss might occur when writing output to an Amazon S3 bucket using Apache Flink version 1.6.2. We recommend using the latest supported version of Apache Flink when writing directly to Amazon S3. If you must use Apache Flink 1.6.2, we recommend using Firehose to write to an Amazon S3 bucket instead. For more information about using Firehose with Managed Service for Apache Flink, see [Firehose sink](earlier.md#get-started-exercise-fh).

## Application is in the RUNNING status but isn't processing data
<a name="troubleshooting-rt-processing"></a>

You can check your application status by using either the [ListApplications](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html) or the [DescribeApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html) action. If your application enters the `RUNNING` status but isn't writing data to your sink, you can troubleshoot the issue by adding an Amazon CloudWatch log stream to your application. For more information, see [Work with application CloudWatch logging options](cloudwatch-logs.md#adding_cloudwatch). The log stream contains messages that you can use to troubleshoot application issues.

## Snapshot, application update, or application stop error: InvalidApplicationConfigurationException
<a name="troubleshooting-rt-appconfigexception"></a>

An error similar to the following might occur during a snapshot operation, or during an operation that creates a snapshot, such as updating or stopping an application:

```
An error occurred (InvalidApplicationConfigurationException) when calling the UpdateApplication operation: 

Failed to take snapshot for the application xxxx at this moment. The application is currently experiencing downtime. 
Please check the application's CloudWatch metrics or CloudWatch logs for any possible errors and retry the request. 
You can also retry the request after disabling the snapshots in the Managed Service for Apache Flink console or by updating 
the ApplicationSnapshotConfiguration through the AWS SDK
```

This error occurs when the application is unable to create a snapshot. 

If you encounter this error during a snapshot operation or an operation that creates a snapshot, do the following:
+ Disable snapshots for your application. You can do this either in the Managed Service for Apache Flink console, or by using the `SnapshotsEnabledUpdate` parameter of the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.
+ Investigate why snapshots cannot be created. For more information, see [Application is stuck in a transient status](#troubleshooting-rt-stuck).
+ Reenable snapshots when the application returns to a healthy state.

## java.nio.file.NoSuchFileException: /usr/local/openjdk-8/lib/security/cacerts
<a name="troubleshooting-rt-fnf"></a>

The location of the SSL truststore was updated in a previous deployment. Use the following value for the `ssl.truststore.location` parameter instead:

```
/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
```

# Application is restarting
<a name="troubleshooting-rt-restarts"></a>

If your application is not healthy, its Apache Flink job continually fails and restarts. This section describes symptoms and troubleshooting steps for this condition.

## Symptoms
<a name="troubleshooting-rt-restarts-symptoms"></a>

This condition can have the following symptoms:
+ The `FullRestarts` metric is not zero. This metric represents the number of times the application's job has restarted since you started the application.
+ The `Downtime` metric is not zero. This metric represents the number of milliseconds that the application is in the `FAILING` or `RESTARTING` status.
+ The application log contains status changes to `RESTARTING` or `FAILED`. You can query your application log for these status changes using the following CloudWatch Logs Insights query: [Analyze errors: Application task-related failures](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps).

## Causes and solutions
<a name="troubleshooting-rt-restarts-causes"></a>

The following conditions may cause your application to become unstable and repeatedly restart:
+ **Operator is throwing an exception:** If an exception thrown by an operator in your application is unhandled, the application fails over, because Flink treats the failure as one the operator cannot handle. The application restarts from the latest checkpoint to maintain *exactly-once* processing semantics. As a result, `Downtime` is not zero during these restart periods. To prevent this from happening, we recommend that you handle any retryable exceptions in the application code.

  You can investigate the causes of this condition by querying your application logs for changes in your application's state from `RUNNING` to `FAILED`. For more information, see [Analyze errors: Application task-related failures](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps).
+ **Kinesis data streams are not properly provisioned:** If a source or sink for your application is a Kinesis data stream, check the [metrics](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html) for the stream for `ReadProvisionedThroughputExceeded` or `WriteProvisionedThroughputExceeded` errors.

  If you see these errors, you can increase the available throughput for the Kinesis stream by increasing the stream's number of shards. For more information, see [How do I change the number of open shards in Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/).
+ **Other sources or sinks are not properly provisioned or available:** Verify that your application is correctly provisioning sources and sinks. Check that any sources or sinks used in the application (such as other AWS services, or external sources or destinations) are well provisioned, are not experiencing read or write throttling, or are periodically unavailable.

  If you are experiencing throughput-related issues with your dependent services, either increase resources available to those services, or investigate the cause of any errors or unavailability.
+ **Operators are not properly provisioned:** If the workload on the threads for one of the operators in your application is not correctly distributed, the operator can become overloaded and the application can crash. For information about tuning operator parallelism, see [Manage operator scaling properly](performance-improving.md#performance-improving-scaling-op).
+ **Application fails with DaemonException:** This error appears in your application log if you are using a version of Apache Flink prior to 1.11. You may need to upgrade to a later version of Apache Flink so that a KPL version of 0.14 or later is used.
+ **Application fails with TimeoutException, FlinkException, or RemoteTransportException:** These errors may appear in your application log if your task managers are crashing. If your application is overloaded, your task managers can experience CPU or memory resource pressure, causing them to fail.

  These errors may look like the following:
  + `java.util.concurrent.TimeoutException: The heartbeat of JobManager with id xxx timed out`
  + `org.apache.flink.util.FlinkException: The assigned slot xxx was removed`
  + `org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager`

  To troubleshoot this condition, check the following:
  + Check your CloudWatch metrics for unusual spikes in CPU or memory usage.
  + Check your application for throughput issues. For more information, see [Troubleshoot performance issues](performance-troubleshooting.md).
  + Examine your application log for unhandled exceptions that your application code is raising.
+ **Application fails with JaxbAnnotationModule Not Found error:** This error occurs if your application uses Apache Beam, but doesn't have the correct dependencies or dependency versions. Managed Service for Apache Flink applications that use Apache Beam must use the following versions of dependencies:

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```

  If you do not provide the correct version of `jackson-module-jaxb-annotations` as an explicit dependency, your application loads it from the environment dependencies, and since the versions do not match, the application crashes at runtime. 

  For more information about using Apache Beam with Managed Service for Apache Flink, see [Create an application using Apache Beam](examples-beam.md).
+ **Application fails with java.io.IOException: Insufficient number of network buffers**

  This happens when an application does not have enough memory allocated for network buffers. Network buffers facilitate communication between subtasks. They are used to store records before transmission over a network, and to store incoming data before splitting it into records and handing them to subtasks. The number of network buffers required scales directly with the parallelism and complexity of your job graph. There are a number of approaches to mitigate this issue:
  + You can configure a lower `parallelismPerKpu` so that there is more memory allocated per subtask and its network buffers. Note that lowering `parallelismPerKpu` increases the number of KPUs and therefore the cost. To avoid this, you can keep the number of KPUs the same by lowering the parallelism by the same factor.
  + You can simplify your job graph by reducing the number of operators or chaining them so that fewer buffers are needed.
  + Otherwise, you can contact [AWS Support](https://aws.amazon.com/premiumsupport/) for a custom network buffer configuration.
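
As noted in the first cause in this list, handling retryable exceptions inside your operator code prevents transient failures from triggering a full job restart. The following is a minimal sketch of a retry wrapper you might call from within an operator; the helper is hypothetical and not part of the Flink API:

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    // Retries a transient operation a bounded number of times before giving
    // up, so a recoverable failure does not fail over the whole job.
    // Assumes maxAttempts >= 1.
    public static <T> T withRetries(Callable<T> op, int maxAttempts) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e; // transient failure: back off briefly and try again
                try {
                    Thread.sleep(100L * attempt); // simple linear backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        // Retries exhausted: rethrow so Flink's failover machinery takes over.
        throw new RuntimeException(last);
    }
}
```

Inside a `MapFunction`, for example, you could wrap a call to an external service in `withRetries(...)` so that only genuinely unrecoverable errors reach Flink and cause a restart.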

# Throughput is too slow
<a name="troubleshooting-rt-throughput"></a>

If your application is not processing incoming streaming data quickly enough, it will perform poorly and become unstable. This section describes symptoms and troubleshooting steps for this condition. 

## Symptoms
<a name="troubleshooting-rt-throughput-symptoms"></a>

This condition can have the following symptoms:
+ If the data source for your application is a Kinesis stream, the stream's `millisBehindLatest` metric continually increases.
+ If the data source for your application is an Amazon MSK cluster, the cluster's consumer lag metrics continually increase. For more information, see [Consumer-Lag Monitoring](https://docs.aws.amazon.com/msk/latest/developerguide/consumer-lag.html) in the [Amazon MSK Developer Guide](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html).
+ If the data source for your application is a different service or source, check any available consumer lag metrics or similar data.

## Causes and solutions
<a name="troubleshooting-rt-throughput-causes"></a>

There can be many causes for slow application throughput. If your application is not keeping up with input, check the following:
+ If throughput lag is spiking and then tapering off, check if the application is restarting. Your application will stop processing input while it restarts, causing lag to spike. For information about application failures, see [Application is restarting](troubleshooting-rt-restarts.md).
+ If throughput lag is consistent, check to see if your application is optimized for performance. For information on optimizing your application's performance, see [Troubleshoot performance issues](performance-troubleshooting.md).
+ If throughput lag is not spiking but continuously increasing, and your application is optimized for performance, you must increase your application resources. For information on increasing application resources, see [Implement application scaling](how-scaling.md).
+ If your application reads from a Kafka cluster in a different Region and `FlinkKafkaConsumer` or `KafkaSource` is mostly idle (high `idleTimeMsPerSecond` or low `CPUUtilization`) despite high consumer lag, you can increase the value of the `receive.buffer.bytes` consumer property, for example to 2097152. For more information, see the high latency environment section in [Custom MSK configurations](https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html).
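
For the cross-Region Kafka case above, the larger buffer is passed as an ordinary Kafka consumer property. A minimal sketch, assuming you assemble the consumer properties yourself before handing them to the Kafka connector (the broker address is hypothetical):

```java
import java.util.Properties;

public class KafkaBufferConfig {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // hypothetical broker
        // Larger TCP receive buffer for high-latency (cross-Region) links.
        props.setProperty("receive.buffer.bytes", "2097152");
        return props;
    }
}
```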

For troubleshooting steps for slow throughput or consumer lag increasing in the application source, see [Troubleshoot performance issues](performance-troubleshooting.md).

# Unbounded state growth
<a name="troubleshooting-rt-stateleaks"></a>

If your application is not properly disposing of outdated state information, it will continually accumulate and lead to application performance or stability issues. This section describes symptoms and troubleshooting steps for this condition.

## Symptoms
<a name="troubleshooting-rt-stateleaks-symptoms"></a>

This condition can have the following symptoms:
+ The `lastCheckpointDuration` metric is gradually increasing or spiking.
+ The `lastCheckpointSize` metric is gradually increasing or spiking.

## Causes and solutions
<a name="troubleshooting-rt-stateleaks-causes"></a>

The following conditions may cause your application to accumulate state data: 
+ Your application retains state data longer than needed.
+ Your application uses window queries with too long a duration.
+ You did not set a TTL for your state data. For more information, see [State Time-To-Live (TTL)](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl) in the Apache Flink Documentation.
+ You are running an application that depends on Apache Beam version 2.25.0 or later. You can opt out of the new version of the read transform by [extending your BeamApplicationProperties](https://docs.aws.amazon.com/managed-flink/latest/java/examples-beam.html#examples-beam-configure) with the key `experiments` and the value `use_deprecated_read`. For more information, see the [Apache Beam Documentation](https://beam.apache.org/blog/beam-2.25.0/#highlights).

Some applications face ever-growing state size, which is not sustainable in the long term (after all, a Flink application runs indefinitely). Sometimes this can be traced back to the application storing data in state and not aging out old information properly. Sometimes, though, there are simply unreasonable expectations of what Flink can deliver. Applications can use aggregations over large time windows spanning days or even weeks. Unless [AggregateFunctions](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/#aggregatefunction) are used, which allow incremental aggregations, Flink needs to keep the events of the entire window in state.

Moreover, when using process functions to implement custom operators, the application needs to remove data from state that is no longer required for the business logic. In that case, [state time-to-live](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl) can be used to automatically age out data based on processing time. Managed Service for Apache Flink uses incremental checkpoints, so state TTL is based on [RocksDB compaction](https://github.com/facebook/rocksdb/wiki/Compaction). You can only observe an actual reduction in state size (indicated by the checkpoint size) after a compaction operation occurs. In particular, for checkpoint sizes below 200 MB, you are unlikely to observe any checkpoint size reduction as a result of state expiring. However, savepoints are based on a clean copy of the state that does not contain old data, so you can trigger a snapshot in Managed Service for Apache Flink to force the removal of outdated state.
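
The aging behavior that state TTL provides can be illustrated with a plain processing-time sketch, in which each entry records when it was last written and reads evict anything older than the TTL. This illustrates the concept only; it is not the Flink `StateTtlConfig` API:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class TtlStateSketch {
    private final Map<String, Long> writeTime = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();
    private final long ttlMillis;

    public TtlStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void put(String key, String value, long nowMillis) {
        values.put(key, value);
        writeTime.put(key, nowMillis);
    }

    // Evicts entries whose last write is older than the TTL before reading,
    // mirroring how processing-time state TTL ages out unneeded data.
    public String get(String key, long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = writeTime.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMillis - e.getValue() > ttlMillis) {
                values.remove(e.getKey());
                it.remove();
            }
        }
        return values.get(key);
    }
}
```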

For debugging purposes, it can make sense to disable incremental checkpoints to verify more quickly that the checkpoint size actually decreases or stabilizes (and to avoid the effect of compaction in RocksDB). This requires a ticket to the service team, though.

# I/O bound operators
<a name="troubleshooting-io-bound-operators"></a>

It's best to avoid dependencies on external systems on the data path. It's often much more performant to keep a reference data set in state rather than querying an external system to enrich individual events. However, sometimes there are dependencies that cannot be easily moved to state, for example, if you want to enrich events with a machine learning model that is hosted on Amazon SageMaker.

Operators that interface with external systems over the network can become a bottleneck and cause backpressure. We highly recommend using [AsyncIO](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/) to implement this functionality, to reduce the wait time for individual calls and avoid slowing down the entire application.

Moreover, for applications with I/O-bound operators, it can also make sense to increase the [ParallelismPerKPU](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html) setting of the Managed Service for Apache Flink application. This configuration describes the number of parallel subtasks an application can perform per Kinesis Processing Unit (KPU). By increasing the value from the default of 1 to, say, 4, the application uses the same resources (and has the same cost) but can scale to 4 times the parallelism. This works well for I/O-bound applications, but it causes additional overhead for applications that are not I/O bound.
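
The trade-off above can be sketched as simple arithmetic: roughly, the number of KPUs an application uses is its parallelism divided by `ParallelismPerKPU`, rounded up:

```java
public class KpuMath {
    // KPUs used for a given parallelism and ParallelismPerKPU setting
    // (ceiling division).
    public static int kpus(int parallelism, int parallelismPerKpu) {
        return (parallelism + parallelismPerKpu - 1) / parallelismPerKpu;
    }

    public static void main(String[] args) {
        // Default ParallelismPerKPU of 1: parallelism 8 uses 8 KPUs.
        System.out.println(kpus(8, 1));
        // Raising ParallelismPerKPU to 4 runs parallelism 8 on 2 KPUs,
        // i.e., 4 times the parallelism for the same resources.
        System.out.println(kpus(8, 4));
    }
}
```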

# Upstream or source throttling from a Kinesis data stream
<a name="troubleshooting-source-throttling"></a>

**Symptom**: The application encounters `LimitExceededException` errors from its upstream source Kinesis data stream.

**Potential Cause**: By default, the Apache Flink Kinesis connector reads from the Kinesis data stream source with an aggressive setting for the maximum number of records fetched per `GetRecords` call. Apache Flink is configured by default to fetch 10,000 records per `GetRecords` call (made every 200 ms by default), although the limit per shard is only 1,000 records per call.

This default behavior can lead to throttling when attempting to consume from the Kinesis data stream, which affects the application's performance and stability.

You can confirm this by checking the CloudWatch `ReadProvisionedThroughputExceeded` metric and seeing prolonged or sustained periods where this metric is greater than zero.

You can also see this in CloudWatch logs for your Amazon Managed Service for Apache Flink application by observing continued `LimitExceededException` errors.

**Resolution**: You can do one of two things to resolve this scenario:
+ Lower the default limit for the number of records fetched per `GetRecords` call.
+ Enable Adaptive Reads in your Amazon Managed Service for Apache Flink application. For more information on the Adaptive Reads feature, see [SHARD\_USE\_ADAPTIVE\_READS](https://nightlies.apache.org/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_USE_ADAPTIVE_READS).
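
Both resolutions are applied through the Kinesis consumer's configuration properties. A minimal sketch follows, using literal property keys to stay self-contained; in application code you would normally reference them via the connector's `ConsumerConfigConstants`, and the key strings below are assumptions that you should verify against the connector version you use:

```java
import java.util.Properties;

public class KinesisConsumerConfig {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        // Fetch fewer records per GetRecords call (the connector default is
        // 10,000; the per-shard service limit is 1,000). Key string assumed
        // to match ConsumerConfigConstants.SHARD_GETRECORDS_MAX.
        props.setProperty("flink.shard.getrecords.maxrecordcount", "1000");
        // Or let the connector adapt its read rate automatically. Key string
        // assumed to match ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS.
        props.setProperty("flink.shard.adaptivereads", "true");
        return props;
    }
}
```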

# Checkpoints
<a name="troubleshooting-checkpoints"></a>

Checkpoints are Flink’s mechanism to ensure that the state of an application is fault tolerant. The mechanism allows Flink to recover the state of operators if the job fails, and gives the application the same semantics as failure-free execution. With Managed Service for Apache Flink, the state of an application is stored in RocksDB, an embedded key/value store that keeps its working state on disk. When a checkpoint is taken, the state is also uploaded to Amazon S3, so even if the disk is lost, the checkpoint can be used to restore the application's state.

For more information, see [How does State Snapshotting Work?](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work).

## Checkpointing stages
<a name="troubleshooting-checkpointing-stages"></a>

For a checkpointing operator subtask in Flink, there are five main stages:
+ Waiting [**Start Delay**] – Flink uses checkpoint barriers that get inserted into the stream, so the time in this stage is the time the operator waits for the checkpoint barrier to reach it.
+ Alignment [**Alignment Duration**] – In this stage, the subtask has reached one barrier but is waiting for barriers from the other input streams.
+ Sync checkpointing [**Sync Duration**] – In this stage, the subtask actually snapshots the state of the operator and blocks all other activity on the subtask.
+ Async checkpointing [**Async Duration**] – The majority of this stage is the subtask uploading the state to Amazon S3. During this stage, the subtask is no longer blocked and can process records.
+ Acknowledging – This is usually a short stage; the subtask simply sends an acknowledgement to the JobManager and performs any commit messages (for example, with Kafka sinks).

Each of these stages (apart from Acknowledging) maps to a duration metric for checkpoints that is available from the Flink WebUI, which can help isolate the cause of a long checkpoint.

To see an exact definition of each of the metrics available on checkpoints, go to [History Tab](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/#history-tab).

## Investigating
<a name="troubleshooting-checkpoints-investigating"></a>

When investigating long checkpoint durations, the most important thing to determine is the bottleneck for the checkpoint: which operator and subtask is taking the longest to checkpoint, and which stage of that subtask is taking an extended period of time. You can determine this using the Flink WebUI under the job's checkpoint tab. Flink's web interface provides data and information that help you investigate checkpointing issues. For a full breakdown, see [Monitoring Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/).

 The first thing to look at is the **End to End Duration** of each operator in the job graph to determine which operator is taking a long time to checkpoint and warrants further investigation. Per the Flink documentation, the definition of the duration is:

*The duration from the trigger timestamp until the latest acknowledgement (or n/a if no acknowledgement received yet). This end to end duration for a complete checkpoint is determined by the last subtask that acknowledges the checkpoint. This time is usually larger than single subtasks need to actually checkpoint the state.*

The other durations for the checkpoint also give more fine-grained information as to where the time is being spent.

A high **Sync Duration** indicates that time is being spent in the snapshotting itself. During this stage, `snapshotState()` is called for classes that implement the `CheckpointedFunction` interface; this can be user code, so thread dumps can be useful for investigating it.
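To illustrate where the synchronous stage's time goes, the following is a minimal sketch (assuming the Flink runtime dependencies are on the classpath; the operator and its buffering logic are illustrative, not from the source) of an operator implementing `CheckpointedFunction`. Whatever `snapshotState()` does executes while the subtask is blocked, so slow logic here shows up as a long **Sync Duration**:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.ArrayList;
import java.util.List;

// Hypothetical operator that buffers events and persists them on checkpoint.
public class BufferingMapper implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public String map(String value) {
        buffer.add(value);
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Runs in the synchronous checkpoint stage: the subtask is blocked
        // until this method returns, so expensive work here inflates Sync Duration.
        checkpointedBuffer.update(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffer", String.class));
        if (context.isRestored()) {
            for (String element : checkpointedBuffer.get()) {
                buffer.add(element);
            }
        }
    }
}
```

If a thread dump taken during a slow checkpoint shows the operator thread inside `snapshotState()`, the synchronous stage is where to focus.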

A long **Async Duration** suggests that a lot of time is being spent uploading the state to Amazon S3. This can occur if the state is large or if many state files are being uploaded. If this is the case, it is worth investigating how state is used by the application and ensuring that the Flink native data structures are used where possible ([Using Keyed State](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)). Managed Service for Apache Flink configures Flink to minimize the number of Amazon S3 calls so that this duration doesn't get too long. Following is an example of an operator's checkpointing statistics. It shows that the **Async Duration** is relatively long compared to the preceding operator's checkpointing statistics.

![\[Investigating checkpointing\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint.png)


A high **Start Delay** shows that the majority of the time is being spent waiting for the checkpoint barrier to reach the operator. This indicates that the application is taking a while to process records, meaning the barrier flows through the job graph slowly. This is usually the case if the job is backpressured or if one or more operators are constantly busy. Following is an example of a JobGraph where the second KeyedProcess operator is busy.

![\[Investigating checkpointing\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint2.png)


You can investigate what is taking so long by using either Flink flame graphs or TaskManager thread dumps, described in the following sections.

## Thread dumps
<a name="troubleshooting-checkpoints-investigating-thread-dumps"></a>

Thread dumps are another debugging tool that operates at a slightly lower level than flame graphs. A thread dump outputs the execution state of all threads at a point in time. Flink takes a JVM thread dump, which captures the execution state of all threads within the Flink process. The state of a thread is presented as a stack trace along with some additional information. Flame graphs are in fact built from multiple stack traces taken in quick succession; the graph is a visualization of these traces that makes the common code paths easy to identify.

```
"KeyedProcess (1/3)#0" prio=5 Id=1423 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:154)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

Above is a snippet of a thread dump taken from the Flink UI for a single thread. The first line contains some general information about this thread including:
+ The thread name *KeyedProcess (1/3)#0*
+ Priority of the thread *prio=5*
+ A unique thread Id *Id=1423*
+ Thread state *RUNNABLE*

 The name of a thread usually indicates its general purpose. Operator threads can be identified by their name, since they are named after the operator together with an indication of which subtask they belong to; e.g., the *KeyedProcess (1/3)#0* thread is from the *KeyedProcess* operator and belongs to the 1st (out of 3) subtask.

Threads can be in one of a few states:
+ NEW – The thread has been created but has not yet started
+ RUNNABLE – The thread is executing on the CPU
+ BLOCKED – The thread is waiting for another thread to release its lock
+ WAITING – The thread is waiting by using a `wait()`, `join()`, or `park()` method
+ TIMED\_WAITING – The thread is waiting by using a sleep, wait, join or park method, but with a maximum wait time.

**Note**  
In Flink 1.13, the maximum depth of a single stack trace in the thread dump is limited to 8. 

**Note**  
Thread dumps should be the last resort for debugging performance issues in a Flink application, as they can be challenging to read and require multiple samples to be taken and manually analyzed. If at all possible, prefer flame graphs.

### Thread dumps in Flink
<a name="troubleshooting-checkpoints-investigating-thread-dumps-flink"></a>

In Flink, a thread dump can be taken by choosing the **Task Managers** option on the left navigation bar of the Flink UI, selecting a specific task manager, and then navigating to the **Thread Dump** tab. The thread dump can be downloaded, copied to your favorite text editor (or thread dump analyzer), or analyzed directly inside the text view in the Flink Web UI (however, this last option can be a bit clunky).

To determine which Task Manager to take a thread dump of, use the **TaskManagers** tab when a particular operator is chosen. This shows which Task Managers the different subtasks of the operator are running on.

![\[Using Thread dumps\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint4.png)


The dump consists of multiple stack traces. When investigating the dump, the traces related to an operator are the most important. These can be easily identified, since operator threads have the same name as the operator, along with an indication of which subtask they relate to. For example, the following stack trace is from the *KeyedProcess* operator and is the first subtask. 

```
"KeyedProcess (1/3)#0" prio=5 Id=595 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

This can become confusing if there are multiple operators with the same name, but you can name operators to get around this. For example:

```
....
.process(new ExpensiveFunction()).name("Expensive function")
```

## [Flame graphs](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/)
<a name="troubleshooting-checkpoints-investigating-flame-graphs"></a>

Flame graphs are a useful debugging tool that visualizes the stack traces of the targeted code, allowing the most frequent code paths to be identified. They are created by sampling stack traces multiple times. The x-axis of a flame graph shows the different stack profiles, while the y-axis shows the stack depth and the calls in the stack trace. A single rectangle in a flame graph represents one stack frame, and the width of a frame shows how frequently it appears in the stacks. For more details about flame graphs and how to use them, see [Flame Graphs](https://www.brendangregg.com/flamegraphs.html).

In Flink, the flame graph for an operator can be accessed via the Web UI by selecting an operator and then choosing the **FlameGraph** tab. Once enough samples have been collected, the flame graph is displayed. Following is the flame graph for the ProcessFunction that was taking a long time to checkpoint.

![\[Using Flame graphs\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint3.png)


This is a very simple flame graph that shows all the CPU time is being spent within a foreach loop in the `processElement` of the ExpensiveFunction operator. You also get the line number to help determine where in the code the time is being spent.
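To make this concrete, the following standalone Java sketch (names and numbers are illustrative, not from the source) shows the kind of hot spot such a flame graph surfaces: a hypothetical per-element loop that dominates CPU time, next to an equivalent constant-time rewrite that would remove the wide frame from the graph:

```java
public class HotLoopDemo {

    // Hypothetical expensive per-element computation: sums 1..n in a loop.
    // A flame graph would show this method as a single wide frame.
    static long expensiveSum(long n) {
        long total = 0;
        for (long i = 1; i <= n; i++) {
            total += i;
        }
        return total;
    }

    // Equivalent closed-form version: O(1) per element, so the wide
    // frame disappears from the flame graph.
    static long cheapSum(long n) {
        return n * (n + 1) / 2;
    }

    public static void main(String[] args) {
        // Both variants produce the same result.
        System.out.println(expensiveSum(1_000_000) == cheapSum(1_000_000)); // prints true
    }
}
```

In a real application the fix is rarely this mechanical, but the workflow is the same: the flame graph points at the frame, the line number points into the code, and you look for a cheaper way to do the same work per element.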

# Checkpointing is timing out
<a name="troubleshooting-chk-timeout"></a>

If your application is not optimized or properly provisioned, checkpoints can fail. This section describes symptoms and troubleshooting steps for this condition. 

## Symptoms
<a name="troubleshooting-chk-timeout-symptoms"></a>

If checkpoints fail for your application, the `numberOfFailedCheckpoints` metric will be greater than zero. 

Checkpoints can fail due to either direct failures, such as application errors, or due to transient failures, such as running out of application resources. Check your application logs and metrics for the following symptoms:
+ Errors in your code.
+ Errors accessing your application's dependent services.
+ Errors serializing data. If the default serializer can't serialize your application data, the application will fail. For information about using a custom serializer in your application, see [Data Types and Serialization](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) in the Apache Flink Documentation.
+ Out of Memory errors.
+ Spikes or steady increases in the following metrics:
  + `heapMemoryUtilization`
  + `oldGenerationGCTime`
  + `oldGenerationGCCount`
  + `lastCheckpointSize`
  + `lastCheckpointDuration`

For more information about monitoring checkpoints, see [Monitoring Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/monitoring/checkpoint_monitoring/) in the Apache Flink Documentation.

## Causes and solutions
<a name="troubleshooting-chk-timeout-causes"></a>

Your application log error messages show the cause for direct failures. Transient failures can have the following causes:
+ Your application has insufficient KPU provisioning. For information about increasing application provisioning, see [Implement application scaling](how-scaling.md).
+ Your application state size is too large. You can monitor your application state size using the `lastCheckpointSize` metric.
+ Your application's state data is unequally distributed between keys. If your application uses the `keyBy` operator, ensure that your incoming data is divided equally between keys. If most of the data is assigned to a single key, this creates a bottleneck that causes failures.
+ Your application is experiencing memory or garbage collection backpressure. Monitor your application's `heapMemoryUtilization`, `oldGenerationGCTime`, and `oldGenerationGCCount` for spikes or steadily increasing values.
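To check for the unequal key distribution described above, you can measure how much of a data sample belongs to the hottest key before it reaches a `keyBy`. The following standalone sketch (class and method names are illustrative) computes that share:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeySkewCheck {

    // Returns the fraction of records that belong to the most frequent key.
    // A value close to 1.0 means one key dominates and a single subtask
    // will carry most of the state and checkpointing work.
    static double hottestKeyShare(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        long max = counts.values().stream().mapToLong(Long::longValue).max().orElse(0);
        return keys.isEmpty() ? 0.0 : (double) max / keys.size();
    }

    public static void main(String[] args) {
        // 8 of 10 sample records share one key: heavily skewed.
        List<String> sample = List.of("a", "a", "a", "a", "a", "a", "a", "a", "b", "c");
        System.out.println(hottestKeyShare(sample)); // 0.8
    }
}
```

Running a check like this over a representative sample of production data can tell you whether a checkpoint timeout is likely caused by key skew before you start tuning provisioning.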

# Checkpoint failure for Apache Beam application
<a name="troubleshooting-chk-failure-beam"></a>

If your Beam application is configured with [shutdownSourcesAfterIdleMs](https://beam.apache.org/documentation/runners/flink/#:~:text=shutdownSourcesAfterIdleMs) set to 0ms, checkpoints can fail to trigger because tasks are in "FINISHED" state. This section describes symptoms and resolution for this condition. 

## Symptom
<a name="troubleshooting-chk-failure-beam-symptoms"></a>

Go to your Managed Service for Apache Flink application's CloudWatch logs and check whether the following log message appears. It indicates that the checkpoint failed to trigger because some tasks have finished. 

```
{
    "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)",
    "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
    "message": "Failed to trigger checkpoint for job your job ID since some tasks of job your job ID has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.",
    "threadName": "Checkpoint Timer",
    "applicationARN": your application ARN,
    "applicationVersionId": "5",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}
```

This can also be seen on the Flink dashboard, where some tasks have entered the "FINISHED" state and checkpointing is no longer possible.

![\[Tasks in "FINISHED" state\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/beam_checkpoint_failure.png)


## Cause
<a name="troubleshooting-chk-failure-beam-causes"></a>

`shutdownSourcesAfterIdleMs` is a Beam configuration variable that shuts down sources that have been idle for the configured number of milliseconds. Once a source has been shut down, checkpointing is no longer possible. This can lead to [checkpoint failure](https://issues.apache.org/jira/browse/FLINK-2491). 

One of the causes for tasks entering the "FINISHED" state is `shutdownSourcesAfterIdleMs` being set to 0ms, which means that idle tasks are shut down immediately.

## Solution
<a name="troubleshooting-chk-failure-beam-solution"></a>

To prevent tasks from entering the "FINISHED" state immediately, set `shutdownSourcesAfterIdleMs` to `Long.MAX_VALUE`. This can be done in two ways:
+ Option 1: If your Beam configuration is set in your Managed Service for Apache Flink application configuration page, you can add a new key-value pair to set `shutdownSourcesAfterIdleMs` as follows:  
![\[Set shutdownSourcesAfterIdleMs to Long.MAX_VALUE\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/beam_checkpoint_failure_solution.png)
+ Option 2: If your Beam configuration is set in your JAR file, you can set `shutdownSourcesAfterIdleMs` as follows:

  ```
  FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam options object

  options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE
  options.setRunner(FlinkRunner.class);

  Pipeline p = Pipeline.create(options); // attach the options to the Beam pipeline
  ```

# Backpressure
<a name="troubleshooting-backpressure"></a>

Flink uses backpressure to adapt the processing speed of individual operators. 

An operator can struggle to keep up with the message volume it receives for many reasons. The operation may require more CPU resources than the operator has available, or the operator may be waiting for I/O operations to complete. If an operator cannot process events fast enough, it builds backpressure in the upstream operators feeding into the slow operator. This causes the upstream operators to slow down, which can further propagate the backpressure to the source and cause the source to adapt to the overall throughput of the application by slowing down as well. You can find a deeper description of backpressure and how it works at [How Apache Flink™ handles backpressure](https://www.ververica.com/blog/how-flink-handles-backpressure).

Knowing which operators in an application are slow gives you crucial information for understanding the root cause of performance problems in the application. Backpressure information is [exposed through the Flink Dashboard](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/monitoring/back_pressure/). To identify the slow operator, look for the operator with a high backpressure value that is closest to a sink (operator B in the following example). The operator causing the slowness is then one of the downstream operators (operator C in the example). B could process events faster, but is backpressured because it cannot forward its output to the actual slow operator C.

```
A (backpressured 93%) -> B (backpressured 85%) -> C (backpressured 11%) -> D (backpressured 0%)
```

Once you have identified the slow operator, try to understand why it's slow. There can be a myriad of reasons, and sometimes it's not obvious what's wrong; resolving the problem can require days of debugging and profiling. Following are some of the more common reasons, some of which are further explained below:
+ The operator is doing slow I/O, e.g., network calls (consider using AsyncIO instead).
+ There is a skew in the data and one operator is receiving more events than others (verify by looking at the number of messages in and out of individual subtasks, i.e., instances of the same operator, in the Flink dashboard).
+ It's a resource intensive operation (if there is no data skew, consider scaling out for CPU/memory bound work or increasing `ParallelismPerKPU` for I/O bound work).
+ Extensive logging in the operator (reduce logging to a minimum for production applications, or consider sending debug output to a data stream instead).
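For the slow I/O case in the first item, Flink's Async I/O operator lets a single subtask keep many requests in flight instead of blocking on each record. The following is a minimal sketch; the asynchronous client and its `lookup` method are hypothetical placeholders, not a real library:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncLookup extends RichAsyncFunction<String, String> {

    // Hypothetical non-blocking client; initialize it in open().
    private transient HypotheticalAsyncClient client;

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // The subtask thread is not blocked while the request is in flight.
        CompletableFuture<String> response = client.lookup(key);
        response.thenAccept(value ->
                resultFuture.complete(Collections.singleton(value)));
    }
}

// Wiring in the job definition: allow up to 100 in-flight requests,
// with a 5 second timeout per request.
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         input, new AsyncLookup(), 5, TimeUnit.SECONDS, 100);
```

With a blocking call, throughput per subtask is bounded by request latency; with Async I/O it is bounded by the number of concurrent requests the external system can sustain, which often removes the backpressure at its source.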

## Testing throughput with the Discarding Sink
<a name="troubleshooting-testing-throughput"></a>

The [Discarding Sink](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.html) simply disregards all events it receives while still executing the application (an application without any sink fails to execute). This is very useful for throughput testing, profiling, and verifying that the application scales properly. It's also a very pragmatic sanity check for whether the sinks or the rest of the application is causing backpressure (though just checking the backpressure metrics is often easier and more straightforward).

By replacing all sinks of an application with a discarding sink and creating a mock source that generates data resembling production data, you can measure the maximum throughput of the application for a certain parallelism setting. You can then also increase the parallelism to verify that the application scales properly and does not have a bottleneck that only emerges at higher throughput (e.g., because of data skew).
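The replacement itself is a one-line change per sink. A sketch of the pattern, assuming `results` is the stream that was previously attached to the real sink:

```java
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

// Replace each real sink with a DiscardingSink to take the sinks out of
// the equation while the rest of the application still executes.
// Before (for example): results.addSink(new FlinkKinesisProducer<>(...));
results.addSink(new DiscardingSink<>());
```

If throughput recovers with the discarding sink in place, the bottleneck is in the sink or the destination system; if it does not, the bottleneck is somewhere upstream in the application.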

# Data skew
<a name="troubleshooting-data-skew"></a>

A Flink application is executed on a cluster in a distributed fashion. To scale out to multiple nodes, Flink uses the concept of keyed streams, which essentially means that the events of a stream are partitioned according to a specific key, e.g., customer id, and Flink can then process different partitions on different nodes. Many of the Flink operators are then evaluated based on these partitions, e.g., [Keyed Windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/), [Process Functions](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function/) and [Async I/O](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/).

Choosing a partition key often depends on the business logic. At the same time, many of the best practices for, e.g., [DynamoDB](https://aws.amazon.com/dynamodb/) and Spark, equally apply to Flink, including:
+ ensuring a high cardinality of partition keys
+ avoiding skew in the event volume between partitions

 You can identify skew in the partitions by comparing the records received/sent of subtasks (i.e., instances of the same operator) in the Flink dashboard. In addition, Managed Service for Apache Flink monitoring can be configured to expose metrics for `numRecordsIn/Out` and `numRecordsInPerSecond/OutPerSecond` on a subtask level as well.
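If the skew comes from a hot business key, a common mitigation (not specific to Flink) is two-stage aggregation with salted keys: append a bounded random suffix so the hot key spreads over several partitions, pre-aggregate, then aggregate again on the original key. The salting step in plain Java (class name, bucket count, and separator are illustrative):

```java
import java.util.concurrent.ThreadLocalRandom;

public class KeySalting {

    static final int SALT_BUCKETS = 8;

    // Spreads a hot key across SALT_BUCKETS partitions; a second
    // aggregation on the original key recombines the partial results.
    static String saltedKey(String key) {
        return key + "#" + ThreadLocalRandom.current().nextInt(SALT_BUCKETS);
    }

    // Recovers the original key for the second aggregation stage.
    static String originalKey(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }

    public static void main(String[] args) {
        String salted = saltedKey("customer-42");
        System.out.println(originalKey(salted)); // customer-42
    }
}
```

Note that this only helps for aggregations that can be computed in two stages (sums, counts, min/max); whether it fits depends on your business logic.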

# State skew
<a name="troubleshooting-state-skew"></a>

For stateful operators, i.e., operators that maintain state for their business logic such as windows, data skew always leads to state skew. Some subtasks receive more events than others because of the skew in the data and hence are also persisting more data in state. However, even for an application that has evenly balanced partitions, there can be a skew in how much data is persisted in state. For instance, for session windows, some users and sessions respectively may be much longer than others. If the longer sessions happen to be part of the same partition, it can lead to an imbalance of the state size kept by different subtasks of the same operator.

 State skew not only increases the memory and disk resources required by individual subtasks, it can also decrease the overall performance of the application. When an application takes a checkpoint or savepoint, the operator state is persisted to Amazon S3 to protect the state against node or cluster failure. During this process (especially with the exactly-once semantics that are enabled by default on Managed Service for Apache Flink), the processing stalls from an external perspective until the checkpoint/savepoint has completed. If there is state skew, the time to complete the operation can be bound by a single subtask that has accumulated a particularly high amount of state. In extreme cases, taking checkpoints/savepoints can fail because a single subtask is not able to persist its state.

 So similar to data skew, state skew can substantially slow down an application.

 To identify state skew, you can leverage the Flink dashboard. Find a recent checkpoint or savepoint and compare the amount of data that has been stored for individual subtasks in the details.

# Integrate with resources in different Regions
<a name="troubleshooting-resources-in-different-regions"></a>

You can enable `StreamingFileSink` to write to an Amazon S3 bucket in a different Region from your Managed Service for Apache Flink application through a Flink configuration setting required for cross-Region replication. To apply this setting, file a support ticket at [AWS Support Center](https://console.aws.amazon.com/support/home#/).