

# Troubleshoot Managed Service for Apache Flink
<a name="troubleshooting"></a>

The following topics can help you troubleshoot problems that you might encounter with Amazon Managed Service for Apache Flink. 

Choose the appropriate topic to review solutions.

**Topics**
+ [Development troubleshooting](troubleshooting-development.md)
+ [Runtime troubleshooting](troubleshooting-runtime.md)

# Development troubleshooting
<a name="troubleshooting-development"></a>

This section contains information about diagnosing and fixing development issues with your Managed Service for Apache Flink application.

**Topics**
+ [System rollback best practices](troubleshooting-system-rollback.md)
+ [Hudi configuration best practices](troubleshooting-hudi.md)
+ [Apache Flink Flame Graphs](troubleshooting-update-flamegraphs.md)
+ [Credential provider issue with EFO connector 1.15.2](troubleshooting-credential-provider.md)
+ [Applications with unsupported Kinesis connectors](troubleshooting-unsupported-kinesis-connectors.md)
+ [Compile error: "Could not resolve dependencies for project"](troubleshooting-compile.md)
+ [Invalid choice: "kinesisanalyticsv2"](troubleshooting-cli-update.md)
+ [UpdateApplication action isn't reloading application code](troubleshooting-update.md)
+ [S3 StreamingFileSink FileNotFoundExceptions](troubleshooting-s3sink.md)
+ [FlinkKafkaConsumer issue with stop with savepoint](troubleshooting-FlinkKafkaConsumer.md)
+ [Flink 1.15 Async Sink Deadlock](troubleshooting-async-deadlock.md)
+ [Amazon Kinesis data streams source processing out of order during re-sharding](troubleshooting-kinesis-data-streams-processing-out-of-order.md)
+ [Real-time vector embedding blueprints FAQ and troubleshooting](troubleshooting-blueprints.md)

# System rollback best practices
<a name="troubleshooting-system-rollback"></a>

With automatic system rollback and operations visibility capabilities in Amazon Managed Service for Apache Flink, you can identify and resolve issues with your applications.

## System rollbacks
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

If your application update or scaling operation fails due to a customer error, such as a code bug or permission issue, Amazon Managed Service for Apache Flink automatically attempts to roll back to the previous running version, provided that you have opted in to this functionality. For more information, see [Enable system rollbacks for your Managed Service for Apache Flink application](how-system-rollbacks.md). If this automatic rollback fails, or if you have not opted in or have opted out, your application is placed into the `READY` state. To update your application, complete the following steps:

1. Check the Amazon Managed Service for Apache Flink console or use the `DescribeApplicationOperation` API to see the error description for why the operation failed. For the full error stack, use [CloudWatch logs](https://docs.aws.amazon.com/managed-flink/latest/java/logging.html).

1. Resolve the underlying issue. Common issues are insufficient permissions, incompatible code changes, or infrastructure misconfigurations.

1. Use the `UpdateApplication` API to redeploy your new application version.
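
Whether an automatic rollback is attempted depends on the opt-in setting. The following is a minimal Python sketch for checking that setting. It assumes a boto3 `kinesisanalyticsv2` client, and it assumes that the `DescribeApplication` response exposes the setting under `ApplicationSystemRollbackConfigurationDescription`. Verify the field names against the API reference for your SDK version. The function name is hypothetical.

```python
def system_rollback_enabled(client, application_name):
    """Return True if the application has opted in to system rollbacks.

    `client` is expected to behave like boto3.client("kinesisanalyticsv2").
    The response field names below are assumptions to verify against the
    API reference.
    """
    detail = client.describe_application(ApplicationName=application_name)[
        "ApplicationDetail"
    ]
    config = detail.get("ApplicationConfigurationDescription", {})
    rollback = config.get("ApplicationSystemRollbackConfigurationDescription", {})
    # Treat a missing block as "not opted in".
    return bool(rollback.get("RollbackEnabled", False))


# Usage (requires boto3 and AWS credentials; "my-app" is a placeholder):
#   import boto3
#   client = boto3.client("kinesisanalyticsv2")
#   print(system_rollback_enabled(client, "my-app"))
```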

## Manual rollback
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

If the application is not progressing and has been in a transient state for a long time, or if the application successfully transitioned to `RUNNING` but you see downstream issues such as processing errors, you can manually roll it back using the `RollbackApplication` API.

1. Call `RollbackApplication`. This reverts the application to the previous running version and restores the previous state.

1. Monitor the rollback operation using the `DescribeApplicationOperation` API.

1. If the rollback fails, follow the steps in the preceding System rollbacks section.
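
The manual rollback steps above can be sketched in Python as follows. This is an illustration under stated assumptions, not a definitive implementation: it assumes a boto3 `kinesisanalyticsv2` client, assumes that the `RollbackApplication` response includes an `OperationId`, and assumes the operation status values `IN_PROGRESS`, `SUCCESSFUL`, `FAILED`, and `CANCELLED`. The function name, poll interval, and poll limit are hypothetical.

```python
import time

# Assumed terminal values of OperationStatus; verify against the API reference.
TERMINAL_STATUSES = {"SUCCESSFUL", "FAILED", "CANCELLED"}


def rollback_and_wait(client, application_name, poll_seconds=10, max_polls=60,
                      sleep=time.sleep):
    """Roll back an application and poll the operation until it finishes."""
    detail = client.describe_application(ApplicationName=application_name)[
        "ApplicationDetail"
    ]
    # RollbackApplication needs the current version ID of the application.
    response = client.rollback_application(
        ApplicationName=application_name,
        CurrentApplicationVersionId=detail["ApplicationVersionId"],
    )
    operation_id = response["OperationId"]

    for _ in range(max_polls):
        info = client.describe_application_operation(
            ApplicationName=application_name, OperationId=operation_id
        )["ApplicationOperationInfoDetails"]
        if info["OperationStatus"] in TERMINAL_STATUSES:
            return info["OperationStatus"]
        sleep(poll_seconds)
    raise TimeoutError(f"Rollback operation {operation_id} did not finish in time")
```

If the function returns `FAILED`, the operation details describe the error to investigate before you retry.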

## Operations visibility
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

The `ListApplicationOperations` API shows the history of all customer and system operations on your application.

1. Get the *operationId* of the failed operation from the list.

1. Call `DescribeApplicationOperation` and check the status and *statusDescription*.

1. If an operation failed, the description points to a potential error to investigate. 

**Code bugs:** Use the rollback capabilities to revert to the last working version. Resolve the bugs and retry the update.

**Permission issues:** Use the `DescribeApplicationOperation` API to see the required permissions. Update the application permissions and retry.

**Amazon Managed Service for Apache Flink service issues:** Check the AWS Health Dashboard or open a support case.
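
The lookup steps in this section can be sketched in Python as follows. The sketch assumes a boto3 `kinesisanalyticsv2` client and assumes that the failure description is returned under `OperationFailureDetails` and `ErrorInfo` in the `DescribeApplicationOperation` response. Verify these field names against the API reference. The function name is hypothetical, and pagination with `NextToken` is omitted for brevity.

```python
def failed_operation_errors(client, application_name):
    """Map each failed operation ID to its error description.

    `client` is expected to behave like boto3.client("kinesisanalyticsv2").
    """
    listing = client.list_application_operations(
        ApplicationName=application_name,
        OperationStatus="FAILED",  # only failed operations are of interest here
    )
    errors = {}
    for operation in listing.get("ApplicationOperationInfoList", []):
        operation_id = operation["OperationId"]
        details = client.describe_application_operation(
            ApplicationName=application_name, OperationId=operation_id
        )["ApplicationOperationInfoDetails"]
        # Assumed response shape: OperationFailureDetails -> ErrorInfo -> ErrorString.
        failure = details.get("OperationFailureDetails", {})
        errors[operation_id] = failure.get("ErrorInfo", {}).get("ErrorString", "")
    return errors
```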

# Hudi configuration best practices
<a name="troubleshooting-hudi"></a>

To run Hudi connectors on Managed Service for Apache Flink, we recommend the following configuration changes.

Disable `hoodie.embed.timeline.server`

The Hudi connector on Flink sets up an embedded timeline server on the Flink JobManager (JM) to cache metadata and improve performance when job parallelism is high. We recommend that you disable this embedded server on Managed Service for Apache Flink because non-Flink communication between the JobManager (JM) and the TaskManagers (TM) is disabled.

If this server is enabled, Hudi writes first attempt to connect to the embedded server on the JM, and then fall back to reading metadata from Amazon S3. This means that Hudi incurs a connection timeout that delays Hudi writes and degrades performance on Managed Service for Apache Flink.
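
As an illustration, the following Python sketch renders a Flink SQL table definition that sets `hoodie.embed.timeline.server` to `false`. It assumes that the Hudi Flink connector accepts this key as a table option in the `WITH` clause. The function name, table schema, and path are hypothetical placeholders.

```python
def hudi_table_ddl(table_name, path, extra_options=None):
    """Render a Flink SQL DDL for a Hudi table with the timeline server disabled."""
    options = {"connector": "hudi", "path": path}
    options.update(extra_options or {})
    # Applied last so that extra_options cannot re-enable the embedded server.
    options["hoodie.embed.timeline.server"] = "false"

    rendered = ",\n".join(
        f"  '{key}' = '{value}'" for key, value in options.items()
    )
    # The columns below are a hypothetical schema for illustration only.
    return (
        f"CREATE TABLE {table_name} (\n"
        "  id STRING,\n"
        "  ts TIMESTAMP(3),\n"
        "  PRIMARY KEY (id) NOT ENFORCED\n"
        f") WITH (\n{rendered}\n)"
    )


print(hudi_table_ddl("hudi_sink", "s3://amzn-s3-demo-bucket/hudi/table"))
```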

# Apache Flink Flame Graphs
<a name="troubleshooting-update-flamegraphs"></a>

Flame Graphs are enabled by default on applications in Managed Service for Apache Flink versions that support them. Flame Graphs may affect application performance if you keep the graph open, as mentioned in the [Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15//docs/ops/debugging/flame_graphs/).

If you want to disable Flame Graphs for your application, create a support case to request that they be disabled for your application ARN. For more information, see the [AWS Support Center](https://console.aws.amazon.com/support/home#/).

# Credential provider issue with EFO connector 1.15.2
<a name="troubleshooting-credential-provider"></a>

There is a [known issue](https://issues.apache.org/jira/browse/FLINK-29205) with Kinesis Data Streams EFO connector versions up to 1.15.2 where the `FlinkKinesisConsumer` does not respect the `Credential Provider` configuration. Valid configurations are disregarded, which results in the `AUTO` credential provider being used. This can cause problems with cross-account access to Kinesis when you use the EFO connector.

To resolve this issue, use EFO connector version 1.15.3 or later.
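
For context, the following Python sketch builds the kind of consumer properties that this issue affects: an EFO consumer that assumes a role in another account. The property keys are assumed from the Flink Kinesis connector configuration constants (`AWSConfigConstants` and `ConsumerConfigConstants`), so verify them against the connector version that you use. The function name and all values are hypothetical.

```python
def efo_cross_account_properties(region, role_arn, session_name, consumer_name):
    """Build consumer properties for cross-account EFO access.

    With affected connector versions, the credential provider settings
    below are ignored and the AUTO provider is used instead.
    """
    return {
        "aws.region": region,
        # Credential provider configuration for cross-account access.
        "aws.credentials.provider": "ASSUME_ROLE",
        "aws.credentials.provider.role.arn": role_arn,
        "aws.credentials.provider.role.sessionName": session_name,
        # Enhanced fan-out (EFO) record publisher settings.
        "flink.stream.recordpublisher": "EFO",
        "flink.stream.efo.consumername": consumer_name,
    }


properties = efo_cross_account_properties(
    region="us-east-1",
    role_arn="arn:aws:iam::111122223333:role/example-cross-account-role",
    session_name="example-session",
    consumer_name="example-efo-consumer",
)
```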

# Applications with unsupported Kinesis connectors
<a name="troubleshooting-unsupported-kinesis-connectors"></a>

Managed Service for Apache Flink for Apache Flink version 1.15 or later [automatically rejects applications from starting or updating](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html) if they use unsupported Kinesis connector versions (earlier than version 1.15.2) bundled into application JARs or archives (ZIP).

## Rejection error
<a name="troubleshooting-unsupported-kinesis-connectors-error"></a>

You see the following error when you submit a create or update application call:

```
An error occurred (InvalidArgumentException) when calling the CreateApplication operation: An unsupported Kinesis connector version has been detected in the application. Please update flink-connector-kinesis to any version equal to or newer than 1.15.2.
For more information refer to connector fix: https://issues.apache.org/jira/browse/FLINK-23528
```

## Steps to remediate
<a name="troubleshooting-unsupported-kinesis-connectors-steps-to-remediate"></a>
+ Update the application’s dependency on `flink-connector-kinesis`. If you are using Maven as your project’s build tool, follow [Update a Maven dependency](#troubleshooting-unsupported-kinesis-connectors-update-maven-dependency). If you are using Gradle, follow [Update a Gradle dependency](#troubleshooting-unsupported-kinesis-connectors-update-gradle-dependency).
+ Repackage the application.
+ Upload to an Amazon S3 bucket.
+ Resubmit the create / update application request with the revised application just uploaded to the Amazon S3 bucket.
+ If you continue to see the same error message, re-check your application dependencies. If the problem persists, create a support ticket.
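
When you re-check dependencies in a Maven project, a small script can help. The following Python sketch scans the text of a `pom.xml` for `flink-connector-kinesis` declarations and compares each declared version against 1.15.2. It is a simplified check with hypothetical function names: it only inspects versions declared directly in the given file, and it does not resolve Maven properties, parent POMs, or transitive dependencies.

```python
import re
import xml.etree.ElementTree as ET

MIN_SUPPORTED = (1, 15, 2)


def _local(tag):
    """Strip an XML namespace such as {http://maven.apache.org/POM/4.0.0}."""
    return tag.rsplit("}", 1)[-1]


def kinesis_connector_versions(pom_xml):
    """Return the versions declared for flink-connector-kinesis in a POM string."""
    versions = []
    for element in ET.fromstring(pom_xml).iter():
        if _local(element.tag) != "dependency":
            continue
        fields = {_local(child.tag): (child.text or "").strip() for child in element}
        # Older artifacts carry a Scala suffix, for example flink-connector-kinesis_2.12.
        if fields.get("artifactId", "").split("_")[0] == "flink-connector-kinesis":
            versions.append(fields.get("version", ""))
    return versions


def is_supported(version):
    """Return True or False, or None if the version cannot be parsed."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if not match:
        return None  # for example a ${property} reference or a managed version
    return tuple(int(group) for group in match.groups()) >= MIN_SUPPORTED


# Usage:
#   with open("pom.xml", encoding="utf-8") as handle:
#       for version in kinesis_connector_versions(handle.read()):
#           print(version, is_supported(version))
```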

### Update a Maven dependency
<a name="troubleshooting-unsupported-kinesis-connectors-update-maven-dependency"></a>

1. Open the project’s `pom.xml`.

1. Find the project’s dependencies. They look like:

   ```
   <project>
   
       ...
   
       <dependencies>
   
           ...
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis</artifactId>
           </dependency>
   
           ...
   
       </dependencies>
   
       ...
   
   </project>
   ```

1. Update `flink-connector-kinesis` to a version that is equal to or newer than 1.15.2. For instance:

   ```
   <project>
   
       ...
   
       <dependencies>
   
           ...
   
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis</artifactId>
               <version>1.15.2</version>
           </dependency>
   
           ...
   
       </dependencies>
   
       ...
   
   </project>
   ```

### Update a Gradle dependency
<a name="troubleshooting-unsupported-kinesis-connectors-update-gradle-dependency"></a>

1. Open the project’s `build.gradle` (or `build.gradle.kts` for Kotlin applications). 

1. Find the project’s dependencies. They look like:

   ```
   ...
   
   dependencies {
   
       ...
   
       implementation("org.apache.flink:flink-connector-kinesis")
   
       ...
   
   }
   
   ...
   ```

1. Update `flink-connector-kinesis` to a version that is equal to or newer than 1.15.2. For instance:

   ```
   ...
   
   dependencies {
   
       ...
   
       implementation("org.apache.flink:flink-connector-kinesis:1.15.2")
   
       ...
   
   }
   
   ...
   ```

# Compile error: "Could not resolve dependencies for project"
<a name="troubleshooting-compile"></a>

In order to compile the Managed Service for Apache Flink sample applications, you must first download and compile the Apache Flink Kinesis connector and add it to your local Maven repository. If the connector hasn't been added to your repository, a compile error similar to the following appears:

```
Could not resolve dependencies for project your project name: Failure to find org.apache.flink:flink-connector-kinesis_2.11:jar:1.8.2 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
```

To resolve this error, you must download the Apache Flink source code (version 1.8.2 from [https://flink.apache.org/downloads.html](https://flink.apache.org/downloads.html)) for the connector. For instructions about how to download, compile, and install the Apache Flink source code, see [Using the Apache Flink Kinesis Streams connector with previous Apache Flink versions](earlier.md#how-creating-apps-building-kinesis).

# Invalid choice: "kinesisanalyticsv2"
<a name="troubleshooting-cli-update"></a>

To use v2 of the Managed Service for Apache Flink API, you need the latest version of the AWS Command Line Interface (AWS CLI).

For information about upgrading the AWS CLI, see [Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) in the *AWS Command Line Interface User Guide*.

# UpdateApplication action isn't reloading application code
<a name="troubleshooting-update"></a>

The [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action will not reload application code with the same file name if no S3 object version is specified. To reload application code with the same file name, enable versioning on your S3 bucket, and specify the new object version using the `ObjectVersionUpdate` parameter. For more information about enabling object versioning in an S3 bucket, see [Enabling or Disabling Versioning](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/enable-versioning.html).
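
The following Python sketch builds an `UpdateApplication` request that points the application at a new S3 object version. It assumes a boto3 `kinesisanalyticsv2` client for the actual call. The function name and the sample values are hypothetical.

```python
def code_reload_request(application_name, current_version_id, object_version):
    """Build UpdateApplication parameters that reload code from a new object version."""
    return {
        "ApplicationName": application_name,
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "ApplicationCodeConfigurationUpdate": {
                "CodeContentUpdate": {
                    "S3ContentLocationUpdate": {
                        # The S3 version ID of the newly uploaded code object.
                        "ObjectVersionUpdate": object_version,
                    }
                }
            }
        },
    }


request = code_reload_request("my-app", 3, "example-object-version-id")

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("kinesisanalyticsv2").update_application(**request)
```

With versioning enabled on the bucket, the version ID of a newly uploaded object is returned by Amazon S3 when you upload it.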

# S3 StreamingFileSink FileNotFoundExceptions
<a name="troubleshooting-s3sink"></a>

Managed Service for Apache Flink applications can fail with a `FileNotFoundException` for an In-progress part file when starting from snapshots, if an In-progress part file referred to by the savepoint is missing. When this failure mode occurs, the operator state of the Managed Service for Apache Flink application is usually non-recoverable, and the application must be restarted without a snapshot using `SKIP_RESTORE_FROM_SNAPSHOT`. See the following example stack trace:

```
java.io.FileNotFoundException: No such file or directory: s3://amzn-s3-demo-bucket/pathj/INSERT/2023/4/19/7/_part-2-1234_tmp_12345678-1234-1234-1234-123456789012
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2231)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
        at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:98)
        at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
...
```

Flink `StreamingFileSink` writes records to file systems supported by Flink [File Systems](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/filesystems/overview/). Given that the incoming streams can be unbounded, data is organized into part files of finite size, with new files added as data is written. The part lifecycle and rollover policy determine the timing, size, and naming of the part files.

During checkpointing and savepointing (snapshotting), all Pending files are renamed and committed. However, In-progress part files are not committed. They are renamed, and a reference to them is kept within the checkpoint or savepoint metadata to be used when restoring jobs. These In-progress part files eventually roll over to Pending, and are then renamed and committed by a subsequent checkpoint or savepoint.

The following are the root causes and mitigations for a missing In-progress part file:
+ Stale snapshot used to start the Managed Service for Apache Flink application – only the latest system snapshot taken when an application is stopped or updated can be used to start a Managed Service for Apache Flink application with Amazon S3 StreamingFileSink. To avoid this class of failure, use the latest system snapshot.
  + This happens, for example, when you pick a snapshot created using `CreateSnapshot` instead of a system-triggered snapshot taken during stop or update. The older snapshot’s savepoint keeps an out-of-date reference to an In-progress part file that has been renamed and committed by a subsequent checkpoint or savepoint.
  + This can also happen when a system-triggered snapshot from a non-latest stop or update event is picked. An example is an application that has system snapshots disabled but has `RESTORE_FROM_LATEST_SNAPSHOT` configured. Generally, Managed Service for Apache Flink applications with Amazon S3 StreamingFileSink should always have system snapshots enabled and `RESTORE_FROM_LATEST_SNAPSHOT` configured.
+ In-progress part file removed – As the In-progress part file is located in an S3 bucket, it can be removed by other components or actors which have access to the bucket. 
  + This can happen when you have stopped your app for too long and the In-progress part file referred to by your app’s savepoint has been removed by [S3 bucket MultiPartUpload](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html) lifecycle policy. To avoid this class of failure, make sure that your S3 Bucket MPU lifecycle policy covers a sufficiently large period for your use case.
  + This can also happen when the In-progress part file has been removed manually or by another one of your system’s components. To avoid this class of failure, please make sure that In-progress part files are not removed by other actors or components.
+ Race condition where an automated checkpoint is triggered after savepoint – This affects Managed Service for Apache Flink versions up to and including 1.13. This issue is fixed in Managed Service for Apache Flink version 1.15. Migrate your application to the latest version of Managed Service for Apache Flink to prevent recurrence. We also suggest migrating from StreamingFileSink to [FileSink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/filesystem/#file-sink).
  + When applications are stopped or updated, Managed Service for Apache Flink triggers a savepoint and stops the application in two steps. If an automated checkpoint triggers between the two steps, the savepoint will be unusable as its In-progress part file would be renamed and potentially committed.
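
The restore behavior discussed in this section is selected when the application starts. The following Python sketch builds `StartApplication` parameters for a given restore type. It assumes a boto3 `kinesisanalyticsv2` client for the actual call. The function name is hypothetical.

```python
RESTORE_TYPES = {
    "SKIP_RESTORE_FROM_SNAPSHOT",
    "RESTORE_FROM_LATEST_SNAPSHOT",
    "RESTORE_FROM_CUSTOM_SNAPSHOT",
}


def start_request(application_name, restore_type="RESTORE_FROM_LATEST_SNAPSHOT",
                  snapshot_name=None):
    """Build StartApplication parameters for the chosen restore type."""
    if restore_type not in RESTORE_TYPES:
        raise ValueError(f"Unknown restore type: {restore_type}")
    if (restore_type == "RESTORE_FROM_CUSTOM_SNAPSHOT") != (snapshot_name is not None):
        raise ValueError("snapshot_name is required only for RESTORE_FROM_CUSTOM_SNAPSHOT")

    restore = {"ApplicationRestoreType": restore_type}
    if snapshot_name is not None:
        restore["SnapshotName"] = snapshot_name
    return {
        "ApplicationName": application_name,
        "RunConfiguration": {"ApplicationRestoreConfiguration": restore},
    }


# Recover from a missing In-progress part file by starting without a snapshot.
recovery = start_request("my-app", "SKIP_RESTORE_FROM_SNAPSHOT")

# Usage (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("kinesisanalyticsv2").start_application(**recovery)
```

Starting with `SKIP_RESTORE_FROM_SNAPSHOT` discards the application state, so reserve it for cases where the state is non-recoverable.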

# FlinkKafkaConsumer issue with stop with savepoint
<a name="troubleshooting-FlinkKafkaConsumer"></a>

When using the legacy `FlinkKafkaConsumer`, your application might get stuck in `UPDATING`, `STOPPING`, or `SCALING` if you have system snapshots enabled. There is no published fix available for this [issue](https://issues.apache.org/jira/browse/FLINK-28758), so we recommend that you upgrade to the new [KafkaSource](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-source) to mitigate it.

If you are using the `FlinkKafkaConsumer` with snapshots enabled, the `FlinkKafkaConsumer` can fail with a runtime error reporting a `ClosedException` when the Flink job processes a stop-with-savepoint API request. Under these conditions, the Flink application becomes stuck, which manifests as failed checkpoints.

# Flink 1.15 Async Sink Deadlock
<a name="troubleshooting-async-deadlock"></a>

There is a [known issue](https://issues.apache.org/jira/browse/FLINK-32230) with AWS connectors for Apache Flink that implement the `AsyncSink` interface. This affects applications using Flink 1.15 with the following connectors:
+ For Java applications:
  + KinesisStreamsSink – `org.apache.flink:flink-connector-kinesis`
  + KinesisStreamsSink – `org.apache.flink:flink-connector-aws-kinesis-streams`
  + KinesisFirehoseSink – `org.apache.flink:flink-connector-aws-kinesis-firehose`
  + DynamoDbSink – `org.apache.flink:flink-connector-dynamodb`
+ Flink SQL/TableAPI/Python applications:
  + kinesis – `org.apache.flink:flink-sql-connector-kinesis`
  + kinesis – `org.apache.flink:flink-sql-connector-aws-kinesis-streams`
  + firehose – `org.apache.flink:flink-sql-connector-aws-kinesis-firehose`
  + dynamodb – `org.apache.flink:flink-sql-connector-dynamodb`

Affected applications will experience the following symptoms:
+ Flink job is in `RUNNING` state, but not processing data;
+ There are no job restarts;
+ Checkpoints are timing out.

The issue is caused by a [bug](https://github.com/aws/aws-sdk-java-v2/issues/4354) in AWS SDK resulting in it not surfacing certain errors to the caller when using the async HTTP client. This results in the sink waiting indefinitely for an “in-flight request” to complete during a checkpoint flush operation.

This issue has been fixed in the AWS SDK starting from version **2.20.144**.

Following are instructions on how to update affected connectors to use the new version of AWS SDK in your applications:

**Topics**
+ [Update Java applications](troubleshooting-async-deadlock-update-java-apps.md)
+ [Update Python applications](troubleshooting-async-deadlock-update-python-apps.md)

# Update Java applications
<a name="troubleshooting-async-deadlock-update-java-apps"></a>

Follow the procedures below to update Java applications:

## flink-connector-kinesis
<a name="troubleshooting-async-deadlock-update-java-apps-flink-connector-kinesis"></a>

If the application uses `flink-connector-kinesis`:

Kinesis connector uses shading to package some dependencies, including the AWS SDK, into the connector jar. To update the AWS SDK version, use the following procedure to replace these shaded classes:

------
#### [ Maven ]

1. Add Kinesis connector and required AWS SDK modules as project dependencies.

1. Configure `maven-shade-plugin`:

   1. Add a filter to exclude shaded AWS SDK classes when copying the content of the Kinesis connector jar.

   1. Add relocation rules to move the updated AWS SDK classes to the package expected by the Kinesis connector.

   **pom.xml** 

   ```
   <project>
       ...    
       <dependencies>
           ...
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis</artifactId>
               <version>1.15.4</version>
           </dependency>
           
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>kinesis</artifactId>
               <version>2.20.144</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>netty-nio-client</artifactId>
               <version>2.20.144</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>sts</artifactId>
               <version>2.20.144</version>
           </dependency>
           ...
       </dependencies>
       ...
       <build>
           ...
           <plugins>
               ...
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-shade-plugin</artifactId>
                   <version>3.1.1</version>
                   <executions>
                       <execution>
                           <phase>package</phase>
                           <goals>
                               <goal>shade</goal>
                           </goals>
                           <configuration>
                               ...
                               <filters>
                                   ...
                                   <filter>
                                       <artifact>org.apache.flink:flink-connector-kinesis</artifact>
                                       <excludes>
                                           <exclude>org/apache/flink/kinesis/shaded/software/amazon/awssdk/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/org/reactivestreams/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/io/netty/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/com/typesafe/netty/**</exclude>
                                       </excludes>
                                   </filter>
                                   ...
                               </filters>
                               <relocations>
                                   ...
                                   <relocation>
                                       <pattern>software.amazon.awssdk</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.software.amazon.awssdk</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>org.reactivestreams</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.org.reactivestreams</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>io.netty</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.io.netty</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>com.typesafe.netty</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.com.typesafe.netty</shadedPattern>
                                   </relocation>
                                   ...
                               </relocations>
                              ...
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
               ...
           </plugins>
           ... 
       </build>
   </project>
   ```

------
#### [ Gradle ]

1. Add Kinesis connector and required AWS SDK modules as project dependencies.

1. Adjust shadowJar configuration:

   1. Exclude shaded AWS SDK classes when copying content of the Kinesis connector jar.

   1. Relocate updated AWS SDK classes to a package expected by Kinesis connector.

   **build.gradle**

   ```
   ...
   dependencies {
       ...
       flinkShadowJar("org.apache.flink:flink-connector-kinesis:1.15.4")
       
       flinkShadowJar("software.amazon.awssdk:kinesis:2.20.144")
       flinkShadowJar("software.amazon.awssdk:sts:2.20.144")
       flinkShadowJar("software.amazon.awssdk:netty-nio-client:2.20.144")
       ...
   }
   ...
   shadowJar {
       configurations = [project.configurations.flinkShadowJar]
   
       exclude("org/apache/flink/kinesis/shaded/software/amazon/awssdk/**/*")
       exclude("org/apache/flink/kinesis/shaded/org/reactivestreams/**/*.class")
       exclude("org/apache/flink/kinesis/shaded/io/netty/**/*.class")
       exclude("org/apache/flink/kinesis/shaded/com/typesafe/netty/**/*.class")
       
       relocate("software.amazon.awssdk", "org.apache.flink.kinesis.shaded.software.amazon.awssdk")
       relocate("org.reactivestreams", "org.apache.flink.kinesis.shaded.org.reactivestreams")
       relocate("io.netty", "org.apache.flink.kinesis.shaded.io.netty")
       relocate("com.typesafe.netty", "org.apache.flink.kinesis.shaded.com.typesafe.netty")
   }
   ...
   ```

------

## Other affected connectors
<a name="troubleshooting-async-deadlock-update-java-apps-flink-another-connector"></a>

If the application uses another affected connector:

In order to update the AWS SDK version, the SDK version should be enforced in the project build configuration.

------
#### [ Maven ]

Add AWS SDK bill of materials (BOM) to the dependency management section of the `pom.xml` file to enforce SDK version for the project.

**pom.xml**

```
<project>
    ...    
    <dependencyManagement>
        <dependencies>
            ...
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.20.144</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
            ...
        </dependencies>
    </dependencyManagement>
    ...
</project>
```

------
#### [ Gradle ]

Add platform dependency on the AWS SDK bill of materials (BOM) to enforce SDK version for the project. This requires Gradle 5.0 or newer:

**build.gradle**

```
...
dependencies {
    ...
    flinkShadowJar(platform("software.amazon.awssdk:bom:2.20.144"))
    ...
}
...
```

------

# Update Python applications
<a name="troubleshooting-async-deadlock-update-python-apps"></a>

Python applications can use connectors in two different ways: by packaging connectors and other Java dependencies as part of a single uber-jar, or by using the connector jar directly. To fix applications affected by the Async Sink deadlock:
+ If the application uses an uber-jar, follow the instructions for [Update Java applications](troubleshooting-async-deadlock-update-java-apps.md).
+ If the application uses connector jars directly, rebuild the connector jars from source using the following steps.

**Building connectors from source:**

Prerequisites, similar to Flink [build requirements](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/flinkdev/building/#build-flink):
+ Java 11
+ Maven 3.2.5

## flink-sql-connector-kinesis
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-kinesis"></a>

1. Download source code for Flink 1.15.4:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. Uncompress source code:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Navigate to kinesis connector directory

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-kinesis/
   ```

1. Compile and install the connector jar, specifying the required AWS SDK version. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdkv2.version=2.20.144
   ```

1. Navigate to the SQL connector directory

   ```
   cd ../flink-sql-connector-kinesis
   ```

1. Compile and install sql connector jar:

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. Resulting jar will be available at:

   ```
   target/flink-sql-connector-kinesis-1.15.4.jar
   ```

## flink-sql-connector-aws-kinesis-streams
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-aws-kinesis-streams"></a>

1. Download source code for Flink 1.15.4:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. Uncompress source code:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Navigate to kinesis connector directory

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-aws-kinesis-streams/
   ```

1. Compile and install the connector jar, specifying the required AWS SDK version. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdk.version=2.20.144
   ```

1. Navigate to the SQL connector directory

   ```
   cd ../flink-sql-connector-aws-kinesis-streams
   ```

1. Compile and install sql connector jar:

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. Resulting jar will be available at:

   ```
   target/flink-sql-connector-aws-kinesis-streams-1.15.4.jar
   ```

## flink-sql-connector-aws-kinesis-firehose
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-kinesis-firehose"></a>

1. Download source code for Flink 1.15.4:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. Uncompress source code:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Navigate to connector directory

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-aws-kinesis-firehose/
   ```

1. Compile and install the connector jar, specifying the required AWS SDK version. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdk.version=2.20.144
   ```

1. Navigate to sql connector directory

   ```
   cd ../flink-sql-connector-aws-kinesis-firehose
   ```

1. Compile and install sql connector jar:

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. Resulting jar will be available at:

   ```
   target/flink-sql-connector-aws-kinesis-firehose-1.15.4.jar
   ```

## flink-sql-connector-dynamodb
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-dynamodb"></a>

1. Download source code for the AWS connectors (`flink-connector-aws` 3.0.0):

   ```
   wget https://archive.apache.org/dist/flink/flink-connector-aws-3.0.0/flink-connector-aws-3.0.0-src.tgz
   ```

1. Uncompress source code:

   ```
   tar -xvf flink-connector-aws-3.0.0-src.tgz
   ```

1. Navigate to connector directory

   ```
   cd flink-connector-aws-3.0.0
   ```

1. Compile and install the connector jar, specifying the required Flink and AWS SDK versions. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Dflink.version=1.15.4 -Daws.sdk.version=2.20.144
   ```

1. Resulting jar will be available at:

   ```
   flink-sql-connector-dynamodb/target/flink-sql-connector-dynamodb-3.0.0.jar
   ```

# Amazon Kinesis data streams source processing out of order during re-sharding
<a name="troubleshooting-kinesis-data-streams-processing-out-of-order"></a>

The current `FlinkKinesisConsumer` implementation doesn’t provide strong ordering guarantees between Kinesis shards. This can lead to out-of-order processing when a Kinesis data stream is re-sharded, particularly for Flink applications that experience processing lag. Under some circumstances, for example with window operators based on event time, events might be discarded because of the resulting lateness. 

![\[Diagram showing shards and shard consumers with time progression and trim horizon.\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/flink-ts.png)


This is a [known problem](https://issues.apache.org/jira/browse/FLINK-6349) in open source Flink. Until a connector fix is available, make sure that your Flink applications don't fall behind Kinesis Data Streams during re-sharding. Keeping the processing delay within what your Flink applications tolerate minimizes the impact of out-of-order processing and the risk of data loss. 
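
To illustrate why lag matters, the following is a simplified Python model of event-time lateness. It is not Flink code: the watermark rule (highest timestamp seen minus an allowed lateness) and the function name `split_late_events` are assumptions made for this sketch.

```python
def split_late_events(timestamps, allowed_lateness=0):
    """Split events into (accepted, dropped) the way an event-time window might.

    The watermark trails the highest timestamp seen so far by
    `allowed_lateness`; an event older than the watermark counts as late.
    """
    accepted, dropped = [], []
    max_seen = float("-inf")
    for ts in timestamps:
        watermark = max_seen - allowed_lateness
        if ts < watermark:
            dropped.append(ts)
        else:
            accepted.append(ts)
        max_seen = max(max_seen, ts)
    return accepted, dropped


# Parent shard holds events 1-4, child shard (after re-sharding) holds 5-6.
in_order = [1, 2, 3, 4, 5, 6]
# A lagging consumer reads the child shard before it finishes the parent.
lagging = [1, 2, 5, 6, 3, 4]

print(split_late_events(in_order, allowed_lateness=1))
print(split_late_events(lagging, allowed_lateness=1))
```

In this model, the in-order read accepts every event, while the lagging read drops events 3 and 4 because the child shard's newer timestamps have already advanced the watermark.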

# Real-time vector embedding blueprints FAQ and troubleshooting
<a name="troubleshooting-blueprints"></a>

Review the following FAQ and troubleshooting sections to troubleshoot real-time vector embedding blueprint issues. For more information about real-time vector embedding blueprints, see [Real-time vector embedding blueprints](https://docs.aws.amazon.com/msk/latest/developerguide/ai-vector-embedding-integration-learn-more.html).

For general Managed Service for Apache Flink application troubleshooting, see [https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-runtime.html](https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-runtime.html).

**Topics**
+ [Real-time vector embedding blueprints - FAQ](troubleshooting-blueprints-FAQ.md)
+ [Real-time vector embedding blueprints - troubleshooting](troubleshooting-blueprints-TS.md)

# Real-time vector embedding blueprints - FAQ
<a name="troubleshooting-blueprints-FAQ"></a>

Review the following FAQ about real-time vector embedding blueprints. For more information about real-time vector embedding blueprints, see [Real-time vector embedding blueprints](https://docs.aws.amazon.com/msk/latest/developerguide/ai-vector-embedding-integration-learn-more.html).

**Topics**
+ [What AWS resources does this blueprint create?](#troubleshooting-blueprints-1)
+ [What are my actions after the AWS CloudFormation stack deployment is complete?](#troubleshooting-blueprints-2)
+ [What should be the structure of the data in the source Amazon MSK topic(s)?](#troubleshooting-blueprints-3)
+ [Can I specify parts of a message to embed?](#troubleshooting-blueprints-4)
+ [Can I read data from multiple Amazon MSK topics?](#troubleshooting-blueprints-5)
+ [Can I use regex to configure Amazon MSK topic names?](#troubleshooting-blueprints-6)
+ [What is the maximum size of a message that can be read from an Amazon MSK topic?](#troubleshooting-blueprints-7)
+ [What type of OpenSearch is supported?](#troubleshooting-blueprints-8)
+ [Why do I need to use a vector search collection, vector index, and add a vector field in my OpenSearch Serverless collection?](#troubleshooting-blueprints-9)
+ [What should I set as the dimension for my vector field?](#troubleshooting-blueprints-10)
+ [What does the output look like in the configured OpenSearch index?](#troubleshooting-blueprints-11)
+ [Can I specify metadata fields to add to the document stored in the OpenSearch index?](#troubleshooting-blueprints-12)
+ [Should I expect duplicate entries in the OpenSearch index?](#troubleshooting-blueprints-13)
+ [Can I send data to multiple OpenSearch indices?](#troubleshooting-blueprints-14)
+ [Can I deploy multiple real-time vector embedding applications in a single AWS account?](#troubleshooting-blueprints-15)
+ [Can multiple real-time vector embedding applications use the same data source or sink?](#troubleshooting-blueprints-16)
+ [Does the application support cross-account connectivity?](#troubleshooting-blueprints-17)
+ [Does the application support cross-Region connectivity?](#troubleshooting-blueprints-18)
+ [Can my Amazon MSK cluster and OpenSearch collection be in different VPCs or subnets?](#troubleshooting-blueprints-19)
+ [What embedding models are supported by the application?](#troubleshooting-blueprints-20)
+ [Can I fine-tune the performance of my application based on my workload?](#troubleshooting-blueprints-21)
+ [What Amazon MSK authentication types are supported?](#troubleshooting-blueprints-22)
+ [What is `sink.os.bulkFlushIntervalMillis` and how do I set it?](#troubleshooting-blueprints-23)
+ [When I deploy my Managed Service for Apache Flink application, from what point in the Amazon MSK topic will it begin reading messages?](#troubleshooting-blueprints-24)
+ [How do I use `source.msk.starting.offset`?](#troubleshooting-blueprints-25)
+ [What chunking strategies are supported?](#troubleshooting-blueprints-26)
+ [How do I read records in my vector datastore?](#troubleshooting-blueprints-27)
+ [Where can I find new updates to the source code?](#troubleshooting-blueprints-28)
+ [Can I make a change to the AWS CloudFormation template and update the Managed Service for Apache Flink application?](#troubleshooting-blueprints-29)
+ [Will AWS monitor and maintain the application on my behalf?](#troubleshooting-blueprints-30)
+ [Does this application move my data outside my AWS account?](#troubleshooting-blueprints-31)

## What AWS resources does this blueprint create?
<a name="troubleshooting-blueprints-1"></a>

To find resources deployed in your account, navigate to the AWS CloudFormation console and identify the stack name that starts with the name you provided for your Managed Service for Apache Flink application. Choose the **Resources** tab to check the resources that were created as part of the stack. The following are the key resources that the stack creates:
+ Real-time vector embedding Managed Service for Apache Flink application
+ Amazon S3 bucket for holding the source code for the real-time vector embedding application
+ CloudWatch log group and log stream for storing logs
+ Lambda functions for fetching and creating resources
+ IAM roles and policies for Lambdas, Managed Service for Apache Flink application, and accessing Amazon Bedrock and Amazon OpenSearch Service
+ Data access policy for Amazon OpenSearch Service
+ VPC endpoints for accessing Amazon Bedrock and Amazon OpenSearch Service

## What are my actions after the AWS CloudFormation stack deployment is complete?
<a name="troubleshooting-blueprints-2"></a>

After the AWS CloudFormation stack deployment is complete, access the Managed Service for Apache Flink console and find your blueprint Managed Service for Apache Flink application. Choose the **Configure** tab and confirm that all runtime properties are set up correctly. They may overflow to the next page. When you are confident of the settings, choose **Run**. The application will start ingesting messages from your topic.

To check for new releases, see [https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases](https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases).

## What should be the structure of the data in the source Amazon MSK topic(s)?
<a name="troubleshooting-blueprints-3"></a>

We currently support structured and unstructured source data. 
+ Unstructured data is denoted by `STRING` in `source.msk.data.type`. The data is read as is from the incoming message.
+ Structured data is denoted by `JSON` in `source.msk.data.type`. The data must always be valid JSON. If the application receives malformed JSON, the application will fail. 
+ When using JSON as the source data type, make sure that every message in all source topics is valid JSON. If you subscribe to one or more topics that do not contain JSON objects with this setting, the application will fail. If one or more topics have a mix of structured and unstructured data, we recommend that you configure source data as unstructured in the Managed Service for Apache Flink application. 

## Can I specify parts of a message to embed?
<a name="troubleshooting-blueprints-4"></a>
+ For unstructured input data where `source.msk.data.type` is `STRING`, the application will always embed the entire message and store the entire message in the configured OpenSearch index.
+ For structured input data where `source.msk.data.type` is `JSON`, you can configure `embed.input.config.json.fieldsToEmbed` to specify which field in the JSON object should be selected for embedding. This only works for top-level JSON fields. It does not work with nested JSON objects or with messages that contain a JSON array. Use `.*` to embed the entire JSON.
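
As an illustration of top-level field selection, the following Python sketch filters a JSON message down to the chosen fields. It is not the blueprint's implementation. The function name `select_fields_to_embed` and the handling of missing fields (silently skipped) are assumptions.

```python
import json


def select_fields_to_embed(message: str, fields: list[str]) -> str:
    """Keep only the listed top-level fields of a JSON object message."""
    parsed = json.loads(message)  # raises ValueError on malformed JSON
    if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object, not an array or scalar")
    # Only top-level keys are considered; nested keys are not traversed.
    selected = {key: parsed[key] for key in fields if key in parsed}
    return json.dumps(selected, separators=(",", ":"))


print(select_fields_to_embed('{"name": "John Doe", "age": 42}', ["name"]))
```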

## Can I read data from multiple Amazon MSK topics?
<a name="troubleshooting-blueprints-5"></a>

Yes, you can read data from multiple Amazon MSK topics with this application. Data from all topics must be of the same type (either STRING or JSON) or it might cause the application to fail. Data from all topics is always stored in a single OpenSearch index.

## Can I use regex to configure Amazon MSK topic names?
<a name="troubleshooting-blueprints-6"></a>

`source.msk.topic.names` does not support a list of regular expressions. We support either a comma-separated list of topic names or the `.*` regex to include all topics.
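
The two accepted forms can be sketched as follows. This Python helper is hypothetical and only mirrors the rule described above. How the blueprint actually parses the property is not shown in this guide.

```python
def parse_topic_names(value: str):
    """Interpret a `source.msk.topic.names` value.

    Returns None when the value is the `.*` wildcard (all topics),
    otherwise the list of topic names from the comma-separated value.
    """
    value = value.strip()
    if value == ".*":
        return None
    return [name.strip() for name in value.split(",") if name.strip()]


print(parse_topic_names("orders, payments"))
print(parse_topic_names(".*"))
```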

## What is the maximum size of a message that can be read from an Amazon MSK topic?
<a name="troubleshooting-blueprints-7"></a>

The maximum size of a message that can be processed is limited by the Amazon Bedrock InvokeModel request body limit, which is currently 25,000,000 bytes. For more information, see [InvokeModel](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html#API_runtime_InvokeModel_RequestBody).

## What type of OpenSearch is supported?
<a name="troubleshooting-blueprints-8"></a>

We support both OpenSearch domains and collections. If you are using an OpenSearch collection, make sure to use a vector collection and create a vector index to use for this application. This will let you use the OpenSearch vector database capabilities for querying your data. To learn more, see [Amazon OpenSearch Service’s vector database capabilities explained](https://aws.amazon.com/blogs/big-data/amazon-opensearch-services-vector-database-capabilities-explained/).

## Why do I need to use a vector search collection, vector index, and add a vector field in my OpenSearch Serverless collection?
<a name="troubleshooting-blueprints-9"></a>

The *vector search* collection type in OpenSearch Serverless provides a similarity search capability that is scalable and high performing. It streamlines building modern machine learning (ML) augmented search experiences and generative artificial intelligence (AI) applications. For more information, see [Working with vector search collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-vector-search.html?icmpid=docs_console_unmapped).

## What should I set as the dimension for my vector field?
<a name="troubleshooting-blueprints-10"></a>

Set the dimension of the vector field based on the embedding model that you want to use. Refer to the following table, and confirm these values from the respective documentation.


**Vector field dimensions**  

| Amazon Bedrock vector embedding model name | Output dimension support offered by the model | 
| --- | --- | 
|  Amazon Titan Text Embeddings V1  | 1,536 | 
|  Amazon Titan Text Embeddings V2  | 1,024 (default), 512, 256 | 
|  Amazon Titan Multimodal Embeddings G1  | 1,024 (default), 384, 256 | 
|  Cohere Embed English  | 1,024 | 
|  Cohere Embed Multilingual  | 1,024 | 
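
As a minimal sketch of how the dimension feeds into the index definition, the following Python code checks a dimension against a subset of the table rows and builds an index body in the OpenSearch k-NN mapping format. The field name `embedded_data` and the function name `vector_index_body` are assumptions. Use the vector field name that you configured for your application.

```python
# A subset of the rows in the table above (model name -> listed dimensions).
MODEL_DIMENSIONS = {
    "Amazon Titan Text Embeddings V1": (1536,),
    "Amazon Titan Multimodal Embeddings G1": (1024, 384, 256),
    "Cohere Embed English": (1024,),
    "Cohere Embed Multilingual": (1024,),
}


def vector_index_body(model_name: str, dimension: int, field: str = "embedded_data") -> dict:
    """Build an index body with a knn_vector field of the given dimension.

    `field` is an assumed field name; use the vector field you configured.
    """
    if dimension not in MODEL_DIMENSIONS[model_name]:
        raise ValueError(f"{dimension} is not listed for {model_name}")
    return {
        "settings": {"index": {"knn": True}},
        "mappings": {
            "properties": {field: {"type": "knn_vector", "dimension": dimension}}
        },
    }


print(vector_index_body("Cohere Embed English", 1024))
```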

## What does the output look like in the configured OpenSearch index?
<a name="troubleshooting-blueprints-11"></a>

Every document in the OpenSearch index contains the following fields:
+ **original_data**: The data that was used to generate embeddings. For STRING type, it is the entire message. For JSON object, it is the JSON object that was used for embeddings. It could be the entire JSON in the message or specified fields in the JSON. For example, if name was selected to be embedded from incoming messages, the output would look as follows:

  ```
  "original_data": "{\"name\":\"John Doe\"}"
  ```
+ **embedded_data**: A vector float array of embeddings generated by Amazon Bedrock
+ **date**: UTC timestamp at which the document was stored in OpenSearch

## Can I specify metadata fields to add to the document stored in the OpenSearch index?
<a name="troubleshooting-blueprints-12"></a>

No, currently, we do not support adding additional fields to the final document stored in the OpenSearch index.

## Should I expect duplicate entries in the OpenSearch index?
<a name="troubleshooting-blueprints-13"></a>

Depending on how you configured your application, you might see duplicate messages in the index. One common reason is an application restart. By default, the application starts reading from the earliest message in the source topic. When you change the configuration, the application restarts and processes all messages in the topic again. To avoid reprocessing, set `source.msk.starting.offset` to the correct starting offset for your application. For more information, see [How do I use `source.msk.starting.offset`?](#troubleshooting-blueprints-25)

## Can I send data to multiple OpenSearch indices?
<a name="troubleshooting-blueprints-14"></a>

No, the application supports storing data to a single OpenSearch index. To set up vectorization output to multiple indices, you must deploy separate Managed Service for Apache Flink applications.

## Can I deploy multiple real-time vector embedding applications in a single AWS account?
<a name="troubleshooting-blueprints-15"></a>

Yes, you can deploy multiple real-time vector embedding Managed Service for Apache Flink applications in a single AWS account if every application has a unique name.

## Can multiple real-time vector embedding applications use the same data source or sink?
<a name="troubleshooting-blueprints-16"></a>

Yes, you can create multiple real-time vector embedding Managed Service for Apache Flink applications that read data from the same topic(s) or store data in the same index.

## Does the application support cross-account connectivity?
<a name="troubleshooting-blueprints-17"></a>

No, for the application to run successfully, the Amazon MSK cluster and the OpenSearch collection must be in the same AWS account where you are trying to set up your Managed Service for Apache Flink application.

## Does the application support cross-Region connectivity?
<a name="troubleshooting-blueprints-18"></a>

No, the application only allows you to deploy a Managed Service for Apache Flink application with an Amazon MSK cluster and an OpenSearch collection in the same Region as the Managed Service for Apache Flink application.

## Can my Amazon MSK cluster and OpenSearch collection be in different VPCs or subnets?
<a name="troubleshooting-blueprints-19"></a>

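Yes, we support an Amazon MSK cluster and an OpenSearch collection in different VPCs and subnets as long as they are in the same AWS account. To make sure that your setup is correct, see the [Managed Service for Apache Flink troubleshooting guide](https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-runtime.html).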

## What embedding models are supported by the application?
<a name="troubleshooting-blueprints-20"></a>

Currently, the application supports the embedding models that are supported by Amazon Bedrock. These include:
+ Amazon Titan Embeddings G1 - Text
+ Amazon Titan Text Embeddings V2
+ Amazon Titan Multimodal Embeddings G1
+ Cohere Embed English
+ Cohere Embed Multilingual

## Can I fine-tune the performance of my application based on my workload?
<a name="troubleshooting-blueprints-21"></a>

Yes. The throughput of the application depends on a number of factors, all of which you can control: 

1. **AWS MSF KPUs**: The application is deployed with default parallelism factor 2 and parallelism per KPU 1, with automatic scaling turned on. However, we recommend that you configure scaling for the Managed Service for Apache Flink application according to your workloads. For more information, see [Review Managed Service for Apache Flink application resources](https://docs.aws.amazon.com/managed-flink/latest/java/how-resources.html).

1. **Amazon Bedrock**: Based on the selected Amazon Bedrock on-demand model, different quotas might apply. Review service quotas in Bedrock to see the workload that the service will be able to handle. For more information, see [Quotas for Amazon Bedrock](https://docs.aws.amazon.com/bedrock/latest/userguide/quotas.html).

1. **Amazon OpenSearch Service**: In some situations, you might notice that OpenSearch is the bottleneck in your pipeline. For scaling information, see [Sizing Amazon OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/sizing-domains.html).

## What Amazon MSK authentication types are supported?
<a name="troubleshooting-blueprints-22"></a>

We only support the IAM MSK authentication type.

## What is `sink.os.bulkFlushIntervalMillis` and how do I set it?
<a name="troubleshooting-blueprints-23"></a>

When sending data to Amazon OpenSearch Service, the bulk flush interval is the interval at which the bulk request is run, regardless of the number of actions or the size of the request. The default value is set to 1 millisecond.

Setting a flush interval helps make sure that data is indexed in a timely manner, but an interval that is too low increases overhead. Consider your use case and the importance of timely indexing when choosing a flush interval.

## When I deploy my Managed Service for Apache Flink application, from what point in the Amazon MSK topic will it begin reading messages?
<a name="troubleshooting-blueprints-24"></a>

The application will start reading messages from the Amazon MSK topic at the offset specified by the `source.msk.starting.offset` configuration set in the application’s runtime configuration. If `source.msk.starting.offset` is not explicitly set, the default behavior of the application is to start reading from the earliest available message in the topic.

## How do I use `source.msk.starting.offset`?
<a name="troubleshooting-blueprints-25"></a>

Explicitly set `source.msk.starting.offset` to one of the following values, based on desired behavior: 


+ `EARLIEST`: The default setting, which reads from the oldest offset in the partition. This is a good choice especially if: 
  + You have newly created Amazon MSK topics and consumer applications.
  + You need to replay data, so you can build or reconstruct state. This is relevant when implementing the event sourcing pattern or when initializing a new service that requires a complete view of the data history. 
+ `LATEST`: The Managed Service for Apache Flink application will read messages from the end of the partition. We recommend this option if you only care about new messages being produced and don't need to process historical data. In this setting, the consumer will ignore the existing messages and only read new messages published by the upstream producer.
+ `COMMITTED`: The Managed Service for Apache Flink application will start consuming messages from the committed offset of the consumer group. If the committed offset doesn't exist, the `EARLIEST` reset strategy will be used. 
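
The three settings can be summarized with a small Python sketch. It is an illustration of the rules above, not the application's code. The function name `resolve_start_offset` and its parameters are hypothetical, and `latest` stands for the end offset of the partition.

```python
from typing import Optional


def resolve_start_offset(setting: str, earliest: int, latest: int,
                         committed: Optional[int]) -> int:
    """Pick the offset to start reading from for one partition."""
    setting = setting.upper()
    if setting == "EARLIEST":
        return earliest
    if setting == "LATEST":
        return latest
    if setting == "COMMITTED":
        # Fall back to the earliest offset when nothing was committed yet.
        return committed if committed is not None else earliest
    raise ValueError(f"unsupported starting offset: {setting}")


print(resolve_start_offset("COMMITTED", earliest=0, latest=500, committed=None))
```

With no committed offset, the `COMMITTED` setting in this sketch resolves to the earliest offset, which matches the fallback described in the list.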

## What chunking strategies are supported?
<a name="troubleshooting-blueprints-26"></a>

We are using the [langchain](https://js.langchain.com/v0.1/docs/get_started/introduction/) library to chunk inputs. Chunking is only applied if the length of the input is greater than the chosen `maxSegmentSizeInChars`. We support the following five chunking types:
+ `SPLIT_BY_CHARACTER`: Will fit as many characters as it can into each chunk, where each chunk length is no greater than `maxSegmentSizeInChars`. It ignores whitespace, so it can cut off words.
+ `SPLIT_BY_WORD`: Will find whitespace characters to chunk by. No words are cut off.
+ `SPLIT_BY_SENTENCE`: Sentence boundaries are detected using the Apache OpenNLP library with the English sentence model.
+ `SPLIT_BY_LINE`: Will find new line characters to chunk by.
+ `SPLIT_BY_PARAGRAPH`: Will find consecutive new line characters to chunk by.

The splitting strategies fall back according to the preceding order, where the larger chunking strategies like `SPLIT_BY_PARAGRAPH` fall back to `SPLIT_BY_CHARACTER`. For example, when using `SPLIT_BY_LINE`, if a line is too long then the line will be sub-chunked by sentence, where each chunk will fit in as many sentences as it can. If there are any sentences that are too long, then it will be chunked at the word-level. If a word is too long, then it will be split by character.
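
The fallback order can be sketched in Python as follows. This is a simplified illustration, not the blueprint's implementation. The sentence split uses a naive regular expression instead of Apache OpenNLP, pieces are rejoined with a single space, and the function name `chunk` is an assumption.

```python
import re

# Ordered from largest unit to smallest; each level falls back to the next.
LEVELS = ["SPLIT_BY_PARAGRAPH", "SPLIT_BY_LINE", "SPLIT_BY_SENTENCE",
          "SPLIT_BY_WORD", "SPLIT_BY_CHARACTER"]

SEPARATORS = {
    "SPLIT_BY_PARAGRAPH": r"\n{2,}",
    "SPLIT_BY_LINE": r"\n",
    "SPLIT_BY_SENTENCE": r"(?<=[.!?])\s+",  # naive stand-in for OpenNLP
    "SPLIT_BY_WORD": r"\s+",
}


def chunk(text: str, strategy: str, max_chars: int) -> list[str]:
    """Split `text` into chunks of at most `max_chars` characters."""
    if len(text) <= max_chars:
        return [text] if text else []
    if strategy == "SPLIT_BY_CHARACTER":
        return [text[i:i + max_chars] for i in range(0, len(text), max_chars)]
    next_strategy = LEVELS[LEVELS.index(strategy) + 1]
    chunks, current = [], ""
    for piece in re.split(SEPARATORS[strategy], text):
        if not piece:
            continue
        # Greedily pack as many pieces as fit into the current chunk.
        candidate = f"{current} {piece}" if current else piece
        if len(candidate) <= max_chars:
            current = candidate
            continue
        if current:
            chunks.append(current)
            current = ""
        if len(piece) <= max_chars:
            current = piece
        else:
            # The piece alone is too long: fall back to the next smaller unit.
            chunks.extend(chunk(piece, next_strategy, max_chars))
    if current:
        chunks.append(current)
    return chunks


print(chunk("alpha beta gamma", "SPLIT_BY_WORD", 10))
print(chunk("abcdefghijkl", "SPLIT_BY_WORD", 5))
```

In the second call, the single word is longer than the limit, so the sketch falls back to splitting by character.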

## How do I read records in my vector datastore?
<a name="troubleshooting-blueprints-27"></a>

1. When `source.msk.data.type` is `STRING`
   + **original_data**: The entire original string from the Amazon MSK message.
   + **embedded_data**: Embedding vector created from `chunk_data` if it is not empty (chunking applied) or created from `original_data` if no chunking was applied.
   + **chunk_data**: Only present when the original data was chunked. Contains the chunk of the original message that was used to create the embedding in `embedded_data`.

1. When `source.msk.data.type` is `JSON`
   + **original_data**: The entire original JSON from the Amazon MSK message *after* JSON key filtering is applied. 
   + **embedded_data**: Embedding vector created from `chunk_data` if it is not empty (chunking applied) or created from `original_data` if no chunking was applied.
   + **chunk_key**: Only present when the original data was chunked. Contains the JSON key that the chunk is from in `original_data`. For example, it can look like `jsonKey1.nestedJsonKeyA` for nested keys or *metadata* in the example of `original_data`.
   + **chunk_data**: Only present when the original data was chunked. Contains the chunk of the original message that was used to create the embedding in `embedded_data`.
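
When you read a document back, the rule above determines which text its vector represents. The following Python sketch shows that rule on a plain dictionary. The helper name `embedded_text` and the sample documents are illustrative assumptions.

```python
def embedded_text(document: dict) -> str:
    """Return the text that the vector in `embedded_data` was generated from."""
    # `chunk_data` is only present (and non-empty) when chunking was applied.
    return document.get("chunk_data") or document["original_data"]


chunked = {"original_data": "a long message", "chunk_data": "a long", "embedded_data": [0.1]}
unchunked = {"original_data": "short message", "embedded_data": [0.2]}

print(embedded_text(chunked))
print(embedded_text(unchunked))
```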


## Where can I find new updates to the source code?
<a name="troubleshooting-blueprints-28"></a>

Go to [https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases](https://github.com/awslabs/real-time-vectorization-of-streaming-data/releases) to check for new releases.

## Can I make a change to the AWS CloudFormation template and update the Managed Service for Apache Flink application?
<a name="troubleshooting-blueprints-29"></a>

No, making a change to the AWS CloudFormation template does not update the Managed Service for Apache Flink application. Any new change in AWS CloudFormation implies a new stack needs to be deployed.

## Will AWS monitor and maintain the application on my behalf?
<a name="troubleshooting-blueprints-30"></a>

No, AWS will not monitor, scale, update or patch this application on your behalf. 

## Does this application move my data outside my AWS account?
<a name="troubleshooting-blueprints-31"></a>

All data read and stored by the Managed Service for Apache Flink application stays within your AWS account and never leaves your account.

# Real-time vector embedding blueprints - troubleshooting
<a name="troubleshooting-blueprints-TS"></a>

Review the following troubleshooting topics about real-time vector embedding blueprints. For more information about real-time vector embedding blueprints, see [Real-time vector embedding blueprints](https://docs.aws.amazon.com/msk/latest/developerguide/ai-vector-embedding-integration-learn-more.html).

**Topics**
+ [My CloudFormation stack deployment has failed or rolled back. What can I do to fix it?](#troubleshooting-blueprints-deployment)
+ [I don't want my application to start reading messages from the beginning of the Amazon MSK topics. What do I do?](#troubleshooting-blueprints-beginning)
+ [How do I know if there is an issue with my Managed Service for Apache Flink application and how can I debug it?](#troubleshooting-blueprints-debug)
+ [What are the key metrics that I should be monitoring for my Managed Service for Apache Flink application?](#troubleshooting-blueprints-metrics)

## My CloudFormation stack deployment has failed or rolled back. What can I do to fix it?
<a name="troubleshooting-blueprints-deployment"></a>
+ Go to your AWS CloudFormation stack and find the reason for the stack failure. It could be related to missing permissions or AWS resource name collisions, among other causes. Fix the root cause of the deployment failure. For more information, see the [CloudFormation troubleshooting guide](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/troubleshooting.html#basic-ts-guide).
+  [Optional] There can only be one VPC endpoint per service per VPC. If you deployed multiple real-time vector embedding blueprints to write to the Amazon OpenSearch Service collections in the same VPC, they might be sharing VPC endpoints. These might either already be present in your account for the VPC, or the first real-time vector embedding blueprint stack will create VPC endpoints for Amazon Bedrock and Amazon OpenSearch Service that will be used by all other stacks deployed in your account. If a stack fails, check if that stack created VPC endpoints for Amazon Bedrock and Amazon OpenSearch Service and delete them if they are not used anywhere else in your account. For steps for deleting VPC endpoints, refer to the documentation on how to safely delete your application.
+ There might be other services or applications in your account using the VPC endpoint. Deleting it might create network disruption for other services. Be careful when deleting these endpoints.

## I don't want my application to start reading messages from the beginning of the Amazon MSK topics. What do I do?
<a name="troubleshooting-blueprints-beginning"></a>

You must explicitly set `source.msk.starting.offset` to one of the following values, depending on the desired behavior:
+ **Earliest offset**: The oldest offset in the partition.
+ **Latest offset**: Consumers will read messages from the end of the partition.
+ **Committed offset**: Read from the last message the consumer processed within a partition.

## How do I know if there is an issue with my Managed Service for Apache Flink application and how can I debug it?
<a name="troubleshooting-blueprints-debug"></a>

Use the [Managed Service for Apache Flink troubleshooting guide](https://docs.aws.amazon.com/managed-flink/latest/java/troubleshooting-runtime.html) to debug Managed Service for Apache Flink related issues with your application.

## What are the key metrics that I should be monitoring for my Managed Service for Apache Flink application?
<a name="troubleshooting-blueprints-metrics"></a>
+ All metrics available for a regular Managed Service for Apache Flink application can help you monitor your application. For more information, see [Metrics and dimensions in Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html).
+ To monitor Amazon Bedrock metrics, see [Amazon CloudWatch metrics for Amazon Bedrock](https://docs.aws.amazon.com/bedrock/latest/userguide/monitoring.html#runtime-cloudwatch-metrics).
+ We have added two new metrics for monitoring performance of generating embeddings. Find them under the `EmbeddingGeneration` operation name in CloudWatch. The two metrics are:
  + **BedrockTitanEmbeddingTokenCount**: Number of tokens present in a single request to Amazon Bedrock.
  + **BedrockEmbeddingGenerationLatencyMs**: Reports the time taken to send and receive a response from Amazon Bedrock for generating embeddings, in milliseconds.
+ For Amazon OpenSearch Service serverless collections, you can use metrics such as `IngestionDataRate`, `IngestionDocumentErrors` and others. For more information, see [Monitoring OpenSearch Serverless with Amazon CloudWatch](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/monitoring-cloudwatch.html).
+ For OpenSearch provisioned metrics, see [Monitoring OpenSearch cluster metrics with Amazon CloudWatch](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-cloudwatchmetrics.html).

# Runtime troubleshooting
<a name="troubleshooting-runtime"></a>

This section contains information about diagnosing and fixing runtime issues with your Managed Service for Apache Flink application.

**Topics**
+ [Troubleshooting tools](#troubleshooting-tools)
+ [Application issues](troubleshooting-symptoms.md)
+ [Application is restarting](troubleshooting-rt-restarts.md)
+ [Throughput is too slow](troubleshooting-rt-throughput.md)
+ [Unbounded state growth](troubleshooting-rt-stateleaks.md)
+ [I/O bound operators](troubleshooting-io-bound-operators.md)
+ [Upstream or source throttling from a Kinesis data stream](troubleshooting-source-throttling.md)
+ [Checkpoints](troubleshooting-checkpoints.md)
+ [Checkpointing is timing out](troubleshooting-chk-timeout.md)
+ [Checkpoint failure for Apache Beam application](troubleshooting-chk-failure-beam.md)
+ [Backpressure](troubleshooting-backpressure.md)
+ [Data skew](troubleshooting-data-skew.md)
+ [State skew](troubleshooting-state-skew.md)
+ [Integrate with resources in different Regions](troubleshooting-resources-in-different-regions.md)

## Troubleshooting tools
<a name="troubleshooting-tools"></a>

The primary tool for detecting application issues is CloudWatch alarms. Using CloudWatch alarms, you can set thresholds for CloudWatch metrics that indicate error or bottleneck conditions in your application. For information about recommended CloudWatch alarms, see [Use CloudWatch Alarms with Amazon Managed Service for Apache Flink](monitoring-metrics-alarms.md).

# Application issues
<a name="troubleshooting-symptoms"></a>

This section contains solutions for error conditions that you may encounter with your Managed Service for Apache Flink application.

**Topics**
+ [Application is stuck in a transient status](#troubleshooting-rt-stuck)
+ [Snapshot creation fails](#troubleshooting-rt-snapshots)
+ [Cannot access resources in a VPC](#troubleshooting-rt-vpc)
+ [Data is lost when writing to an Amazon S3 bucket](#troubleshooting-rt-s3)
+ [Application is in the RUNNING status but isn't processing data](#troubleshooting-rt-processing)
+ [Snapshot, application update, or application stop error: InvalidApplicationConfigurationException](#troubleshooting-rt-appconfigexception)
+ [java.nio.file.NoSuchFileException: /usr/local/openjdk-8/lib/security/cacerts](#troubleshooting-rt-fnf)

## Application is stuck in a transient status
<a name="troubleshooting-rt-stuck"></a>

If your application stays in a transient status (`STARTING`, `UPDATING`, `STOPPING`, or `AUTOSCALING`), you can stop it by using the [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) action with the `Force` parameter set to `true`. You can't force stop an application in the `DELETING` status. Alternatively, if the application is in the `UPDATING` or `AUTOSCALING` status, you can roll it back to the previous running version. When you roll back an application, it loads state data from the last successful snapshot. If the application has no snapshots, Managed Service for Apache Flink rejects the rollback request. For more information about rolling back an application, see the [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) action.

**Note**  
Force-stopping your application may lead to data loss or duplication. To prevent data loss or duplicate processing of data during application restarts, we recommend that you take frequent snapshots of your application.
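
The recovery options described above can be summarized in a small decision helper. This is a minimal sketch: the function `recovery_options` and its return strings are hypothetical and exist only to illustrate which action applies to which status.

```python
# Statuses named in this section; the helper itself is hypothetical.
TRANSIENT_STATUSES = {"STARTING", "UPDATING", "STOPPING", "AUTOSCALING"}
ROLLBACK_STATUSES = {"UPDATING", "AUTOSCALING"}


def recovery_options(status: str) -> list[str]:
    """Return the recovery actions this section describes for a stuck application."""
    if status == "DELETING":
        return []  # force stop is not allowed while the application is deleting
    if status not in TRANSIENT_STATUSES:
        return []  # not a transient status, so these options do not apply
    options = ["StopApplication(Force=true)"]
    if status in ROLLBACK_STATUSES:
        # Rollback also requires at least one successful snapshot.
        options.append("RollbackApplication")
    return options


for status in ("STARTING", "UPDATING", "DELETING"):
    print(status, recovery_options(status))
```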

Causes for stuck applications include the following:
+ **Application state is too large:** Having an application state that is too large or too persistent can cause the application to become stuck during a checkpoint or snapshot operation. Check your application's `lastCheckpointDuration` and `lastCheckpointSize` metrics for steadily increasing values or abnormally high values.
+ **Application code is too large:** Verify that your application JAR file is smaller than 512 MB. JAR files larger than 512 MB are not supported.
+ **Application snapshot creation fails:** Managed Service for Apache Flink takes a snapshot of the application during an [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) or [StopApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StopApplication.html) request. The service then uses this snapshot state and restores the application using the updated application configuration to provide *exactly-once* processing semantics. If automatic snapshot creation fails, see [Snapshot creation fails](#troubleshooting-rt-snapshots) following.
+ **Restoring from a snapshot fails:** If you remove or change an operator in an application update and attempt to restore from a snapshot, the restore will fail by default if the snapshot contains state data for the missing operator. In addition, the application will be stuck in either the `STOPPED` or `UPDATING` status. To change this behavior and allow the restore to succeed, change the *AllowNonRestoredState* parameter of the application's [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html) to `true`. This will allow the resume operation to skip state data that cannot be mapped to the new program.
+ **Application initialization takes too long:** Managed Service for Apache Flink uses an internal timeout of 5 minutes (soft setting) while waiting for a Flink job to start. If your job fails to start within this timeout, you will see a CloudWatch log entry like the following:

  ```
  Flink job did not start within a total timeout of 5 minutes for application: %s under account: %s
  ```

  This error means that the operations defined in your Flink job's `main` method are taking more than 5 minutes, which causes Flink job creation to time out on the Managed Service for Apache Flink side. We suggest that you check the Flink **JobManager** logs and your application code to see whether this delay in the `main` method is expected. If it is not, address the cause so that the `main` method completes in under 5 minutes.
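
For the restore failure described in the list above, the following sketch builds the parameters for an `UpdateApplication` call that sets `AllowNonRestoredState` to `true`. It assumes the request shape from the `UpdateApplication` API reference (`RunConfigurationUpdate` containing `FlinkRunConfiguration`); the helper name, application name, and version ID are placeholders, so verify the field names against the API reference before use.

```python
import json


def allow_non_restored_state_update(app_name: str, current_version_id: int) -> dict:
    """Build UpdateApplication parameters that set AllowNonRestoredState to true."""
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": current_version_id,
        "RunConfigurationUpdate": {
            "FlinkRunConfiguration": {"AllowNonRestoredState": True},
        },
    }


# "my-flink-app" and version 7 are placeholder values.
params = allow_non_restored_state_update("my-flink-app", 7)
print(json.dumps(params, indent=2))
```

If you use the AWS SDK for Python, a dictionary like this could be passed as keyword arguments to the `update_application` method of a `kinesisanalyticsv2` client.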

You can check your application status using either the [ListApplications](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html) or the [DescribeApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html) actions.

## Snapshot creation fails
<a name="troubleshooting-rt-snapshots"></a>

The Managed Service for Apache Flink service can't take a snapshot under the following circumstances:
+ The application exceeded the snapshot limit. The limit for snapshots is 1,000. For more information, see [Manage application backups using snapshots](how-snapshots.md).
+ The application doesn't have permissions to access its source or sink.
+ The application code isn't functioning properly.
+ The application is experiencing other configuration issues.

If you get an exception while taking a snapshot during an application update or while stopping the application, set the `SnapshotsEnabled` property of your application's [ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html) to `false` and retry the request.

Snapshots can fail if your application's operators are not properly provisioned. For information about tuning operator performance, see [Operator scaling](performance-improving.md#performance-improving-scaling-op).

After the application returns to a healthy state, we recommend that you set the application's `SnapshotsEnabled` property to `true`.
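
As an illustration of turning snapshots off and back on, the following sketch builds `UpdateApplication` parameters that use the `SnapshotsEnabledUpdate` field of `ApplicationSnapshotConfigurationUpdate`. The helper name and the application name and version values are hypothetical; check the request shape against the API reference.

```python
def snapshots_enabled_update(app_name: str, current_version_id: int, enabled: bool) -> dict:
    """Build UpdateApplication parameters that enable or disable snapshots."""
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "ApplicationSnapshotConfigurationUpdate": {
                "SnapshotsEnabledUpdate": enabled,
            },
        },
    }


# Placeholder values: disable snapshots first, then re-enable them later.
# Each successful update produces a new application version, so the second
# request uses the next version ID.
disable = snapshots_enabled_update("my-flink-app", 7, enabled=False)
reenable = snapshots_enabled_update("my-flink-app", 8, enabled=True)
print(disable)
print(reenable)
```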

## Cannot access resources in a VPC
<a name="troubleshooting-rt-vpc"></a>

If your application uses a VPC running on Amazon VPC, do the following to verify that your application has access to its resources:
+ Check your CloudWatch logs for the following error. This error indicates that your application cannot access resources in your VPC:

  ```
  org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  ```

  If you see this error, verify that your route tables are set up correctly, and that your connectors have the correct connection settings.

  For information about setting up and analyzing CloudWatch logs, see [Logging and monitoring in Amazon Managed Service for Apache Flink](monitoring-overview.md).

## Data is lost when writing to an Amazon S3 bucket
<a name="troubleshooting-rt-s3"></a>

Some data loss might occur when writing output to an Amazon S3 bucket using Apache Flink version 1.6.2. We recommend using the latest supported version of Apache Flink when using Amazon S3 for output directly. To write to an Amazon S3 bucket using Apache Flink 1.6.2, we recommend using Firehose. For more information about using Firehose with Managed Service for Apache Flink, see [Firehose sink](earlier.md#get-started-exercise-fh).

## Application is in the RUNNING status but isn't processing data
<a name="troubleshooting-rt-processing"></a>

You can check your application status by using either the [ListApplications](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplications.html) or the [DescribeApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplication.html) actions. If your application enters the `RUNNING` status but isn't writing data to your sink, you can troubleshoot the issue by adding an Amazon CloudWatch log stream to your application. For more information, see [Work with application CloudWatch logging options](cloudwatch-logs.md#adding_cloudwatch). The log stream contains messages that you can use to troubleshoot application issues.

## Snapshot, application update, or application stop error: InvalidApplicationConfigurationException
<a name="troubleshooting-rt-appconfigexception"></a>

An error similar to the following might occur during a snapshot operation, or during an operation that creates a snapshot, such as updating or stopping an application:

```
An error occurred (InvalidApplicationConfigurationException) when calling the UpdateApplication operation: 

Failed to take snapshot for the application xxxx at this moment. The application is currently experiencing downtime. 
Please check the application's CloudWatch metrics or CloudWatch logs for any possible errors and retry the request. 
You can also retry the request after disabling the snapshots in the Managed Service for Apache Flink console or by updating 
the ApplicationSnapshotConfiguration through the AWS SDK
```

This error occurs when the application is unable to create a snapshot. 

If you encounter this error during a snapshot operation or an operation that creates a snapshot, do the following:
+ Disable snapshots for your application. You can do this either in the Managed Service for Apache Flink console, or by using the `SnapshotsEnabledUpdate` parameter of the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.
+ Investigate why snapshots cannot be created. For more information, see [Application is stuck in a transient status](#troubleshooting-rt-stuck).
+ Reenable snapshots when the application returns to a healthy state.

## java.nio.file.NoSuchFileException: /usr/local/openjdk-8/lib/security/cacerts
<a name="troubleshooting-rt-fnf"></a>

The location of the SSL truststore was updated in a previous deployment. Use the following value for the `ssl.truststore.location` parameter instead:

```
/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts
```

# Application is restarting
<a name="troubleshooting-rt-restarts"></a>

If your application is not healthy, its Apache Flink job continually fails and restarts. This section describes symptoms and troubleshooting steps for this condition.

## Symptoms
<a name="troubleshooting-rt-restarts-symptoms"></a>

This condition can have the following symptoms:
+ The `FullRestarts` metric is not zero. This metric represents the number of times the application's job has restarted since you started the application.
+ The `Downtime` metric is not zero. This metric represents the number of milliseconds that the application is in the `FAILING` or `RESTARTING` status.
+ The application log contains status changes to `RESTARTING` or `FAILED`. You can query your application log for these status changes using the following CloudWatch Logs Insights query: [Analyze errors: Application task-related failures](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps).

## Causes and solutions
<a name="troubleshooting-rt-restarts-causes"></a>

The following conditions may cause your application to become unstable and repeatedly restart:
+ **Operator is throwing an exception:** If an operator in your application throws an exception that is not handled, the application fails over, because Flink treats the failure as one that the operator cannot handle. The application restarts from the latest checkpoint to maintain "exactly-once" processing semantics. As a result, `Downtime` is not zero during these restart periods. To prevent this from happening, we recommend that you handle any retryable exceptions in the application code.

  You can investigate the causes of this condition by querying your application logs for changes in your application's state from `RUNNING` to `FAILED`. For more information, see [Analyze errors: Application task-related failures](cloudwatch-logs-reading.md#cloudwatch-logs-reading-apps).
+ **Kinesis data streams are not properly provisioned:** If a source or sink for your application is a Kinesis data stream, check the [metrics](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html) for the stream for `ReadProvisionedThroughputExceeded` or `WriteProvisionedThroughputExceeded` errors.

  If you see these errors, you can increase the available throughput for the Kinesis stream by increasing the stream's number of shards. For more information, see [ How do I change the number of open shards in Kinesis Data Streams?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/).
+ **Other sources or sinks are not properly provisioned or available:** Verify that your application is correctly provisioning sources and sinks. Check that any sources or sinks used in the application (such as other AWS services, or external sources or destinations) are well provisioned, are not experiencing read or write throttling, and are not periodically unavailable.

  If you are experiencing throughput-related issues with your dependent services, either increase resources available to those services, or investigate the cause of any errors or unavailability.
+ **Operators are not properly provisioned:** If the workload on the threads for one of the operators in your application is not correctly distributed, the operator can become overloaded and the application can crash. For information about tuning operator parallelism, see [Manage operator scaling properly](performance-improving.md#performance-improving-scaling-op).
+ **Application fails with DaemonException:** This error appears in your application log if you are using a version of Apache Flink prior to 1.11. You may need to upgrade to a later version of Apache Flink so that a KPL version of 0.14 or later is used.
+ **Application fails with TimeoutException, FlinkException, or RemoteTransportException:** These errors may appear in your application log if your task managers are crashing. If your application is overloaded, your task managers can experience CPU or memory resource pressure, causing them to fail.

  These errors may look like the following:
  + `java.util.concurrent.TimeoutException: The heartbeat of JobManager with id xxx timed out`
  + `org.apache.flink.util.FlinkException: The assigned slot xxx was removed`
  + `org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager`

  To troubleshoot this condition, check the following:
  + Check your CloudWatch metrics for unusual spikes in CPU or memory usage.
  + Check your application for throughput issues. For more information, see [Troubleshoot performance issues](performance-troubleshooting.md).
  + Examine your application log for unhandled exceptions that your application code is raising.
+ **Application fails with JaxbAnnotationModule Not Found error:** This error occurs if your application uses Apache Beam, but doesn't have the correct dependencies or dependency versions. Managed Service for Apache Flink applications that use Apache Beam must use the following versions of dependencies:

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```

  If you do not provide the correct version of `jackson-module-jaxb-annotations` as an explicit dependency, your application loads it from the environment dependencies, and since the versions do not match, the application crashes at runtime. 

  For more information about using Apache Beam with Managed Service for Apache Flink, see [Creating an application using Apache Beam](examples-beam.md).
+ **Application fails with java.io.IOException: Insufficient number of network buffers**

  This happens when an application does not have enough memory allocated for network buffers. Network buffers facilitate communication between subtasks. They are used to store records before transmission over a network, and to store incoming data before dissecting it into records and handing them to subtasks. The number of network buffers required scales directly with the parallelism and complexity of your job graph. There are a number of approaches to mitigate this issue:
  + You can configure a lower `parallelismPerKpu` so that more memory is allocated per subtask and for network buffers. Note that lowering `parallelismPerKpu` increases the number of KPUs and therefore cost. To avoid this, you can keep the same number of KPUs by lowering parallelism by the same factor.
  + You can simplify your job graph by reducing the number of operators or chaining them so that fewer buffers are needed.
  + Otherwise, you can reach out to [AWS Support](https://aws.amazon.com/premiumsupport/) for custom network buffer configuration.
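
The trade-off in the first mitigation above can be worked through with simple arithmetic. This sketch assumes that the number of KPUs allocated for subtasks is the parallelism divided by `parallelismPerKpu`, rounded up, and it ignores any additional KPU the service may allocate for orchestration. The function name and the numbers are illustrative.

```python
import math


def kpus_for(parallelism: int, parallelism_per_kpu: int) -> int:
    """KPUs needed for the job's subtasks (ignores any orchestration overhead)."""
    return math.ceil(parallelism / parallelism_per_kpu)


# Starting point: parallelism 16 with 4 subtasks per KPU.
before = kpus_for(parallelism=16, parallelism_per_kpu=4)

# Halving parallelismPerKpu alone doubles the KPU count (and cost).
lower_ppk = kpus_for(parallelism=16, parallelism_per_kpu=2)

# Halving parallelism by the same factor keeps the KPU count unchanged.
same_cost = kpus_for(parallelism=8, parallelism_per_kpu=2)

print(before, lower_ppk, same_cost)  # prints: 4 8 4
```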

# Throughput is too slow
<a name="troubleshooting-rt-throughput"></a>

If your application is not processing incoming streaming data quickly enough, it will perform poorly and become unstable. This section describes symptoms and troubleshooting steps for this condition. 

## Symptoms
<a name="troubleshooting-rt-throughput-symptoms"></a>

This condition can have the following symptoms:
+ If the data source for your application is a Kinesis stream, the stream's `millisBehindLatest` metric continually increases.
+ If the data source for your application is an Amazon MSK cluster, the cluster's consumer lag metrics continually increase. For more information, see [ Consumer-Lag Monitoring](https://docs.aws.amazon.com/msk/latest/developerguide/consumer-lag.html) in the [ Amazon MSK Developer Guide](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html).
+ If the data source for your application is a different service or source, check any available consumer lag metrics or data available.

## Causes and solutions
<a name="troubleshooting-rt-throughput-causes"></a>

There can be many causes for slow application throughput. If your application is not keeping up with input, check the following:
+ If throughput lag is spiking and then tapering off, check if the application is restarting. Your application will stop processing input while it restarts, causing lag to spike. For information about application failures, see [Application is restarting](troubleshooting-rt-restarts.md).
+ If throughput lag is consistent, check to see if your application is optimized for performance. For information on optimizing your application's performance, see [Troubleshoot performance issues](performance-troubleshooting.md).
+ If throughput lag is not spiking but continuously increasing, and your application is optimized for performance, you must increase your application resources. For information on increasing application resources, see [Implement application scaling](how-scaling.md).
+ If your application reads from a Kafka cluster in a different Region and `FlinkKafkaConsumer` or `KafkaSource` is mostly idle (high `idleTimeMsPerSecond` or low `CPUUtilization`) despite high consumer lag, you can increase the value of `receive.buffer.bytes`, for example to 2097152. For more information, see the high latency environment section in [Custom MSK configurations](https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html).

For troubleshooting steps for slow throughput or consumer lag increasing in the application source, see [Troubleshoot performance issues](performance-troubleshooting.md).

# Unbounded state growth
<a name="troubleshooting-rt-stateleaks"></a>

If your application is not properly disposing of outdated state information, it will continually accumulate and lead to application performance or stability issues. This section describes symptoms and troubleshooting steps for this condition.

## Symptoms
<a name="troubleshooting-rt-stateleaks-symptoms"></a>

This condition can have the following symptoms:
+ The `lastCheckpointDuration` metric is gradually increasing or spiking.
+ The `lastCheckpointSize` metric is gradually increasing or spiking.

## Causes and solutions
<a name="troubleshooting-rt-stateleaks-causes"></a>

The following conditions may cause your application to accumulate state data: 
+ Your application is retaining state data longer than it is needed.
+ Your application uses window queries with too long a duration.
+ You did not set TTL for your state data. For more information, see [ State Time-To-Live (TTL)](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl) in the Apache Flink Documentation.
+ You are running an application that depends on Apache Beam version 2.25.0 or newer. You can opt out of the new version of the read transform by [extending your BeamApplicationProperties](https://docs.aws.amazon.com/managed-flink/latest/java/examples-beam.html#examples-beam-configure) with the key `experiments` and value `use_deprecated_read`. For more information, see the [Apache Beam Documentation](https://beam.apache.org/blog/beam-2.25.0/#highlights).

Some applications face ever-growing state size, which is not sustainable in the long term because a Flink application runs indefinitely. Sometimes this can be traced back to applications storing data in state and not aging out old information properly. In other cases, there are unreasonable expectations about what Flink can deliver. For example, applications can use aggregations over large time windows spanning days or even weeks. Unless [AggregateFunctions](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/#aggregatefunction) are used, which allow incremental aggregations, Flink needs to keep the events of the entire window in state.
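
To illustrate why incremental aggregation keeps state small, the following plain-Python sketch contrasts buffering every event of a window with maintaining a fixed-size accumulator. The class `AverageAggregate` is a hypothetical stand-in that only mirrors the shape of an aggregate function (create an accumulator, add a value, get the result); it does not use the Flink API.

```python
class AverageAggregate:
    """Incremental average: state is a (sum, count) pair, however many events arrive."""

    def create_accumulator(self):
        return (0.0, 0)

    def add(self, value, accumulator):
        total, count = accumulator
        return (total + value, count + 1)

    def get_result(self, accumulator):
        total, count = accumulator
        return total / count if count else 0.0


events = [4.0, 8.0, 6.0, 2.0]

# Buffering approach: state grows with every event in the window.
buffered = list(events)
buffered_avg = sum(buffered) / len(buffered)

# Incremental approach: state stays a single two-element tuple.
agg = AverageAggregate()
acc = agg.create_accumulator()
for value in events:
    acc = agg.add(value, acc)

print(len(buffered), acc, agg.get_result(acc), buffered_avg)
```

Both approaches produce the same average, but the buffered state grows linearly with the number of events in the window while the accumulator does not.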

Moreover, when using process functions to implement custom operators, the application needs to remove data from state that is no longer required for the business logic. In that case, [state time-to-live](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl) can be used to automatically age out data based on processing time. Managed Service for Apache Flink uses incremental checkpoints, so state TTL is based on [RocksDB compaction](https://github.com/facebook/rocksdb/wiki/Compaction). You can only observe an actual reduction in state size (indicated by checkpoint size) after a compaction operation occurs. In particular, for checkpoint sizes below 200 MB, it's unlikely that you will observe any checkpoint size reduction as a result of state expiring. However, savepoints are based on a clean copy of the state that does not contain old data, so you can trigger a snapshot in Managed Service for Apache Flink to force the removal of outdated state.

For debugging purposes, it can make sense to disable incremental checkpoints to verify more quickly that the checkpoint size actually decreases or stabilizes (and to avoid the effect of compaction in RocksDB). However, this requires a ticket to the service team.

# I/O bound operators
<a name="troubleshooting-io-bound-operators"></a>

It's best to avoid dependencies on external systems on the data path. It's often much more performant to keep a reference data set in state rather than querying an external system to enrich individual events. However, sometimes there are dependencies that cannot be easily moved to state, for example, if you want to enrich events with a machine learning model that is hosted on Amazon SageMaker.

Operators that interface with external systems over the network can become a bottleneck and cause backpressure. We highly recommend using [AsyncIO](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/) to implement the functionality, to reduce the wait time for individual calls and avoid slowing down the entire application.
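
The idea behind asynchronous I/O, overlapping the wait time of many external calls instead of blocking on each one in turn, can be illustrated with a self-contained Python `asyncio` sketch. It does not use Flink's AsyncIO API; the `enrich` coroutine simulates a hypothetical external call, and `max_in_flight` is an illustrative cap on concurrent requests.

```python
import asyncio


async def enrich(event: dict) -> dict:
    """Stand-in for a network call to an external system (hypothetical)."""
    await asyncio.sleep(0.05)  # simulated round-trip latency
    return {**event, "score": event["id"] * 10}


async def enrich_all(events: list[dict], max_in_flight: int = 10) -> list[dict]:
    """Issue calls concurrently, capped at max_in_flight, preserving input order."""
    limit = asyncio.Semaphore(max_in_flight)

    async def guarded(event: dict) -> dict:
        async with limit:
            return await enrich(event)

    return await asyncio.gather(*(guarded(e) for e in events))


results = asyncio.run(enrich_all([{"id": i} for i in range(20)]))
print(results[:3])
```

With 20 events and up to 10 calls in flight, the simulated waits overlap, so the total wait is roughly two round trips rather than twenty. Capping the number of in-flight requests also protects the external system from being overloaded.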

Moreover, for applications with I/O bound operators it can also make sense to increase the [ParallelismPerKPU](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html) setting of the Managed Service for Apache Flink application. This configuration describes the number of parallel subtasks an application can perform per Kinesis Processing Unit (KPU). By increasing the value from the default of 1 to, say, 4, the application leverages the same resources (and has the same cost) but can scale to 4 times the parallelism. This works well for I/O bound applications, but it causes additional overhead for applications that are not I/O bound.
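
As a sketch of how such a change might be requested, the following code builds `UpdateApplication` parameters that raise `ParallelismPerKPU` from 1 to 4 and scale parallelism by the same factor, so the ratio of parallelism to `ParallelismPerKPU` stays the same. It assumes the `ParallelismConfigurationUpdate` shape from the API reference; the helper name, application name, version ID, and numbers are placeholders.

```python
def parallelism_update(app_name: str, current_version_id: int,
                       parallelism: int, parallelism_per_kpu: int) -> dict:
    """Build UpdateApplication parameters for a custom parallelism configuration."""
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "FlinkApplicationConfigurationUpdate": {
                "ParallelismConfigurationUpdate": {
                    "ConfigurationTypeUpdate": "CUSTOM",
                    "ParallelismUpdate": parallelism,
                    "ParallelismPerKPUUpdate": parallelism_per_kpu,
                },
            },
        },
    }


# Placeholder scenario: from parallelism 4 at 1 subtask per KPU
# to parallelism 16 at 4 subtasks per KPU.
update = parallelism_update("my-flink-app", 7, parallelism=16, parallelism_per_kpu=4)
print(update)
```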

# Upstream or source throttling from a Kinesis data stream
<a name="troubleshooting-source-throttling"></a>

**Symptom**: The application is encountering `LimitExceededException` errors from its upstream source Kinesis data stream.

**Potential Cause**: By default, the Apache Flink Kinesis connector reads from the Kinesis data stream source with a very aggressive setting for the maximum number of records fetched per `GetRecords` call. Apache Flink is configured by default to fetch 10,000 records per `GetRecords` call (this call is made by default every 200 ms), although the limit per shard is only 1,000 records.

This default behavior can lead to throttling when attempting to consume from the Kinesis data stream, which affects the application's performance and stability.

You can confirm this by checking the CloudWatch `ReadProvisionedThroughputExceeded` metric and seeing prolonged or sustained periods where this metric is greater than zero.

You can also see this in CloudWatch logs for your Amazon Managed Service for Apache Flink application by observing continued `LimitExceededException` errors.

**Resolution**: You can do one of two things to resolve this scenario:
+ Lower the default limit for the number of records fetched per `GetRecords` call.
+ Enable Adaptive Reads in your Amazon Managed Service for Apache Flink application. For more information on the Adaptive Reads feature, see [SHARD\_USE\_ADAPTIVE\_READS](https://nightlies.apache.org/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_USE_ADAPTIVE_READS).
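
The two options above correspond to consumer properties of the Kinesis connector. The following sketch shows what those properties might look like and how the default settings translate into a per-shard request rate. The property keys are the ones we understand `ConsumerConfigConstants` to define; verify them against the connector version you use. The values and the helper function are illustrative, not recommendations.

```python
def requested_records_per_second(max_records_per_call: int, interval_millis: int) -> int:
    """Upper bound on records requested per shard per second for a polling consumer."""
    calls_per_second = 1000 // interval_millis
    return max_records_per_call * calls_per_second


# Defaults described above: 10,000 records per call, one call every 200 ms.
default_rate = requested_records_per_second(10_000, 200)

# Option 1: lower the per-call fetch limit (example value only).
lower_fetch_limit = {
    "flink.shard.getrecords.maxrecordcount": "1000",
    "flink.shard.getrecords.intervalmillis": "200",
}

# Option 2: let the connector adapt the fetch size to observed throughput.
adaptive_reads = {
    "flink.shard.adaptivereads": "true",
}

lowered_rate = requested_records_per_second(
    int(lower_fetch_limit["flink.shard.getrecords.maxrecordcount"]),
    int(lower_fetch_limit["flink.shard.getrecords.intervalmillis"]),
)
print(default_rate, lowered_rate)  # prints: 50000 5000
```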

# Checkpoints
<a name="troubleshooting-checkpoints"></a>

Checkpoints are Flink’s mechanism to ensure that the state of an application is fault tolerant. The mechanism allows Flink to recover the state of operators if the job fails and gives the application the same semantics as failure-free execution. With Managed Service for Apache Flink, the state of an application is stored in RocksDB, an embedded key/value store that keeps its working state on disk. When a checkpoint is taken, the state is also uploaded to Amazon S3, so even if the disk is lost, the checkpoint can be used to restore the application's state.

For more information, see [How does State Snapshotting Work?](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work).

## Checkpointing stages
<a name="troubleshooting-checkpointing-stages"></a>

For a checkpointing operator subtask in Flink there are 5 main stages:
+ Waiting [**Start Delay**] – Flink uses checkpoint barriers that get inserted into the stream so time in this stage is the time the operator waits for the checkpoint barrier to reach it. 
+ Alignment [**Alignment Duration**] – In this stage the subtask has reached one barrier but it’s waiting for barriers from other input streams. 
+ Sync checkpointing [**Sync Duration**] – This stage is when the subtask actually snapshots the state of the operator and blocks all other activity on the subtask. 
+ Async checkpointing [**Async Duration**] – The majority of this stage is the subtask uploading the state to Amazon S3. During this stage, the subtask is no longer blocked and can process records. 
+ Acknowledging – This is usually a short stage and is simply the subtask sending an acknowledgement to the JobManager and also performing any commit messages (e.g. with Kafka sinks). 

 Each of these stages (apart from Acknowledging) maps to a duration metric for checkpoints that is available from the Flink WebUI, which can help isolate the cause of the long checkpoint.

To see an exact definition of each of the metrics available on checkpoints, go to [History Tab](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/#history-tab).

## Investigating
<a name="troubleshooting-checkpoints-investigating"></a>

When investigating a long checkpoint duration, the most important thing to determine is the bottleneck for the checkpoint: which operator and subtask takes the longest to checkpoint, and which stage of that subtask takes an extended period of time. You can determine this in the Flink WebUI under the job's **Checkpoints** tab. Flink’s web interface provides data and information that helps you investigate checkpointing issues. For a full breakdown, see [Monitoring Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/).

 The first thing to look at is the **End to End Duration** of each operator in the Job graph to determine which operator is taking long to checkpoint and warrants further investigation. Per the Flink documentation, the definition of the duration is:

*The duration from the trigger timestamp until the latest acknowledgement (or n/a if no acknowledgement received yet). This end to end duration for a complete checkpoint is determined by the last subtask that acknowledges the checkpoint. This time is usually larger than single subtasks need to actually checkpoint the state.*

The other durations for the checkpoint also give more fine-grained information about where the time is being spent.
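
The following sketch illustrates this kind of investigation on made-up numbers: given per-subtask durations for each stage, it finds the slowest subtask and the stage that dominates it. The statistics, field names, and function are hypothetical and only mimic the columns shown in the Flink WebUI.

```python
# Hypothetical per-subtask checkpoint statistics, in milliseconds.
subtasks = [
    {"operator": "Source", "subtask": 0, "start_delay": 20, "alignment": 0, "sync": 5, "async": 40},
    {"operator": "KeyedProcess", "subtask": 1, "start_delay": 30, "alignment": 10, "sync": 15, "async": 4200},
    {"operator": "Sink", "subtask": 0, "start_delay": 35, "alignment": 0, "sync": 2, "async": 15},
]
STAGES = ("start_delay", "alignment", "sync", "async")


def checkpoint_bottleneck(stats):
    """Return (operator, subtask, stage, millis) for the slowest subtask's dominant stage."""
    slowest = max(stats, key=lambda s: sum(s[stage] for stage in STAGES))
    stage = max(STAGES, key=lambda name: slowest[name])
    return slowest["operator"], slowest["subtask"], stage, slowest[stage]


print(checkpoint_bottleneck(subtasks))
```

In this made-up data set, the asynchronous stage of one `KeyedProcess` subtask dominates, which would point toward the state upload as the place to investigate.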

If the **Sync Duration** is high, something is happening during the snapshotting. During this stage, `snapshotState()` is called for classes that implement the snapshotState interface. Because this can be user code, thread dumps can be useful for investigating it.

A long **Async Duration** suggests that a lot of time is being spent uploading the state to Amazon S3. This can occur if the state is large or if many state files are being uploaded. If this is the case, it is worth investigating how the application uses state and ensuring that the Flink native data structures are used where possible ([Using Keyed State](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)). Managed Service for Apache Flink configures Flink to minimize the number of Amazon S3 calls so that this stage doesn’t take too long. Following is an example of an operator's checkpointing statistics. It shows that the **Async Duration** is relatively long compared to the preceding operator checkpointing statistics.

![\[Investigating checkpointing\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint.png)


A high **Start Delay** shows that the majority of the time is being spent waiting for the checkpoint barrier to reach the operator. This indicates that the application is taking a while to process records, meaning the barrier is flowing through the job graph slowly. This is usually the case if the job is backpressured or if one or more operators are constantly busy. Following is an example of a JobGraph where the second KeyedProcess operator is busy.

![\[Investigating checkpointing\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint2.png)


You can investigate what is taking so long by using either Flink flame graphs or TaskManager thread dumps. Once you have identified the bottleneck, you can examine it in more detail with the same tools.
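The triage described in this section can be summarized in a few lines of code. The following is a minimal sketch, not part of Flink or Managed Service for Apache Flink: the class name, the `Phase` enum, and the hint texts are hypothetical, and the durations are values you would read from the checkpoint details in the Flink dashboard.

```java
// Hypothetical helper for illustration; it only restates the guidance in this section.
final class CheckpointDurationTriage {

    enum Phase { START_DELAY, SYNC, ASYNC }

    // Returns the phase that accounts for the largest share of the checkpoint time.
    static Phase dominantPhase(long startDelayMs, long syncMs, long asyncMs) {
        if (startDelayMs >= syncMs && startDelayMs >= asyncMs) {
            return Phase.START_DELAY;
        }
        return syncMs >= asyncMs ? Phase.SYNC : Phase.ASYNC;
    }

    // Maps the dominant phase to the next investigation step described in the text.
    static String hint(Phase phase) {
        switch (phase) {
            case START_DELAY:
                return "Barrier is slow to arrive: look for backpressure or a busy operator.";
            case SYNC:
                return "snapshotState() is slow: inspect user code with thread dumps.";
            default:
                return "State upload is slow: review state size and number of state files.";
        }
    }

    public static void main(String[] args) {
        // Assumed example values in milliseconds.
        Phase phase = dominantPhase(200, 50, 4200);
        System.out.println(phase + ": " + hint(phase));
    }
}
```

In practice the three durations are compared per subtask, because a single slow subtask determines the end-to-end duration.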

## Thread dumps
<a name="troubleshooting-checkpoints-investigating-thread-dumps"></a>

Thread dumps are another debugging tool that is at a slightly lower level than flame graphs. A thread dump outputs the execution state of all threads at a point in time. Flink takes a JVM thread dump, which is an execution state of all threads within the Flink process. The state of a thread is presented by a stack trace of the thread as well as some additional information. Flame graphs are actually built using multiple stack traces taken in quick succession. The graph is a visualisation made from these traces that makes it easy to identify the common code paths.

```
"KeyedProcess (1/3)#0" prio=5 Id=1423 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:154)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19)
    at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

Above is a snippet of a thread dump taken from the Flink UI for a single thread. The first line contains some general information about this thread including:
+ The thread name *KeyedProcess (1/3)#0*
+ Priority of the thread *prio=5*
+ A unique thread Id *Id=1423*
+ Thread state *RUNNABLE*

The name of a thread usually indicates its general purpose. Operator threads can be identified by their name, because they have the same name as the operator, followed by an indication of which subtask they belong to. For example, the *KeyedProcess (1/3)#0* thread is from the *KeyedProcess* operator and belongs to the first of three subtasks.
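When a dump contains many threads, it can help to extract these fields automatically. The following is a minimal sketch: the `ThreadHeaderParser` class is hypothetical, and the pattern is inferred only from the sample header line shown earlier, so it may need adjusting for other thread name formats. The number after `#` is captured but not interpreted.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for illustration; not part of Flink.
final class ThreadHeaderParser {

    // Matches: "<operator> (<subtask>/<parallelism>)#<n>" prio=<n> Id=<n> <STATE>
    private static final Pattern HEADER = Pattern.compile(
            "^\"(.+) \\((\\d+)/(\\d+)\\)#(\\d+)\" prio=(\\d+) Id=(\\d+) (\\w+)$");

    // Returns {operator, subtask, parallelism, threadId, state},
    // or null if the line does not look like an operator thread header.
    static String[] parse(String headerLine) {
        Matcher m = HEADER.matcher(headerLine.trim());
        if (!m.matches()) {
            return null;
        }
        return new String[] {m.group(1), m.group(2), m.group(3), m.group(6), m.group(7)};
    }

    public static void main(String[] args) {
        String[] f = parse("\"KeyedProcess (1/3)#0\" prio=5 Id=1423 RUNNABLE");
        // prints "KeyedProcess subtask 1 of 3, Id=1423, state=RUNNABLE"
        System.out.println(f[0] + " subtask " + f[1] + " of " + f[2]
                + ", Id=" + f[3] + ", state=" + f[4]);
    }
}
```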

Threads can be in one of a few states:
+ NEW – The thread has been created but has not yet started
+ RUNNABLE – The thread is executing on the CPU
+ BLOCKED – The thread is waiting for another thread to release its lock
+ WAITING – The thread is waiting by using a `wait()`, `join()`, or `park()` method
+ TIMED\_WAITING – The thread is waiting by using a sleep, wait, join, or park method, but with a maximum wait time.
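The thread name, Id, state, and stack trace described above are standard JVM concepts and can be reproduced in any Java process. The following is a minimal sketch that uses the JDK's `ThreadMXBean`; the `ThreadDumpSketch` class is hypothetical, and this is not necessarily how the Flink UI produces its dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper for illustration; not part of Flink.
final class ThreadDumpSketch {

    // Formats one thread: a header with name, Id, and state, followed by its stack frames.
    static String format(ThreadInfo info) {
        StringBuilder sb = new StringBuilder();
        sb.append('"').append(info.getThreadName()).append('"')
          .append(" Id=").append(info.getThreadId())
          .append(' ').append(info.getThreadState()).append('\n');
        for (StackTraceElement frame : info.getStackTrace()) {
            sb.append("    at ").append(frame).append('\n');
        }
        return sb.toString();
    }

    // Captures the execution state of all live threads in this JVM at one point in time.
    static String dumpAllThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // false, false: skip locked monitor and synchronizer details to keep the output short.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            sb.append(format(info)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```

Running this a few times in a row shows why a single dump is only a snapshot: the states and top frames of busy threads change between samples.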

**Note**  
In Flink 1.13, the maximum depth of a single stacktrace in the thread dump is limited to 8. 

**Note**  
Thread dumps should be the last resort for debugging performance issues in a Flink application, as they can be challenging to read and require multiple samples to be taken and analysed manually. If at all possible, it is preferable to use flame graphs.

### Thread dumps in Flink
<a name="troubleshooting-checkpoints-investigating-thread-dumps-flink"></a>

In Flink, a thread dump can be taken by choosing the **Task Managers** option on the left navigation bar of the Flink UI, selecting a specific task manager, and then navigating to the **Thread Dump** tab. The thread dump can be downloaded, copied to your favorite text editor (or thread dump analyzer), or analyzed directly inside the text view in the Flink Web UI (however, this last option can be a bit clunky).

To determine which Task Manager to take a thread dump of, choose a particular operator and open the **TaskManagers** tab. This tab shows where the subtasks of the operator are running. Different subtasks of an operator can run on different Task Managers.

![\[Using Thread dumps\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint4.png)


The dump comprises multiple stack traces. However, when investigating the dump, the ones related to an operator are the most important. These can easily be found because operator threads have the same name as the operator, as well as an indication of which subtask they are related to. For example, the following stack trace is from the *KeyedProcess* operator and is the first subtask.

```
"KeyedProcess (1/3)#0" prio=5 Id=595 RUNNABLE
    at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19)
    at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14)
    at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    ...
```

This can become confusing if there are multiple operators with the same name, but you can name operators to get around this. For example:

```
....
.process(new ExpensiveFunction).name("Expensive function")
```

## [Flame graphs](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/)
<a name="troubleshooting-checkpoints-investigating-flame-graphs"></a>

Flame graphs are a useful debugging tool that visualizes the stack traces of the targeted code, which allows the most frequent code paths to be identified. They are created by sampling stack traces a number of times. The x-axis of a flame graph shows the different stack profiles, while the y-axis shows the stack depth and the calls in the stack trace. A single rectangle in a flame graph represents one stack frame, and the width of a frame shows how frequently it appears in the stacks. For more details about flame graphs and how to use them, see [Flame Graphs](https://www.brendangregg.com/flamegraphs.html).
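To make the relationship between sampled stack traces and rectangle widths concrete, the following minimal sketch merges a few samples by call path and counts how many samples contain each path. The `FlameGraphWidths` class and the sample frames are hypothetical, and the sketch only illustrates the counting idea, not how Flink builds its flame graphs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Flink.
final class FlameGraphWidths {

    // Each sample is one stack trace, ordered from the outermost call to the innermost.
    // The result maps a call path (frames joined by ';') to the number of samples that
    // contain it, which corresponds to the width of that frame's rectangle.
    static Map<String, Integer> widths(List<List<String>> samples) {
        Map<String, Integer> widths = new TreeMap<>();
        for (List<String> stack : samples) {
            StringBuilder path = new StringBuilder();
            for (String frame : stack) {
                if (path.length() > 0) {
                    path.append(';');
                }
                path.append(frame);
                widths.merge(path.toString(), 1, Integer::sum);
            }
        }
        return widths;
    }

    public static void main(String[] args) {
        // Assumed sample data: three stack traces taken in quick succession.
        List<List<String>> samples = Arrays.asList(
                Arrays.asList("processInput", "processElement", "foreach"),
                Arrays.asList("processInput", "processElement", "foreach"),
                Arrays.asList("processInput", "processElement"));
        widths(samples).forEach((path, count) -> System.out.println(count + "  " + path));
    }
}
```

A path that appears in most samples, such as the `foreach` path here, is where the sampled thread spends most of its time.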

In Flink, the flame graph for an operator can be accessed via the Web UI by selecting an operator and then choosing the **FlameGraph** tab. Once enough samples have been collected the flamegraph will be displayed. Following is the FlameGraph for the ProcessFunction that was taking a lot of time to checkpoint.

![\[Using Flame graphs\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/checkpoint3.png)


This is a very simple flame graph. It shows that all the CPU time is being spent within a foreach loop within the `processElement` method of the ExpensiveFunction operator. You also get the line number to help determine where in the code execution is taking place.

# Checkpointing is timing out
<a name="troubleshooting-chk-timeout"></a>

If your application is not optimized or properly provisioned, checkpoints can fail. This section describes symptoms and troubleshooting steps for this condition. 

## Symptoms
<a name="troubleshooting-chk-timeout-symptoms"></a>

If checkpoints fail for your application, the `numberOfFailedCheckpoints` metric will be greater than zero.

Checkpoints can fail due to either direct failures, such as application errors, or due to transient failures, such as running out of application resources. Check your application logs and metrics for the following symptoms:
+ Errors in your code.
+ Errors accessing your application's dependent services.
+ Errors serializing data. If the default serializer can't serialize your application data, the application will fail. For information about using a custom serializer in your application, see [Data Types and Serialization](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) in the Apache Flink Documentation.
+ Out of Memory errors.
+ Spikes or steady increases in the following metrics:
  + `heapMemoryUtilization`
  + `oldGenerationGCTime`
  + `oldGenerationGCCount`
  + `lastCheckpointSize`
  + `lastCheckpointDuration`

For more information about monitoring checkpoints, see [Monitoring Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/monitoring/checkpoint_monitoring/) in the Apache Flink Documentation.

## Causes and solutions
<a name="troubleshooting-chk-timeout-causes"></a>

Your application log error messages show the cause for direct failures. Transient failures can have the following causes:
+ Your application has insufficient KPU provisioning. For information about increasing application provisioning, see [Implement application scaling](how-scaling.md).
+ Your application state size is too large. You can monitor your application state size using the `lastCheckpointSize` metric.
+ Your application's state data is unequally distributed between keys. If your application uses the `keyBy` operator, ensure that your incoming data is being divided equally between keys. If most of the data is being assigned to a single key, this creates a bottleneck that causes failures.
+ Your application is experiencing memory or garbage collection backpressure. Monitor your application's `heapMemoryUtilization`, `oldGenerationGCTime`, and `oldGenerationGCCount` for spikes or steadily increasing values.

# Checkpoint failure for Apache Beam application
<a name="troubleshooting-chk-failure-beam"></a>

If your Beam application is configured with [shutdownSourcesAfterIdleMs](https://beam.apache.org/documentation/runners/flink/#:~:text=shutdownSourcesAfterIdleMs) set to 0ms, checkpoints can fail to trigger because tasks are in "FINISHED" state. This section describes symptoms and resolution for this condition. 

## Symptom
<a name="troubleshooting-chk-failure-beam-symptoms"></a>

Go to the CloudWatch logs for your Managed Service for Apache Flink application and check whether the following message has been logged. The message indicates that a checkpoint failed to trigger because some tasks have finished.

```
{
    "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)",
    "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
    "message": "Failed to trigger checkpoint for job your job ID since some tasks of job your job ID has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.",
    "threadName": "Checkpoint Timer",
    "applicationARN": your application ARN,
    "applicationVersionId": "5",
    "messageSchemaVersion": "1",
    "messageType": "INFO"
}
```

You can also see this on the Flink dashboard, where some tasks have entered the "FINISHED" state and checkpointing is no longer possible.

![\[Tasks in "FINISHED" state\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/beam_checkpoint_failure.png)


## Cause
<a name="troubleshooting-chk-failure-beam-causes"></a>

`shutdownSourcesAfterIdleMs` is a Beam configuration variable that shuts down sources that have been idle for the configured number of milliseconds. Once a source has been shut down, checkpointing is no longer possible. This could lead to [checkpoint failure](https://issues.apache.org/jira/browse/FLINK-2491).

One of the causes for tasks entering the "FINISHED" state is `shutdownSourcesAfterIdleMs` being set to 0ms, which means that idle tasks are shut down immediately.

## Solution
<a name="troubleshooting-chk-failure-beam-solution"></a>

To prevent tasks from entering the "FINISHED" state immediately, set `shutdownSourcesAfterIdleMs` to `Long.MAX_VALUE`. This can be done in two ways:
+ Option 1: If your Beam configuration is set in your Managed Service for Apache Flink application configuration page, then you can add a new key-value pair to set `shutdownSourcesAfterIdleMs` as follows:  
![\[Set shutdownSourcesAfterIdleMs to Long.MAX_VALUE\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/beam_checkpoint_failure_solution.png)
+ Option 2: If your Beam configuration is set in your JAR file, then you can set `shutdownSourcesAfterIdleMs` as follows:

  ```
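  import org.apache.beam.runners.flink.FlinkPipelineOptions;
  import org.apache.beam.runners.flink.FlinkRunner;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam options object

  options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // Set shutdownSourcesAfterIdleMs to Long.MAX_VALUE
  options.setRunner(FlinkRunner.class);

  Pipeline p = Pipeline.create(options); // Attach the specified options to the Beam pipeline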
  ```

# Backpressure
<a name="troubleshooting-backpressure"></a>

Flink uses backpressure to adapt the processing speed of individual operators. 

An operator can struggle to keep up with the message volume it receives for many reasons. The operation may require more CPU resources than the operator has available, or the operator may be waiting for I/O operations to complete. If an operator cannot process events fast enough, it builds backpressure in the upstream operators that feed into it. This causes the upstream operators to slow down, which can further propagate the backpressure to the source and cause the source to adapt to the overall throughput of the application by slowing down as well. You can find a deeper description of backpressure and how it works at [How Apache Flink™ handles backpressure](https://www.ververica.com/blog/how-flink-handles-backpressure).

Knowing which operators in an application are slow gives you crucial information to understand the root cause of performance problems in the application. Backpressure information is [exposed through the Flink Dashboard](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/monitoring/back_pressure/). To identify the slow operator, look for the operator with a high backpressure value that is closest to a sink (operator B in the following example). The operator causing the slowness is then one of the downstream operators (operator C in the example). B could process events faster, but is backpressured because it cannot forward its output to the actual slow operator C.

```
A (backpressured 93%) -> B (backpressured 85%) -> C (backpressured 11%) -> D (backpressured 0%)
```
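The rule above can be expressed as a short search from the sink toward the source. The following is a minimal sketch for a linear chain of operators: the `BackpressureTriage` class and the 50% threshold are assumptions for illustration, the values would be read manually from the Flink dashboard, and the sketch reports only the operator immediately downstream of the last backpressured one.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of Flink.
final class BackpressureTriage {

    // operators and backpressuredPercent are ordered from source to sink.
    static Optional<String> suspectedSlowOperator(
            String[] operators, double[] backpressuredPercent, double threshold) {
        // Walk from the sink toward the source to find the backpressured operator closest to the sink.
        for (int i = operators.length - 1; i >= 0; i--) {
            if (backpressuredPercent[i] >= threshold) {
                // The operator immediately downstream is the first candidate for the slow operator.
                return i + 1 < operators.length
                        ? Optional.of(operators[i + 1])
                        : Optional.empty();
            }
        }
        // No operator is backpressured above the threshold.
        return Optional.empty();
    }

    public static void main(String[] args) {
        String[] operators = {"A", "B", "C", "D"};
        double[] backpressured = {93, 85, 11, 0};
        // prints "C"
        System.out.println(suspectedSlowOperator(operators, backpressured, 50.0).orElse("none"));
    }
}
```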

Once you have identified the slow operator, try to understand why it's slow. There could be a myriad of reasons and sometimes it's not obvious what's wrong and can require days of debugging and profiling to resolve. Following are some obvious and more common reasons, some of which are further explained below:
+ The operator is doing slow I/O, e.g., network calls (consider using AsyncIO instead).
+ There is a skew in the data and one operator is receiving more events than others (verify by looking at the number of messages in/out of individual subtasks, i.e., instances of the same operator, in the Flink dashboard).
+ It's a resource intensive operation (if there is no data skew, consider scaling out for CPU/memory bound work or increasing `ParallelismPerKPU` for I/O bound work).
+ Extensive logging in the operator (reduce the logging to a minimum for production application or consider sending debug output to a data stream instead).

## Testing throughput with the Discarding Sink
<a name="troubleshooting-testing-throughput"></a>

The [Discarding Sink](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.html) simply disregards all events it receives while still executing the application (an application without any sink fails to execute). This is very useful for throughput testing, profiling, and verifying that the application is scaling properly. It's also a pragmatic sanity check to verify whether the sinks or the application itself are causing backpressure (but just checking the backpressure metrics is often easier and more straightforward).

By replacing all sinks of an application with a discarding sink and creating a mock source that generates data that resembles production data, you can measure the maximum throughput of the application for a certain parallelism setting. You can then also increase the parallelism to verify that the application scales properly and does not have a bottleneck that only emerges at higher throughput (e.g., because of data skew).

# Data skew
<a name="troubleshooting-data-skew"></a>

A Flink application is executed on a cluster in a distributed fashion. To scale out to multiple nodes, Flink uses the concept of keyed streams, which essentially means that the events of a stream are partitioned according to a specific key, e.g., customer id, and Flink can then process different partitions on different nodes. Many of the Flink operators are then evaluated based on these partitions, e.g., [Keyed Windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/), [Process Functions](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function/) and [Async I/O](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/).

Choosing a partition key often depends on the business logic. At the same time, many of the best practices for, e.g., [DynamoDB](https://aws.amazon.com/dynamodb/) and Spark, equally apply to Flink, including:
+ ensuring a high cardinality of partition keys
+ avoiding skew in the event volume between partitions
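To see why both practices matter, the following minimal sketch assigns events to parallel subtasks by hashing their key and counts the events per subtask. The `KeySkewSketch` class, the customer keys, and the 90% hot-key share are assumptions for illustration, and the modulo of `hashCode()` is a simplified stand-in for Flink's actual key-to-subtask assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation for illustration; not part of Flink.
final class KeySkewSketch {

    // Counts how many events each subtask receives when events are partitioned by key.
    static long[] eventsPerSubtask(List<String> keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            // Simplified partitioning: every event with the same key goes to the same subtask.
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            // 90% of the events share one hot key; the rest are spread over 100 other keys.
            keys.add(i % 10 == 0 ? "customer-" + (i / 10 + 2) : "customer-1");
        }
        // One subtask receives at least the 900 hot-key events, regardless of parallelism.
        System.out.println(Arrays.toString(eventsPerSubtask(keys, 4)));
    }
}
```

Increasing the parallelism does not help in this situation, because all events for the hot key still land on a single subtask.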

 You can identify skew in the partitions by comparing the records received/sent of subtasks (i.e., instances of the same operator) in the Flink dashboard. In addition, Managed Service for Apache Flink monitoring can be configured to expose metrics for `numRecordsIn/Out` and `numRecordsInPerSecond/OutPerSecond` on a subtask level as well.

# State skew
<a name="troubleshooting-state-skew"></a>

For stateful operators, i.e., operators that maintain state for their business logic such as windows, data skew always leads to state skew. Some subtasks receive more events than others because of the skew in the data and hence are also persisting more data in state. However, even for an application that has evenly balanced partitions, there can be a skew in how much data is persisted in state. For instance, for session windows, some users and sessions respectively may be much longer than others. If the longer sessions happen to be part of the same partition, it can lead to an imbalance of the state size kept by different subtasks of the same operator.

State skew not only increases the memory and disk resources required by individual subtasks, it can also decrease the overall performance of the application. When an application is taking a checkpoint or savepoint, the operator state is persisted to Amazon S3 to protect the state against node or cluster failure. During this process (especially with exactly-once semantics, which are enabled by default on Managed Service for Apache Flink), the processing stalls from an external perspective until the checkpoint or savepoint has completed. If there is data skew, the time to complete the operation can be bound by a single subtask that has accumulated a particularly high amount of state. In extreme cases, taking checkpoints or savepoints can fail because a single subtask is unable to persist its state.

 So similar to data skew, state skew can substantially slow down an application.

 To identify state skew, you can leverage the Flink dashboard. Find a recent checkpoint or savepoint and compare the amount of data that has been stored for individual subtasks in the details.
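Once you have the per-subtask numbers, a simple ratio makes the comparison easier. The following is a minimal sketch: the `StateSkew` class and the sample sizes are hypothetical, and the values would be copied manually from the checkpoint details in the Flink dashboard. The same calculation applies to per-subtask record counts when checking for data skew.

```java
// Hypothetical helper for illustration; not part of Flink.
final class StateSkew {

    // Ratio of the largest per-subtask state size to the mean size.
    // 1.0 means perfectly balanced; larger values mean more skew.
    static double skewRatio(long[] bytesPerSubtask) {
        long max = 0;
        long sum = 0;
        for (long bytes : bytesPerSubtask) {
            max = Math.max(max, bytes);
            sum += bytes;
        }
        if (sum == 0) {
            return 1.0; // no state at all (or no subtasks): treat as balanced
        }
        double mean = (double) sum / bytesPerSubtask.length;
        return max / mean;
    }

    public static void main(String[] args) {
        // Assumed checkpointed state sizes in bytes for four subtasks of one operator.
        long[] sizes = {120_000_000L, 95_000_000L, 110_000_000L, 900_000_000L};
        System.out.printf("skew ratio: %.2f%n", skewRatio(sizes));
    }
}
```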

# Integrate with resources in different Regions
<a name="troubleshooting-resources-in-different-regions"></a>

You can use `StreamingFileSink` to write to an Amazon S3 bucket in a different Region from your Managed Service for Apache Flink application. This requires a setting for cross-Region replication in the Flink configuration. To enable it, file a support ticket at [AWS Support Center](https://console.aws.amazon.com/support/home#/).