

# Flink 1.15 Async Sink Deadlock
<a name="troubleshooting-async-deadlock"></a>

There is a [known issue](https://issues.apache.org/jira/browse/FLINK-32230) with AWS connectors for Apache Flink that implement the AsyncSink interface. This affects applications using Flink 1.15 with the following connectors:
+ For Java applications:
  + KinesisStreamsSink – `org.apache.flink:flink-connector-kinesis`
  + KinesisStreamsSink – `org.apache.flink:flink-connector-aws-kinesis-streams`
  + KinesisFirehoseSink – `org.apache.flink:flink-connector-aws-kinesis-firehose`
  + DynamoDbSink – `org.apache.flink:flink-connector-dynamodb`
+ For Flink SQL/Table API/Python applications:
  + kinesis – `org.apache.flink:flink-sql-connector-kinesis`
  + kinesis – `org.apache.flink:flink-sql-connector-aws-kinesis-streams`
  + firehose – `org.apache.flink:flink-sql-connector-aws-kinesis-firehose`
  + dynamodb – `org.apache.flink:flink-sql-connector-dynamodb`

Affected applications experience the following symptoms:
+ The Flink job is in the `RUNNING` state but is not processing data.
+ There are no job restarts.
+ Checkpoints time out.

The issue is caused by a [bug](https://github.com/aws/aws-sdk-java-v2/issues/4354) in the AWS SDK that prevents certain errors from being surfaced to the caller when the async HTTP client is used. As a result, the sink waits indefinitely for an “in-flight request” to complete during a checkpoint flush operation.

This issue was fixed in the AWS SDK starting from version **2.20.144**.

The following sections describe how to update the affected connectors to use the new AWS SDK version in your applications:

**Topics**
+ [Update Java applications](troubleshooting-async-deadlock-update-java-apps.md)
+ [Update Python applications](troubleshooting-async-deadlock-update-python-apps.md)

# Update Java applications
<a name="troubleshooting-async-deadlock-update-java-apps"></a>

Follow the procedures below to update Java applications:

## flink-connector-kinesis
<a name="troubleshooting-async-deadlock-update-java-apps-flink-connector-kinesis"></a>

If the application uses `flink-connector-kinesis`:

The Kinesis connector uses shading to package some dependencies, including the AWS SDK, into the connector jar. To update the AWS SDK version, use the following procedure to replace these shaded classes:

------
#### [ Maven ]

1. Add the Kinesis connector and the required AWS SDK modules as project dependencies.

1. Configure the `maven-shade-plugin`:

   1. Add a filter to exclude the shaded AWS SDK classes when copying the contents of the Kinesis connector jar.

   1. Add relocation rules to move the updated AWS SDK classes to the package expected by the Kinesis connector.

   **pom.xml** 

   ```
   <project>
       ...    
       <dependencies>
           ...
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis</artifactId>
               <version>1.15.4</version>
           </dependency>
           
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>kinesis</artifactId>
               <version>2.20.144</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>netty-nio-client</artifactId>
               <version>2.20.144</version>
           </dependency>
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>sts</artifactId>
               <version>2.20.144</version>
           </dependency>
           ...
       </dependencies>
       ...
       <build>
           ...
           <plugins>
               ...
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-shade-plugin</artifactId>
                   <version>3.1.1</version>
                   <executions>
                       <execution>
                           <phase>package</phase>
                           <goals>
                               <goal>shade</goal>
                           </goals>
                           <configuration>
                               ...
                               <filters>
                                   ...
                                   <filter>
                                       <artifact>org.apache.flink:flink-connector-kinesis</artifact>
                                       <excludes>
                                           <exclude>org/apache/flink/kinesis/shaded/software/amazon/awssdk/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/org/reactivestreams/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/io/netty/**</exclude>
                                           <exclude>org/apache/flink/kinesis/shaded/com/typesafe/netty/**</exclude>
                                       </excludes>
                                   </filter>
                                   ...
                               </filters>
                               <relocations>
                                   ...
                                   <relocation>
                                       <pattern>software.amazon.awssdk</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.software.amazon.awssdk</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>org.reactivestreams</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.org.reactivestreams</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>io.netty</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.io.netty</shadedPattern>
                                   </relocation>
                                   <relocation>
                                       <pattern>com.typesafe.netty</pattern>
                                       <shadedPattern>org.apache.flink.kinesis.shaded.com.typesafe.netty</shadedPattern>
                                   </relocation>
                                   ...
                               </relocations>
                              ...
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
               ...
           </plugins>
           ... 
       </build>
   </project>
   ```

------
#### [ Gradle ]

1. Add the Kinesis connector and the required AWS SDK modules as project dependencies.

1. Adjust the `shadowJar` configuration:

   1. Exclude the shaded AWS SDK classes when copying the contents of the Kinesis connector jar.

   1. Relocate the updated AWS SDK classes to the package expected by the Kinesis connector.

   **build.gradle**

   ```
   ...
   dependencies {
       ...
       flinkShadowJar("org.apache.flink:flink-connector-kinesis:1.15.4")
       
       flinkShadowJar("software.amazon.awssdk:kinesis:2.20.144")
       flinkShadowJar("software.amazon.awssdk:sts:2.20.144")
       flinkShadowJar("software.amazon.awssdk:netty-nio-client:2.20.144")
       ...
   }
   ...
   shadowJar {
       configurations = [project.configurations.flinkShadowJar]
   
       exclude("org/apache/flink/kinesis/shaded/software/amazon/awssdk/**/*")
       exclude("org/apache/flink/kinesis/shaded/org/reactivestreams/**/*.class")
       exclude("org/apache/flink/kinesis/shaded/io/netty/**/*.class")
       exclude("org/apache/flink/kinesis/shaded/com/typesafe/netty/**/*.class")
       
       relocate("software.amazon.awssdk", "org.apache.flink.kinesis.shaded.software.amazon.awssdk")
       relocate("org.reactivestreams", "org.apache.flink.kinesis.shaded.org.reactivestreams")
       relocate("io.netty", "org.apache.flink.kinesis.shaded.io.netty")
       relocate("com.typesafe.netty", "org.apache.flink.kinesis.shaded.com.typesafe.netty")
   }
   ...
   ```

------
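After rebuilding the application jar, it can be worth verifying that the bundled AWS SDK classes really come from the updated version. As a quick sketch (not part of the official procedure), the following hypothetical helper reads the SDK's Maven metadata out of the jar; it assumes the shade/shadow plugin preserved the `META-INF/maven` entries, which is the default unless they are filtered out:

```python
import zipfile

def shaded_sdk_versions(jar_path: str) -> dict:
    """Report AWS SDK module versions bundled in a built jar.

    Assumes the build kept the SDK's Maven metadata entries
    (META-INF/maven/software.amazon.awssdk/<module>/pom.properties).
    """
    versions = {}
    with zipfile.ZipFile(jar_path) as jar:
        for name in jar.namelist():
            if (name.startswith("META-INF/maven/software.amazon.awssdk/")
                    and name.endswith("pom.properties")):
                module = name.split("/")[3]
                for line in jar.read(name).decode("utf-8").splitlines():
                    if line.startswith("version="):
                        versions[module] = line.split("=", 1)[1].strip()
    return versions
```

If the update worked, every reported SDK module should show version `2.20.144` or later.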

## Other affected connectors
<a name="troubleshooting-async-deadlock-update-java-apps-flink-another-connector"></a>

If the application uses another affected connector:

To update the AWS SDK version, enforce the SDK version in the project build configuration.

------
#### [ Maven ]

Add the AWS SDK bill of materials (BOM) to the dependency management section of the `pom.xml` file to enforce the SDK version for the project.

**pom.xml**

```
<project>
    ...    
    <dependencyManagement>
        <dependencies>
            ...
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.20.144</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
            ...
        </dependencies>
    </dependencyManagement>
    ...
</project>
```

------
#### [ Gradle ]

Add a platform dependency on the AWS SDK bill of materials (BOM) to enforce the SDK version for the project. This requires Gradle 5.0 or later:

**build.gradle**

```
...
dependencies {
    ...
    flinkShadowJar(platform("software.amazon.awssdk:bom:2.20.144"))
    ...
}
...
```

------
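After adding the BOM, you can confirm that the SDK version is actually enforced by inspecting the resolved dependency tree (for Maven, `mvn dependency:tree`). As a small illustration, a hypothetical helper that extracts the resolved AWS SDK versions from that output:

```python
import re

# Matches lines from `mvn dependency:tree` output such as:
#   [INFO] +- software.amazon.awssdk:kinesis:jar:2.20.144:compile
_SDK_DEP = re.compile(
    r"software\.amazon\.awssdk:(?P<artifact>[A-Za-z0-9-]+):jar:(?P<version>[0-9.]+)"
)

def awssdk_versions(tree_output: str) -> dict:
    """Map each resolved AWS SDK artifact to its version."""
    return {m.group("artifact"): m.group("version")
            for m in _SDK_DEP.finditer(tree_output)}
```

Every AWS SDK artifact in the tree should resolve to `2.20.144` or later once the BOM takes effect.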

# Update Python applications
<a name="troubleshooting-async-deadlock-update-python-apps"></a>

Python applications can use connectors in two different ways: by packaging connectors and other Java dependencies into a single uber jar, or by using the connector jar directly. To fix applications affected by the Async Sink deadlock:
+ If the application uses an uber jar, follow the instructions for [Update Java applications](troubleshooting-async-deadlock-update-java-apps.md).
+ If the application uses connector jars directly, rebuild them from source using the following steps:

**Building connectors from source:**

The prerequisites are similar to the Flink [build requirements](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/flinkdev/building/#build-flink):
+ Java 11
+ Maven 3.2.5

## flink-sql-connector-kinesis
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-kinesis"></a>

1. Download the source code for Flink 1.15.4:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. Uncompress the source code:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Navigate to the Kinesis connector directory:

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-kinesis/
   ```

1. Compile and install the connector jar, specifying the required AWS SDK version. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdkv2.version=2.20.144
   ```

1. Navigate to the SQL connector directory:

   ```
   cd ../flink-sql-connector-kinesis
   ```

1. Compile and install the SQL connector jar:

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. The resulting jar will be available at:

   ```
   target/flink-sql-connector-kinesis-1.15.4.jar
   ```

## flink-sql-connector-aws-kinesis-streams
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-aws-kinesis-streams"></a>

1. Download the source code for Flink 1.15.4:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. Uncompress the source code:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Navigate to the connector directory:

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-aws-kinesis-streams/
   ```

1. Compile and install the connector jar, specifying the required AWS SDK version. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdk.version=2.20.144
   ```

1. Navigate to the SQL connector directory:

   ```
   cd ../flink-sql-connector-aws-kinesis-streams
   ```

1. Compile and install the SQL connector jar:

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. The resulting jar will be available at:

   ```
   target/flink-sql-connector-aws-kinesis-streams-1.15.4.jar
   ```

## flink-sql-connector-aws-kinesis-firehose
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-kinesis-firehose"></a>

1. Download the source code for Flink 1.15.4:

   ```
   wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-src.tgz
   ```

1. Uncompress the source code:

   ```
   tar -xvf flink-1.15.4-src.tgz
   ```

1. Navigate to the connector directory:

   ```
   cd flink-1.15.4/flink-connectors/flink-connector-aws-kinesis-firehose/
   ```

1. Compile and install the connector jar, specifying the required AWS SDK version. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Daws.sdk.version=2.20.144
   ```

1. Navigate to the SQL connector directory:

   ```
   cd ../flink-sql-connector-aws-kinesis-firehose
   ```

1. Compile and install the SQL connector jar:

   ```
   mvn clean install -DskipTests -Dfast
   ```

1. The resulting jar will be available at:

   ```
   target/flink-sql-connector-aws-kinesis-firehose-1.15.4.jar
   ```

## flink-sql-connector-dynamodb
<a name="troubleshooting-async-deadlock-update-python-apps-flink-sql-connector-dynamodb"></a>

1. Download the source code for the flink-connector-aws 3.0.0 release:

   ```
   wget https://archive.apache.org/dist/flink/flink-connector-aws-3.0.0/flink-connector-aws-3.0.0-src.tgz
   ```

1. Uncompress the source code:

   ```
   tar -xvf flink-connector-aws-3.0.0-src.tgz
   ```

1. Navigate to the connector directory:

   ```
   cd flink-connector-aws-3.0.0
   ```

1. Compile and install the connector jar, specifying the required AWS SDK version. To speed up the build, use `-DskipTests` to skip test execution and `-Dfast` to skip additional source code checks:

   ```
   mvn clean install -DskipTests -Dfast -Dflink.version=1.15.4 -Daws.sdk.version=2.20.144
   ```

1. The resulting jar will be available at:

   ```
   flink-sql-connector-dynamodb/target/flink-sql-connector-dynamodb-3.0.0.jar
   ```
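A rebuilt connector jar can then be loaded by a PyFlink application through the `pipeline.jars` configuration option. A minimal sketch, assuming PyFlink 1.15 and using an example placeholder for the jar path:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a streaming TableEnvironment
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Point the job at the rebuilt connector jar
# (example path; adjust to your build output)
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kinesis-1.15.4.jar",
)
```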