

# Managed Service for Apache Flink: How it works
<a name="how-it-works"></a>

Managed Service for Apache Flink is a fully managed Amazon service that lets you use an Apache Flink application to process streaming data. First, you program your Apache Flink application, and then you create your Managed Service for Apache Flink application.

## Program your Apache Flink application
<a name="how-it-works-programming"></a>

An Apache Flink application is a Java or Scala application that is created with the Apache Flink framework. You author and build your Apache Flink application locally. 

Applications primarily use either the [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) or the [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/). The other Apache Flink APIs are also available for you to use, but they are less commonly used in building streaming applications.

The features of the two APIs are as follows:

### DataStream API
<a name="how-it-works-prog-datastream"></a>

The Apache Flink DataStream API programming model is based on two components:
+ **Data stream:** The structured representation of a continuous flow of data records.
+ **Transformation operator:** Takes one or more data streams as input, and produces one or more data streams as output.

Applications created with the DataStream API do the following:
+ Read data from a Data Source (such as a Kinesis stream or Amazon MSK topic).
+ Apply transformations to the data, such as filtering, aggregation, or enrichment.
+ Write the transformed data to a Data Sink.

Applications that use the DataStream API can be written in Java or Scala, and can read from a Kinesis data stream, an Amazon MSK topic, or a custom source.

Your application processes data by using a *connector*. Apache Flink uses the following types of connectors: 
+ **Source**: A connector used to read external data.
+ **Sink**: A connector used to write to external locations. 
+ **Operator**: A connector used to process data within the application.

A typical application consists of at least one data stream with a source, a data stream with one or more operators, and at least one data sink.
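
The source, operator, and sink structure described above can be sketched in plain Python. This is only an analogy for the shape of a DataStream application, not Apache Flink code: the record layout, the function names, and the in-memory list standing in for a sink are all hypothetical.

```python
from typing import Callable, Iterable, Iterator, List

Record = dict  # hypothetical record type used only in this sketch


def source() -> Iterator[Record]:
    """Stand-in for a source connector: yields records one at a time."""
    yield {"ticker": "AMZN", "price": 101.5}
    yield {"ticker": "TEST", "price": -1.0}
    yield {"ticker": "MSFT", "price": 250.0}


def keep_valid(records: Iterable[Record]) -> Iterator[Record]:
    """Operator: filter out records with a non-positive price."""
    return (r for r in records if r["price"] > 0)


def enrich(records: Iterable[Record]) -> Iterator[Record]:
    """Operator: add a derived field to each record."""
    for r in records:
        yield {**r, "price_cents": round(r["price"] * 100)}


def run_pipeline(sink: Callable[[Record], None]) -> None:
    """Wire source -> operators -> sink, one record at a time."""
    for record in enrich(keep_valid(source())):
        sink(record)


collected: List[Record] = []  # stand-in for a sink connector
run_pipeline(collected.append)
```

In a real application, the source and sink would be Apache Flink connectors and the operators would be DataStream transformations that the runtime executes in parallel.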

For more information about using the DataStream API, see [Review DataStream API components](how-datastream.md).

### Table API
<a name="how-it-works-prog-table"></a>

The Apache Flink Table API programming model is based on the following components:
+ **Table Environment:** An interface to underlying data that you use to create and host one or more tables. 
+ **Table:** An object providing access to a SQL table or view.
+ **Table Source:** Used to read data from an external source, such as an Amazon MSK topic.
+ **Table Function:** A SQL query or API call used to transform data.
+ **Table Sink:** Used to write data to an external location, such as an Amazon S3 bucket.

Applications created with the Table API do the following:
+ Create a `TableEnvironment` by connecting to a `Table Source`. 
+ Create a table in the `TableEnvironment` using either SQL queries or Table API functions.
+ Run a query on the table using either the Table API or SQL.
+ Apply transformations on the results of the query using Table Functions or SQL queries.
+ Write the query or function results to a `Table Sink`.

Applications that use the Table API can be written in Java or Scala, and can query data using either API calls or SQL queries. 

For more information about using the Table API, see [Review Table API components](how-table.md).

## Create your Managed Service for Apache Flink application
<a name="how-it-works-app"></a>

Managed Service for Apache Flink is an AWS service that creates an environment for hosting your Apache Flink application and provides it with the following settings:
+ **[Use runtime properties](how-properties.md)**: Parameters that you can provide to your application. You can change these parameters without recompiling your application code.
+ **[Implement fault tolerance](how-fault.md)**: How your application recovers from interrupts and restarts.
+ **[Logging and monitoring in Amazon Managed Service for Apache Flink](monitoring-overview.md)**: How your application logs events to CloudWatch Logs. 
+ **[Implement application scaling](how-scaling.md)**: How your application provisions computing resources.

You create your Managed Service for Apache Flink application using either the console or the AWS CLI. To get started creating a Managed Service for Apache Flink application, see [Tutorial: Get started using the DataStream API in Managed Service for Apache Flink](getting-started.md).

# Create a Managed Service for Apache Flink application
<a name="how-creating-apps"></a>

This topic contains information about creating a Managed Service for Apache Flink application.

**Topics**
+ [Build your Managed Service for Apache Flink application code](#how-creating-apps-building)
+ [Create your Managed Service for Apache Flink application](#how-creating-apps-creating)
+ [Use customer managed keys](#how-creating-apps-use-cmk)
+ [Start your Managed Service for Apache Flink application](#how-creating-apps-starting)
+ [Verify your Managed Service for Apache Flink application](#how-creating-apps-verifying)
+ [Enable system rollbacks for your Managed Service for Apache Flink application](how-system-rollbacks.md)

## Build your Managed Service for Apache Flink application code
<a name="how-creating-apps-building"></a>

This section describes the components that you use to build the application code for your Managed Service for Apache Flink application. 

We recommend that you use the latest supported version of Apache Flink for your application code. For information about upgrading Managed Service for Apache Flink applications, see [Use in-place version upgrades for Apache Flink](how-in-place-version-upgrades.md). 

You build your application code using [Apache Maven](https://maven.apache.org/). An Apache Maven project uses a `pom.xml` file to specify the versions of components that it uses. 

**Note**  
Managed Service for Apache Flink supports JAR files up to 512 MB in size. If you use a JAR file larger than this, your application will fail to start.

Applications can now use the Java API from any Scala version. You must bundle the Scala standard library of your choice into your Scala applications.

For information about creating a Managed Service for Apache Flink application that uses **Apache Beam**, see [Use Apache Beam with Managed Service for Apache Flink applications](how-creating-apps-beam.md).

### Specify your application's Apache Flink version
<a name="how-creating-apps-building-flink"></a>

When using Managed Service for Apache Flink Runtime version 1.1.0 and later, you specify the version of Apache Flink that your application uses when you compile your application. You provide the version of Apache Flink with the `-Dflink.version` parameter. For example, if you are using Apache Flink 2.2.0, provide the following:

```
mvn package -Dflink.version=2.2.0
```

For building applications with earlier versions of Apache Flink, see [Earlier versions](earlier.md).

## Create your Managed Service for Apache Flink application
<a name="how-creating-apps-creating"></a>

After you've built your application code, you do the following to create your Managed Service for Apache Flink (Amazon MSF) application:
+ **Upload your Application code**: Upload your application code to an Amazon S3 bucket. You specify the S3 bucket name and object name of your application code when you create your application. For a tutorial that shows how to upload your application code, see the [Tutorial: Get started using the DataStream API in Managed Service for Apache Flink](getting-started.md) tutorial.
+ **Create your Managed Service for Apache Flink application**: Use one of the following methods to create your Amazon MSF application:
**Note**  
Amazon MSF encrypts your application by default using AWS owned keys. You can also create your new application using AWS KMS customer managed keys (CMKs) to create, own, and manage your keys yourself. For information about CMKs, see [Key management in Amazon Managed Service for Apache Flink](key-management-flink.md).
  + **Create your Amazon MSF application using the AWS console:** You can create and configure your application using the AWS console. 

    When you create your application using the console, your application's dependent resources (such as CloudWatch Logs streams, IAM roles, and IAM policies) are created for you. 

    When you create your application using the console, you specify what version of Apache Flink your application uses by selecting it from the pull-down on the **Managed Service for Apache Flink - Create application** page. 

    For a tutorial about how to use the console to create an application, see the [Tutorial: Get started using the DataStream API in Managed Service for Apache Flink](getting-started.md) tutorial.
  + **Create your Amazon MSF application using the AWS CLI:** You can create and configure your application using the AWS CLI. 

    When you create your application using the CLI, you must also create your application's dependent resources (such as CloudWatch Logs streams, IAM roles, and IAM policies) manually.

    When you create your application using the CLI, you specify what version of Apache Flink your application uses by using the `RuntimeEnvironment` parameter of the `CreateApplication` action.
**Note**  
You can change the `RuntimeEnvironment` of an existing application. To learn how, see [Use in-place version upgrades for Apache Flink](how-in-place-version-upgrades.md).
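
As a rough illustration of the CLI path, the following Python sketch assembles a `CreateApplication` request body that sets `RuntimeEnvironment` and points at application code in Amazon S3. The helper name, the application name, the ARNs, the file key, and the `FLINK-1_19` value are placeholders chosen for this example; check the field names and the allowed runtime values against the API reference before using them.

```python
import json


def build_create_application_request(
    name: str, runtime: str, role_arn: str, bucket_arn: str, file_key: str
) -> dict:
    """Assemble a CreateApplication request body (hypothetical helper)."""
    return {
        "ApplicationName": name,
        # Selects the Apache Flink version that the application runs on.
        "RuntimeEnvironment": runtime,
        # IAM role that you created manually for the application.
        "ServiceExecutionRole": role_arn,
        "ApplicationConfiguration": {
            "ApplicationCodeConfiguration": {
                "CodeContent": {
                    "S3ContentLocation": {
                        "BucketARN": bucket_arn,
                        "FileKey": file_key,
                    }
                },
                "CodeContentType": "ZIPFILE",
            }
        },
    }


# All values below are placeholders, not real resources.
request = build_create_application_request(
    name="MyApplication",
    runtime="FLINK-1_19",
    role_arn="arn:aws:iam::123456789012:role/MyApplicationRole",
    bucket_arn="arn:aws:s3:::amzn-s3-demo-bucket",
    file_key="my-application-1.0.jar",
)
request_json = json.dumps(request, indent=2)
```

One way to use the result is to save `request_json` to a file and pass it to `aws kinesisanalyticsv2 create-application --cli-input-json file://<your-file>.json`.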

## Use customer managed keys
<a name="how-creating-apps-use-cmk"></a>

In Amazon MSF, the customer managed keys (CMKs) feature lets you encrypt your application's data with a key that you create, own, and manage in AWS Key Management Service (AWS KMS). For an Amazon MSF application, this means that all data subject to a Flink [checkpoint](how-fault.md) or [snapshot](how-snapshots.md) is encrypted with a CMK that you define for that application.

To use CMK with your application, you must first [create your new application](#how-creating-apps-creating), and then apply a CMK. For more information about using CMKs, see [Key management in Amazon Managed Service for Apache Flink](key-management-flink.md).

## Start your Managed Service for Apache Flink application
<a name="how-creating-apps-starting"></a>

After you have built your application code, uploaded it to S3, and created your Managed Service for Apache Flink application, you then start your application. Starting a Managed Service for Apache Flink application typically takes several minutes.

Use one of the following methods to start your application:
+ **Start your Managed Service for Apache Flink application using the AWS console:** You can run your application by choosing **Run** on your application's page in the AWS console.
+ **Start your Managed Service for Apache Flink application using the AWS API:** You can run your application using the [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) action. 

## Verify your Managed Service for Apache Flink application
<a name="how-creating-apps-verifying"></a>

You can verify that your application is working in the following ways:
+ **Using CloudWatch Logs:** You can use CloudWatch Logs and CloudWatch Logs Insights to verify that your application is running properly. For information about using CloudWatch Logs with your Managed Service for Apache Flink application, see [Logging and monitoring in Amazon Managed Service for Apache Flink](monitoring-overview.md).
+ **Using CloudWatch Metrics:** You can use CloudWatch Metrics to monitor your application's activity, or activity in the resources your application uses for input or output (such as Kinesis streams, Firehose streams, or Amazon S3 buckets). For more information about CloudWatch metrics, see [Working with Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) in the Amazon CloudWatch User Guide.
+ **Monitoring Output Locations:** If your application writes output to a location (such as an Amazon S3 bucket or database), you can monitor that location for written data.

# Enable system rollbacks for your Managed Service for Apache Flink application
<a name="how-system-rollbacks"></a>

With the system rollback capability, you can achieve higher availability for your running Apache Flink application on Amazon Managed Service for Apache Flink. Opting in to this configuration enables the service to automatically revert the application to the previously running version when an action such as `UpdateApplication` or `autoscaling` runs into code or configuration bugs.

**Note**  
To use the system rollback feature, you must opt in by updating your application. Existing applications do not use system rollback by default.

## How it works
<a name="how-rollback-works"></a>

When you initiate an application operation, such as an update or scaling action, Amazon Managed Service for Apache Flink first attempts to run that operation. If it detects issues that prevent the operation from succeeding, such as code bugs or insufficient permissions, the service automatically initiates a `RollbackApplication` operation.

The rollback attempts to restore the application to the previous version that ran successfully, along with the associated application state. If the rollback is successful, your application continues processing data with minimal downtime using the previous version. If the automatic rollback also fails, Amazon Managed Service for Apache Flink transitions the application to the `READY` status, so that you can take further actions, including fixing the error and retrying the operation. 
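
The possible outcomes described above can be summarized in a small decision function. This is a simplified model, not service code: it assumes a running application with system rollback enabled, and the function name and the version labels are hypothetical.

```python
from typing import Tuple


def outcome_with_system_rollback(
    operation_succeeded: bool, rollback_succeeded: bool
) -> Tuple[str, str]:
    """Return (application status, version in effect) after an operation.

    Simplified model: assumes the application was running and that
    system rollback is enabled.
    """
    if operation_succeeded:
        # The update or scaling action worked, so no rollback is needed.
        return ("RUNNING", "new version")
    if rollback_succeeded:
        # The service restored the last version that ran successfully.
        return ("RUNNING", "previous version")
    # The rollback also failed: the application waits for you to fix the
    # error and retry the operation.
    return ("READY", "none running")
```

For example, `outcome_with_system_rollback(False, True)` models a failed update that was rolled back successfully.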

You must opt in to use automatic system rollbacks. You can enable it using the console or API for all operations on your application from this point forward. 

The following example request for the `UpdateApplication` action enables system rollbacks for an application:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## Review common scenarios for automatic system rollback
<a name="common-scenarios"></a>

The following scenarios illustrate where automatic system rollbacks are beneficial:
+ **Application updates:** If you update your application with new code that has bugs when initializing the Flink job through the main method, the automatic rollback allows the previous working version to be restored. Other update scenarios where system rollbacks are helpful include:
  + If your application is updated to run with a parallelism higher than [maxParallelism](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto).
  + If your application is updated to run with incorrect subnets for a VPC application that results in a failure during the Flink job startup. 
+ **Flink version upgrades:** When you upgrade to a new Apache Flink version and the upgraded application encounters a snapshot compatibility issue, system rollback lets you revert to the prior Flink version automatically. 
+ **AutoScaling:** When the application scales up but runs into issues restoring from a savepoint because of an operator mismatch between the snapshot and the Flink job graph, system rollback reverts the application to the previously running version.

## Use operation APIs for system rollbacks
<a name="operation-apis"></a>

To provide better visibility, Amazon Managed Service for Apache Flink has two APIs related to application operations that can help you track failures and related system rollbacks.

`ListApplicationOperations`

This API lists all operations performed on the application, including `UpdateApplication`, `Maintenance`, `RollbackApplication`, and others in reverse chronological order. The following example request for the `ListApplicationOperations` action lists the first 10 application operations for the application:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

The following example request for `ListApplicationOperations` filters the list to previous updates on the application:

```
{
   "ApplicationName": "MyApplication",
   "Operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

This API provides detailed information about a specific operation listed by `ListApplicationOperations`, including the reason for failure, if applicable. The following example request for the `DescribeApplicationOperation` action lists details for a specific application operation:

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```
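
To show how the two APIs fit together, the following Python sketch picks the failed operations out of a `ListApplicationOperations` response so that each ID can then be passed to `DescribeApplicationOperation`. The response shape used here (an `ApplicationOperationInfoList` whose entries carry `Operation`, `OperationId`, and `OperationStatus`) is an assumption based on the API reference, and the sample data is invented for the example.

```python
from typing import List


def failed_operation_ids(response: dict) -> List[str]:
    """Collect the IDs of operations whose status is FAILED."""
    operations = response.get("ApplicationOperationInfoList", [])
    return [
        op["OperationId"]
        for op in operations
        if op.get("OperationStatus") == "FAILED"
    ]


# Hypothetical response, newest operation first.
sample_response = {
    "ApplicationOperationInfoList": [
        {"Operation": "RollbackApplication", "OperationId": "op-3", "OperationStatus": "SUCCESSFUL"},
        {"Operation": "UpdateApplication", "OperationId": "op-2", "OperationStatus": "FAILED"},
        {"Operation": "UpdateApplication", "OperationId": "op-1", "OperationStatus": "SUCCESSFUL"},
    ]
}

failed = failed_operation_ids(sample_response)
```

In this invented sample, the failed update is immediately followed by a successful `RollbackApplication` entry, which is the pattern you would expect to see after an automatic system rollback.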

For troubleshooting information, see [System rollback best practices](troubleshooting-system-rollback.md).

# Run a Managed Service for Apache Flink application
<a name="how-running-apps"></a>

This topic contains information about running a Managed Service for Apache Flink application.

When you run your Managed Service for Apache Flink application, the service creates an Apache Flink job. An Apache Flink job is the execution lifecycle of your Managed Service for Apache Flink application. The execution of the job, and the resources it uses, are managed by the Job Manager. The Job Manager separates the execution of the application into tasks. Each task is managed by a Task Manager. When you monitor your application's performance, you can examine the performance of each Task Manager, or of the Job Manager as a whole. 

For information about Apache Flink jobs, see [Jobs and Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) in the Apache Flink Documentation.

## Identify application and job status
<a name="how-running-job-status"></a>

Both your application and the application's job have a current execution status:
+ **Application status:** Your application has a current status that describes its phase of execution. Application statuses include the following:
  + **Steady application statuses:** Your application typically stays in these statuses until you make a status change:
    + **READY:** A new or stopped application is in the READY status until you run it.
    + **RUNNING:** An application that has successfully started is in the RUNNING status.
  + **Transient application statuses:** An application in these statuses is typically in the process of transitioning to another status. If an application stays in a transient status for a length of time, you can stop the application using the [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) action with the `Force` parameter set to `true`. These statuses include the following:
    + `STARTING`: Occurs after the [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) action. The application is transitioning from the `READY` to the `RUNNING` status.
    + `STOPPING`: Occurs after the [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) action. The application is transitioning from the `RUNNING` to the `READY` status.
    + `DELETING`: Occurs after the [DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html) action. The application is in the process of being deleted.
    + `UPDATING`: Occurs after the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action. The application is updating, and will transition back to the `RUNNING` or `READY` status.
    + `AUTOSCALING`: The application has the `AutoScalingEnabled` property of the [ParallelismConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html) set to `true`, and the service is increasing the parallelism of the application. When the application is in this status, the only valid API action you can use is the [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) action with the `Force` parameter set to `true`. For information about automatic scaling, see [Use automatic scaling in Managed Service for Apache Flink](how-scaling-auto.md).
    + `FORCE_STOPPING`: Occurs after the [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) action is called with the `Force` parameter set to `true`. The application is in the process of being force stopped. The application transitions from the `STARTING`, `UPDATING`, `STOPPING`, or `AUTOSCALING` status to the `READY` status.
    + `ROLLING_BACK`: Occurs after the [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) action is called. The application is in the process of being rolled back to a previous version. The application transitions from the `UPDATING` or `AUTOSCALING` status to the `RUNNING` status.
    + `MAINTENANCE`: Occurs while Managed Service for Apache Flink applies patches to your application. For more information, see [Manage maintenance tasks for Managed Service for Apache Flink](maintenance.md).

  You can check your application's status using the console, or by using the [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html) action.
+ **Job status:** When your application is in the `RUNNING` status, your job has a status that describes its current execution phase. A job starts in the `CREATED` status, and then proceeds to the `RUNNING` status when it has started. If error conditions occur, your application enters the following status: 
  + For applications using Apache Flink 1.11 and later, your application enters the `RESTARTING` status.
  + For applications using Apache Flink 1.8 and prior, your application enters the `FAILING` status.

  The application then proceeds to either the `RESTARTING` or `FAILED` status, depending on whether the job can be restarted. 

  You can check the job's status by examining your application's CloudWatch log for status changes.
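
The application statuses listed above can be captured in a small lookup, which is handy when a script needs to decide whether to wait or to force stop. This is a minimal sketch built only from the statuses in this topic; the set and function names are hypothetical.

```python
# Statuses the application stays in until you make a change.
STEADY_STATUSES = {"READY", "RUNNING"}

# Statuses the application normally passes through on its way elsewhere.
TRANSIENT_STATUSES = {
    "STARTING", "STOPPING", "DELETING", "UPDATING",
    "AUTOSCALING", "FORCE_STOPPING", "ROLLING_BACK", "MAINTENANCE",
}

# Statuses from which FORCE_STOPPING moves the application to READY.
FORCE_STOPPABLE_STATUSES = {"STARTING", "UPDATING", "STOPPING", "AUTOSCALING"}


def is_transient(status: str) -> bool:
    """True if the application is expected to leave this status on its own."""
    return status in TRANSIENT_STATUSES


def can_force_stop(status: str) -> bool:
    """True if a force stop is listed above as a transition from this status."""
    return status in FORCE_STOPPABLE_STATUSES
```

A monitoring script could poll the status with `DescribeApplication` and call `can_force_stop` only after the application has stayed in a transient status longer than expected.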

## Run batch workloads
<a name="batch-workloads"></a>

Managed Service for Apache Flink supports running Apache Flink batch workloads. In a batch job, when the Apache Flink job reaches the **FINISHED** status, the Managed Service for Apache Flink application status is set to **READY**. For more information about Flink job statuses, see [Jobs and Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/).

# Review Managed Service for Apache Flink application resources
<a name="how-resources"></a>

This section describes the system resources that your application uses. Understanding how Managed Service for Apache Flink provisions and uses resources will help you design, create, and maintain a performant and stable Managed Service for Apache Flink application.

## Managed Service for Apache Flink application resources
<a name="how-resources-kda"></a>

Managed Service for Apache Flink is an AWS service that creates an environment for hosting your Apache Flink application. The Managed Service for Apache Flink service provides resources using units called **Kinesis Processing Units (KPUs)**.

One KPU represents the following system resources:
+ One CPU core
+ 4 GB of memory, of which one GB is native memory and three GB are heap memory
+ 50 GB of disk space

KPUs run applications in distinct execution units called **tasks** and **subtasks**. You can think of a subtask as the equivalent of a thread.

The number of KPUs available to an application is equal to the application's `Parallelism` setting, divided by the application's `ParallelismPerKPU` setting. 
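
The relationship above can be written as a one-line calculation. The sketch below is illustrative only: the function name is hypothetical, and rounding up when `Parallelism` is not an exact multiple of `ParallelismPerKPU` is an assumption, because a fraction of a KPU cannot be allocated.

```python
def provisioned_kpus(parallelism: int, parallelism_per_kpu: int) -> int:
    """KPUs available to the application for the given settings.

    Rounding up for non-multiples is an assumption of this sketch.
    """
    if parallelism < 1 or parallelism_per_kpu < 1:
        raise ValueError("both settings must be positive integers")
    # Ceiling division using integers only, to avoid float rounding.
    return -(-parallelism // parallelism_per_kpu)


kpus_even = provisioned_kpus(8, 2)    # 8 / 2 = 4 KPUs
kpus_uneven = provisioned_kpus(10, 4)  # 10 / 4 = 2.5, rounded up to 3 KPUs
```

Raising `ParallelismPerKPU` packs more subtasks onto each KPU, which lowers the KPU count for the same `Parallelism` but gives each subtask a smaller share of CPU and memory.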

For more information about application parallelism, see [Implement application scaling](how-scaling.md).

## Apache Flink application resources
<a name="how-resources-flink"></a>

The Apache Flink environment allocates resources for your application using units called **task slots**. When Managed Service for Apache Flink allocates resources for your application, it assigns one or more Apache Flink task slots to a single KPU. The number of slots assigned to a single KPU is equal to your application's `ParallelismPerKPU` setting. For more information about task slots, see [Job Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) in the Apache Flink Documentation.

### Operator parallelism
<a name="how-resources-flink-operatorparallelism"></a>

You can set the maximum number of subtasks that an operator can use. This value is called **Operator Parallelism**. By default, the parallelism of each operator in your application is equal to the application's parallelism. This means that by default, each operator in your application can use all of the available subtasks in the application if needed.

You can set the parallelism of the operators in your application using the `setParallelism` method. Using this method, you can control the number of subtasks each operator can use at one time.

For more information about operators, see [Operators](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) in the Apache Flink Documentation.

### Operator chaining
<a name="how-resources-flink-operatorchaining"></a>

Normally, each operator uses a separate subtask to execute, but if several operators always execute in sequence, the runtime can assign them all to the same task. This process is called **Operator Chaining**.

Several sequential operators can be chained into a single task if they all operate on the same data. The following are some of the criteria needed for this to be true:
+ The operators do 1-to-1 simple forwarding.
+ The operators all have the same operator parallelism.

When your application chains operators into a single subtask, it conserves system resources, because the service doesn't need to perform network operations and allocate subtasks for each operator. To determine if your application is using operator chaining, look at the job graph in the Managed Service for Apache Flink console. Each vertex in the application represents one or more operators. The graph shows operators that have been chained as a single vertex.

# Per second billing in Managed Service for Apache Flink
<a name="how-pricing"></a>

Managed Service for Apache Flink is now billed in one-second increments. There is a ten-minute minimum charge per application. Per-second billing is applicable to applications that are newly launched or already running. This section describes how Managed Service for Apache Flink meters and bills you for your usage. To learn more about Managed Service for Apache Flink pricing, see [Amazon Managed Service for Apache Flink Pricing](https://aws.amazon.com/managed-service-apache-flink/pricing/). 

## How it works
<a name="how-resources-kda"></a>

Managed Service for Apache Flink charges you for the duration and number of **Kinesis Processing Units (KPUs)** that your applications use, billed in one-second increments in the supported AWS Regions. A single KPU comprises 1 vCPU of compute and 4 GB of memory. You are charged an hourly rate based on the number of KPUs used to run your applications.

For example, an application running for 20 minutes and 10 seconds will be charged for 20 minutes and 10 seconds, multiplied by the resources it used. An application that is running for 5 minutes will be charged the ten-minute minimum, multiplied by the resources it used.

Managed Service for Apache Flink states usage in hours. For example, 15 minutes corresponds to 0.25 hours. 
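
The duration rules above can be checked with a few lines of Python. This is a minimal sketch of the arithmetic only; the constant and function names are hypothetical.

```python
MINIMUM_BILLABLE_SECONDS = 10 * 60  # ten-minute minimum charge per application


def billable_seconds(runtime_seconds: int) -> int:
    """Apply per-second billing with the ten-minute minimum."""
    return max(runtime_seconds, MINIMUM_BILLABLE_SECONDS)


def as_hours(seconds: int) -> float:
    """Express a duration in hours, the unit in which usage is stated."""
    return seconds / 3600


long_run = billable_seconds(20 * 60 + 10)  # billed for the full 20 min 10 s
short_run = billable_seconds(5 * 60)       # raised to the ten-minute minimum
quarter_hour = as_hours(15 * 60)           # 15 minutes stated as 0.25 hours
```

The charge for a run is then the billable duration in hours multiplied by the number of KPUs and the KPU-hour price for the Region.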

For Apache Flink applications, you are charged a single additional KPU per application, used for orchestration. Applications are also charged for running storage and durable backups. Running application storage is used for stateful processing capabilities in Managed Service for Apache Flink and is charged per GB/month. Durable backups are optional and provide point-in-time recovery for applications, charged per GB/month. 

In streaming mode, Managed Service for Apache Flink automatically scales the number of KPUs required by your stream processing application as the demands of memory and compute fluctuate. You can choose to provision your application with the required number of KPUs. 

## AWS Region availability
<a name="how-pricing-regions"></a>

**Note**  
At this time, per second billing is not available in the following Regions: AWS GovCloud (US-East), AWS GovCloud (US-West), China (Beijing), and China (Ningxia).

Per second billing is available in the following AWS Regions: 
+ US East (N. Virginia) - us-east-1
+ US East (Ohio) - us-east-2
+ US West (N. California) - us-west-1
+ US West (Oregon) - us-west-2
+ Africa (Cape Town) - af-south-1
+ Asia Pacific (Hong Kong) - ap-east-1
+ Asia Pacific (Hyderabad) - ap-south-2
+ Asia Pacific (Jakarta) - ap-southeast-3
+ Asia Pacific (Melbourne) - ap-southeast-4
+ Asia Pacific (Mumbai) - ap-south-1
+ Asia Pacific (Osaka) - ap-northeast-3
+ Asia Pacific (Seoul) - ap-northeast-2
+ Asia Pacific (Singapore) - ap-southeast-1
+ Asia Pacific (Sydney) - ap-southeast-2
+ Asia Pacific (Tokyo) - ap-northeast-1
+ Canada (Central) - ca-central-1
+ Canada West (Calgary) - ca-west-1
+ Europe (Frankfurt) - eu-central-1
+ Europe (Ireland) - eu-west-1
+ Europe (London) - eu-west-2
+ Europe (Milan) - eu-south-1
+ Europe (Paris) - eu-west-3
+ Europe (Spain) - eu-south-2
+ Europe (Stockholm) - eu-north-1
+ Europe (Zurich) - eu-central-2
+ Israel (Tel Aviv) - il-central-1
+ Middle East (Bahrain) - me-south-1
+ Middle East (UAE) - me-central-1
+ South America (São Paulo) - sa-east-1

## Pricing examples
<a name="how-pricing-examples"></a>

You can find pricing examples on the Managed Service for Apache Flink pricing page. For more information, see [Amazon Managed Service for Apache Flink Pricing](https://aws.amazon.com/managed-service-apache-flink/pricing/). Following are further examples with Cost Usage Report illustrations for each.

### A long running, heavy workload
<a name="pricing-example-1"></a>

You are a large video streaming service and you would like to build a real-time video recommendation based on your users’ interactions. You use an Apache Flink application in Managed Service for Apache Flink to continuously ingest user interaction events from multiple Kinesis data streams and to process events in real time before outputting to a downstream system. User interaction events are transformed using several operators. This includes partitioning data by event type, enriching data with additional metadata, sorting data by timestamp, and buffering data for 5 minutes before delivery. The application has many transformation steps that are compute-intensive and parallelizable. Your Flink application is configured to run with 20 KPUs to accommodate the workload. Your application uses 1 GB of durable application backup every day. The monthly Managed Service for Apache Flink charges will be computed as follows:

**Monthly charges**

The price in the US East (N. Virginia) Region is \$0.11 per KPU-hour. Managed Service for Apache Flink allocates 50 GB of running application storage per KPU and charges \$0.10 per GB/month.
+ Monthly KPU charges: 24 hours × 30 days × (20 KPUs + 1 additional KPU for streaming application) × \$0.11/hour = \$1,584.00
+ Monthly running application storage charges: 30 days × 20 KPUs × 50 GB/KPU × \$0.10/GB-month = \$100.00
+ Monthly durable application storage charges: 30 days × 1 GB × \$0.023/GB-month = \$0.03
+ Total charges: \$1,584.00 + \$100 + \$0.03 = **\$1,684.03**

**Cost usage report for Managed Service for Apache Flink on the Billing and Cost Management console for the month**

Kinesis Analytics
+ USD 1,684.03 - US East (N. Virginia)
+ Amazon Kinesis Analytics CreateSnapshot
  + \$10.023 per GB-month of durable application backups
    + 1 GB-month - USD 0.03
+ Amazon Kinesis Analytics StartApplication
  + \$10.10 per GB-month of running application storage
    + 1,000 GB-month - USD 100
  + \$10.11 per Kinesis Processing Unit-hour for Apache Flink applications
    + 15,120 KPU-hour - USD 1,584

### A batch workload that runs for ~15 minutes every day
<a name="pricing-example-2"></a>

You use an Apache Flink application in Managed Service for Apache Flink to transform log data in Amazon Simple Storage Service (Amazon S3) in batch mode. The log data is transformed using several operators. This includes applying a schema to the different log events, partitioning data by event type, and sorting data by timestamp. The application has many transformation steps, but none are computationally intensive. This application ingests data at 2,000 records/second for 15 minutes every day in a 30-day month. You do not create any durable application backups. The monthly Managed Service for Apache Flink charges will be computed as follows:

**Monthly charges**

The price in the US East (N. Virginia) Region is \$0.11 per KPU-hour. Managed Service for Apache Flink allocates 50 GB of running application storage per KPU and charges \$0.10 per GB/month.
+ Batch Workload: During the 15 minutes per day, the Managed Service for Apache Flink application is processing 2,000 records/second, which takes 2 KPUs. 30 days/month × 15 minutes/day = 450 minutes/month
+ Monthly KPU charges: 450 minutes/month × (2 KPUs + 1 additional KPU for streaming application) × \$0.11/hour = \$2.48
+ Monthly running application storage charges: 450 minutes/month × 2 KPUs × 50 GB/KPU × \$0.10/GB-month = \$0.11
+ Total charges: \$2.48 + \$0.11 = **\$2.59**

**Cost usage report for Managed Service for Apache Flink on the Billing and Cost Management console for the month**

Kinesis Analytics
+ USD 2.59 - US East (N. Virginia)
+ Amazon Kinesis Analytics StartApplication
  + \$0.10 per GB-month of running application storage
    + 1.042 GB-month - USD 0.11
  + \$0.11 per Kinesis Processing Unit-hour for Apache Flink applications
    + 22.5 KPU-Hour - USD 2.48
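
The KPU arithmetic in this example can be sketched in plain Java. This is a minimal illustration, not a billing tool: the class and method names (`KpuChargeSketch`, `kpuHours`, `kpuCharge`) are hypothetical, the rate is the US East (N. Virginia) per-KPU-hour figure used in this example, and rounding half-up to the cent is an assumption rather than a statement of how billing rounds.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper that reproduces the KPU arithmetic of the batch example. */
final class KpuChargeSketch {

    // Rate taken from this example (US East, N. Virginia); check the pricing page for current values.
    static final BigDecimal RATE_PER_KPU_HOUR = new BigDecimal("0.11");

    /** KPU-hours = (running minutes / 60) * (application KPUs + 1 additional KPU). */
    static BigDecimal kpuHours(long runningMinutes, int applicationKpus) {
        BigDecimal hours = BigDecimal.valueOf(runningMinutes)
                .divide(BigDecimal.valueOf(60), 6, RoundingMode.HALF_UP);
        return hours.multiply(BigDecimal.valueOf(applicationKpus + 1L));
    }

    /** Charge in USD, rounded half-up to the cent (the rounding mode is an assumption). */
    static BigDecimal kpuCharge(long runningMinutes, int applicationKpus) {
        return kpuHours(runningMinutes, applicationKpus)
                .multiply(RATE_PER_KPU_HOUR)
                .setScale(2, RoundingMode.HALF_UP);
    }
}
```

For the batch workload, `KpuChargeSketch.kpuHours(450, 2)` gives 22.5 KPU-hours and `KpuChargeSketch.kpuCharge(450, 2)` gives 2.48, which matches the monthly KPU charge in the list above.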

### A test application that stops and starts continuously in the same hour, attracting multiple minimum charges
<a name="pricing-example-3"></a>

You’re a large ecommerce platform that processes millions of transactions every day. You want to develop real-time fraud detection. You use an Apache Flink application in Managed Service for Apache Flink to ingest transaction events from Kinesis Data Streams and process events in real-time with different transformation steps. This includes using a sliding window to aggregate events, partitioning events by event types, and applying specific detection rules for different event types. During development, you start and stop your application multiple times to test and debug behavior. There are occasions when your application only runs for a few minutes. There is an hour when you’re testing your application with 4 KPUs and your application does not use any durable application backups:
+ At 10:05 AM, you start your application, which runs for 30 minutes before it’s stopped at 10:35 AM.
+ At 10:40 AM, you start your application again, which runs for 5 minutes before it’s stopped at 10:45 AM.
+ At 10:50 AM, you start the application again, which runs for 2 minutes before it’s stopped at 10:52 AM.

Managed Service for Apache Flink charges a minimum of 10 minutes of usage each time an application starts running. The monthly Managed Service for Apache Flink usage for your application will be computed as follows:
+ First time your application starts and stops: 30 minutes of usage
+ Second time your application starts and stops: 10 minutes of usage (your application runs for 5 minutes rounded up to the 10 minutes minimum charge)
+ Third time your application starts and stops: 10 minutes of usage (your application runs for 2 minutes, rounded up to the 10 minutes minimum charge)
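
The minimum-charge rule above can be sketched as a short calculation. The class name `MinimumChargeSketch` and its method are hypothetical and exist only to illustrate the rule; the 10-minute minimum is the value stated in this example.

```java
/** Hypothetical illustration of the 10-minute minimum charge per application start. */
final class MinimumChargeSketch {

    // Minimum usage billed each time an application starts running, as stated in this example.
    static final long MINIMUM_BILLED_MINUTES = 10;

    /** Sums billed minutes, rounding each run shorter than the minimum up to the minimum. */
    static long billedMinutes(long... runMinutes) {
        long total = 0;
        for (long minutes : runMinutes) {
            total += Math.max(minutes, MINIMUM_BILLED_MINUTES);
        }
        return total;
    }
}
```

Calling `MinimumChargeSketch.billedMinutes(30, 5, 2)` returns 50: the 30-minute run is billed as is, and the 5-minute and 2-minute runs are each billed as 10 minutes.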

In total, your application would be charged for 50 minutes of usage. If there are no other times in the month your application is running, the monthly Managed Service for Apache Flink charges will be computed as follows:

**Monthly charges**

The price in the US East (N. Virginia) Region is \$0.11 per KPU-hour. Managed Service for Apache Flink allocates 50 GB of running application storage per KPU and charges \$0.10 per GB/month.
+ Monthly KPU charges: 50 minutes × (4 KPUs + 1 additional KPU for streaming application) × \$0.11/hour = \$0.46 (rounded to the nearest penny)
+ Monthly running application storage charges: 50 minutes × 4 KPUs × 50 GB/KPU × \$0.10/GB-month = \$0.03 (rounded to the nearest penny)
+ Total charges: \$0.46 + \$0.03 = **\$0.49**

**Cost usage report for Managed Service for Apache Flink on the Billing and Cost Management console for the month**

Kinesis Analytics
+ USD 0.49 - US East (N. Virginia)
+ Amazon Kinesis Analytics StartApplication
  + \$0.10 per GB-month of running application storage
    + 0.232 GB-month - USD 0.03
  + \$0.11 per Kinesis Processing Unit-hour for Apache Flink applications
    + 4.167 KPU-Hour - USD 0.46

# Review DataStream API components
<a name="how-datastream"></a>

Your Apache Flink application uses the [ Apache Flink DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) to transform data in a data stream. 

This section describes the different components that move, transform, and track data:
+ [Use connectors to move data in Managed Service for Apache Flink with the DataStream API](how-connectors.md): These components move data between your application and external data sources and destinations.
+ [Transform data using operators in Managed Service for Apache Flink with the DataStream API](how-operators.md): These components transform or group data elements within your application.
+ [Track events in Managed Service for Apache Flink using the DataStream API](how-time.md): This topic describes how Managed Service for Apache Flink tracks events when using the DataStream API.

# Use connectors to move data in Managed Service for Apache Flink with the DataStream API
<a name="how-connectors"></a>

In the Amazon Managed Service for Apache Flink DataStream API, *connectors* are software components that move data into and out of a Managed Service for Apache Flink application. Connectors are flexible integrations that let you read from files and directories. Connectors consist of complete modules for interacting with Amazon services and third-party systems.

Types of connectors include the following:
+ [Add streaming data sources](how-sources.md): Provide data to your application from a Kinesis data stream, file, or other data source.
+ [Write data using sinks](how-sinks.md): Send data from your application to a Kinesis data stream, Firehose stream, or other data destination.
+ [Use Asynchronous I/O](how-async.md): Provides asynchronous access to a data source (such as a database) to enrich stream events. 

## Available connectors
<a name="how-connectors-list"></a>

The Apache Flink framework contains connectors for accessing data from a variety of sources. For information about connectors available in the Apache Flink framework, see [Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Warning**  
If you have applications running on Flink 1.6, 1.8, 1.11, or 1.13 and would like to run them in the Middle East (UAE), Asia Pacific (Hyderabad), Israel (Tel Aviv), Europe (Zurich), Asia Pacific (Melbourne), or Asia Pacific (Jakarta) Regions, you might have to rebuild your application archive with an updated connector or upgrade to Flink 1.18.   
Apache Flink connectors are stored in their own open source repositories. If you're upgrading to version 1.18 or later, you must update your dependencies. To access the repository for Apache Flink AWS connectors, see [flink-connector-aws](https://github.com/apache/flink-connector-aws).  
The former Kinesis source `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` is deprecated and might be removed in a future release of Flink. Use [Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source) instead.  
There is no state compatibility between the `FlinkKinesisConsumer` and `KinesisStreamsSource`. For details, see [Migrating existing jobs to new Kinesis Streams Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) in the Apache Flink documentation.  
 Following are the recommended guidelines:   


**Connector upgrades**  

| Flink version | Connector used | Resolution | 
| --- | --- | --- | 
| 1.19, 1.20 | Kinesis Source |  When upgrading to Managed Service for Apache Flink version 1.19 or 1.20, make sure that you are using the most recent Kinesis Data Streams source connector, version 5.0.0 or later. For more information, see [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Kinesis Sink |  When upgrading to Managed Service for Apache Flink version 1.19 or 1.20, make sure that you are using the most recent Kinesis Data Streams sink connector, version 5.0.0 or later. For more information, see [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1.19, 1.20 | DynamoDB Streams Source |  When upgrading to Managed Service for Apache Flink version 1.19 or 1.20, make sure that you are using the most recent DynamoDB Streams source connector, version 5.0.0 or later. For more information, see [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | DynamoDB Sink |  When upgrading to Managed Service for Apache Flink version 1.19 or 1.20, make sure that you are using the most recent DynamoDB sink connector, version 5.0.0 or later. For more information, see [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Amazon SQS Sink |  When upgrading to Managed Service for Apache Flink version 1.19 or 1.20, make sure that you are using the most recent Amazon SQS sink connector, version 5.0.0 or later. For more information, see [Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/).  | 
| 1.19, 1.20 | Amazon Managed Service for Prometheus Sink |  When upgrading to Managed Service for Apache Flink version 1.19 or 1.20, make sure that you are using the most recent Amazon Managed Service for Prometheus sink connector, version 1.0.0 or later. For more information, see [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Add streaming data sources to Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink provides connectors for reading from files, sockets, collections, and custom sources. In your application code, you use an [Apache Flink source](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) to receive data from a stream. This section describes the sources that are available for Amazon services.

## Use Kinesis data streams
<a name="input-streams"></a>

The `KinesisStreamsSource` provides streaming data to your application from an Amazon Kinesis data stream. 

### Create a `KinesisStreamsSource`
<a name="input-streams-create"></a>

The following code example demonstrates creating a `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

For more information about using a `KinesisStreamsSource`, see [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) in the Apache Flink documentation and [our public KinesisConnectors example on Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

### Create a `KinesisStreamsSource` that uses an EFO consumer
<a name="input-streams-efo"></a>

The `KinesisStreamsSource` now supports [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

If a Kinesis consumer uses EFO, the Kinesis Data Streams service gives it its own dedicated bandwidth, rather than having the consumer share the fixed bandwidth of the stream with the other consumers reading from the stream.

For more information about using EFO with the Kinesis consumer, see [ FLIP-128: Enhanced Fan Out for AWS Kinesis Consumers](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers).

You enable the EFO consumer by setting the following parameters on the Kinesis consumer:
+ **READER\_TYPE:** Set this parameter to **EFO** for your application to use an EFO consumer to access the Kinesis data stream data. 
+ **EFO\_CONSUMER\_NAME:** Set this parameter to a string value that is unique among the consumers of this stream. Reusing a consumer name in the same Kinesis data stream causes the previous consumer that uses that name to be terminated. 

To configure a `KinesisStreamsSource` to use EFO, add the following parameters to the source configuration:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

For an example of a Managed Service for Apache Flink application that uses an EFO consumer, see [our public Kinesis Connectors example on Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

## Use Amazon MSK
<a name="input-msk"></a>

The `KafkaSource` source provides streaming data to your application from an Amazon MSK topic. 

### Create a `KafkaSource`
<a name="input-msk-create"></a>

The following code example demonstrates creating a `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

For more information about using a `KafkaSource`, see [MSK Replication](earlier.md#example-msk).

# Write data using sinks in Managed Service for Apache Flink
<a name="how-sinks"></a>

In your application code, you can use any [Apache Flink sink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) connector to write into external systems, including AWS services, such as Kinesis Data Streams and DynamoDB.

Apache Flink also provides sinks for files and sockets, and you can implement custom sinks. Among the several supported sinks, the following are frequently used:

## Use Kinesis data streams
<a name="sinks-streams"></a>

For information about the Kinesis Data Streams sink, see [Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) in the Apache Flink documentation.

For an example of an application that uses a Kinesis data stream for input and output, see [Tutorial: Get started using the DataStream API in Managed Service for Apache Flink](getting-started.md).

## Use Apache Kafka and Amazon Managed Streaming for Apache Kafka (MSK)
<a name="sinks-MSK"></a>

The [Apache Flink Kafka connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) provides extensive support for publishing data to Apache Kafka and Amazon MSK, including exactly-once guarantees. To learn how to write to Kafka, see [Kafka Connectors examples](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) on GitHub.

## Use Amazon S3
<a name="sinks-s3"></a>

You can use the Apache Flink `StreamingFileSink` to write objects to an Amazon S3 bucket.

For an example about how to write objects to S3, see [Example: Writing to an Amazon S3 bucket](earlier.md#examples-s3). 

## Use Firehose
<a name="sinks-firehose"></a>

The `FlinkKinesisFirehoseProducer` is a reliable, scalable Apache Flink sink for storing application output using the [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) service. This section describes how to set up a Maven project to create and use a `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Create a `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` Code Example](#sinks-firehose-sample)

### Create a `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

The following code example demonstrates creating a `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` Code Example
<a name="sinks-firehose-sample"></a>

The following code example demonstrates how to create and configure a `FlinkKinesisFirehoseProducer` and send data from an Apache Flink data stream to the Firehose service.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists all of the properties that the Firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists all of the properties that the Firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

For a complete tutorial about how to use the Firehose sink, see [Example: Writing to Firehose](earlier.md#get-started-exercise-fh).

# Use Asynchronous I/O in Managed Service for Apache Flink
<a name="how-async"></a>

An Asynchronous I/O operator enriches stream data using an external data source such as a database. Managed Service for Apache Flink enriches the stream events asynchronously so that requests can be batched for greater efficiency. 

For more information, see [Asynchronous I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) in the Apache Flink Documentation.
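
To illustrate why asynchronous enrichment helps, the following plain-Java sketch starts a lookup for every event before waiting on any result, so the lookups overlap instead of running one after another. It is a conceptual sketch only and does not use the Apache Flink API (in Flink, the equivalent roles are played by `AsyncFunction` and `AsyncDataStream`). The class `AsyncEnrichmentSketch`, the method `lookupCompanyName`, and the returned values are hypothetical stand-ins for a non-blocking database client.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/** Conceptual sketch of asynchronous enrichment; not the Apache Flink API. */
final class AsyncEnrichmentSketch {

    // Hypothetical stand-in for a non-blocking lookup against an external data source.
    static CompletableFuture<String> lookupCompanyName(String ticker) {
        return CompletableFuture.supplyAsync(() -> ticker + " Inc.");
    }

    /** Enriches each ticker with its looked-up name, preserving input order. */
    static List<String> enrich(List<String> tickers) {
        // Start every lookup first so that the requests are in flight concurrently.
        List<CompletableFuture<String>> pending = tickers.stream()
                .map(ticker -> lookupCompanyName(ticker).thenApply(name -> ticker + "=" + name))
                .collect(Collectors.toList());

        // Only now wait for the results, in the original order.
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

Collecting the futures into a list before joining is what keeps the lookups concurrent; joining inside the first stream would serialize them.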

# Transform data using operators in Managed Service for Apache Flink with the DataStream API
<a name="how-operators"></a>

To transform incoming data in a Managed Service for Apache Flink application, you use an Apache Flink *operator*. An Apache Flink operator transforms one or more data streams into a new data stream. The new data stream contains modified data from the original data stream. Apache Flink provides more than 25 pre-built stream processing operators. For more information, see [Operators](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) in the Apache Flink Documentation.

**Topics**
+ [Use transform operators](#how-operators-transform)
+ [Use aggregation operators](#how-operators-agg)

## Use transform operators
<a name="how-operators-transform"></a>

The following is an example of a simple text transformation on one of the fields of a JSON data stream. 

This code creates a transformed data stream. The new data stream has the same data as the original stream, with the string "` Company`" appended to the contents of the `TICKER` field.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Use aggregation operators
<a name="how-operators-agg"></a>

The following is an example of an aggregation operator. The code creates an aggregated data stream. The operator creates a 5-second tumbling window and returns the sum of the `PRICE` values for the records in the window with the same `TICKER` value.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
        return node1;
    });
```
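
To make the window semantics concrete, the following plain-Java sketch (with no Flink dependency) assigns each record to a 5-second tumbling window and sums the prices per ticker and window. It is an approximation of the idea only: the class `TumblingWindowSketch` and its methods are hypothetical, timestamps are assumed to be non-negative epoch milliseconds, and a Flink processing-time window uses the operator's clock rather than a timestamp supplied with the record.

```java
import java.util.Map;
import java.util.TreeMap;

/** Conceptual sketch of a keyed 5-second tumbling window sum; not the Apache Flink API. */
final class TumblingWindowSketch {

    static final long WINDOW_MILLIS = 5_000L;

    /** Returns the start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    /**
     * Sums prices per "ticker@windowStart" key.
     * The three arrays are parallel: index i describes one record.
     */
    static Map<String, Double> sumByTickerAndWindow(String[] tickers, long[] timestamps, double[] prices) {
        Map<String, Double> sums = new TreeMap<>();
        for (int i = 0; i < tickers.length; i++) {
            String key = tickers[i] + "@" + windowStart(timestamps[i]);
            // Add the price to the running total for this ticker and window.
            sums.merge(key, prices[i], Double::sum);
        }
        return sums;
    }
}
```

Windows never overlap in this scheme: a record with timestamp 4,999 falls in the window that starts at 0, and a record with timestamp 5,000 starts the next window.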

For more code examples, see [Examples for creating and working with Managed Service for Apache Flink applications](examples-collapsibles.md). 

# Track events in Managed Service for Apache Flink using the DataStream API
<a name="how-time"></a>

Managed Service for Apache Flink tracks events using the following timestamps:
+ **Processing Time:** Refers to the system time of the machine that is executing the respective operation.
+ **Event Time:** Refers to the time that each individual event occurred on its producing device.
+ **Ingestion Time:** Refers to the time that events enter the Managed Service for Apache Flink service.

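You set the time characteristic used by the streaming environment by calling `setStreamTimeCharacteristic` with one of the following values. Call it only once; if you call it more than once, the last call takes effect. In Apache Flink 1.12 and later, this method is deprecated and event time is the default. 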

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

For more information about timestamps, see [Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) in the Apache Flink documentation.
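
Event-time processing relies on watermarks to decide how far event time has progressed. The following plain-Java sketch shows the idea behind a bounded-out-of-orderness watermark: the watermark trails the largest event timestamp seen so far by a fixed bound. It is a simplified illustration and not Flink's implementation (in Flink, this behavior is configured through `WatermarkStrategy.forBoundedOutOfOrderness`). The class `WatermarkSketch` and its methods are hypothetical.

```java
/** Conceptual sketch of a bounded-out-of-orderness watermark; not the Apache Flink API. */
final class WatermarkSketch {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Records an event timestamp; only the largest timestamp seen so far matters. */
    void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** Returns the current watermark, or Long.MIN_VALUE before any event has been seen. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    /** An event is treated as late here if its timestamp is earlier than the watermark. */
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis < currentWatermark();
    }
}
```

With a bound of 2,000 milliseconds, events stamped 10,000 and then 9,000 leave the watermark at 8,000, so a later event stamped 7,000 is treated as late while one stamped 9,500 is not.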

# Review Table API components
<a name="how-table"></a>

Your Apache Flink application uses the [Apache Flink Table API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/) to interact with data in a stream using a relational model. You use the Table API to access data using Table sources, and then use Table functions to transform and filter table data. You can transform and filter tabular data using either API functions or SQL commands. 

This section contains the following topics:
+ [Table API connectors](how-table-connectors.md): These components move data between your application and external data sources and destinations.
+ [Table API time attributes](how-table-timeattributes.md): This topic describes how Managed Service for Apache Flink tracks events when using the Table API.

# Table API connectors
<a name="how-table-connectors"></a>

In the Apache Flink programming model, connectors are components that your application uses to read or write data from external sources, such as other AWS services.

With the Apache Flink Table API, you can use the following types of connectors:
+ [Table API sources](#how-table-connectors-source): You use Table API source connectors to create tables within your `TableEnvironment` using either API calls or SQL queries.
+ [Table API sinks](#how-table-connectors-sink): You use SQL commands to write table data to external destinations such as an Amazon MSK topic or an Amazon S3 bucket.

## Table API sources
<a name="how-table-connectors-source"></a>

You create a table source from a data stream. The following code creates a table from an Amazon MSK topic:

```
// Create a Kafka consumer for the topic
final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
consumer.setStartFromEarliest();

// Obtain the stream
DataStream<StockRecord> events = env.addSource(consumer);

// Create the table from the stream
Table table = streamTableEnvironment.fromDataStream(events);
```

For more information about table sources, see [Table & SQL Connectors ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) in the Apache Flink Documentation.

## Table API sinks
<a name="how-table-connectors-sink"></a>

To write table data to a sink, you create the sink in SQL, and then run the SQL-based sink on the `StreamTableEnvironment` object.

The following code example demonstrates how to write table data to an Amazon S3 sink:

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

// Create the sink table, then write the filtered table to Amazon S3
streamTableEnvironment.executeSql(s3Sink);
filteredTable.executeInsert("sink_table");
```

 You can use the `format` parameter to control what format Managed Service for Apache Flink uses to write the output to the sink. For information about formats, see [ Supported Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) in the Apache Flink Documentation.

## User-defined sources and sinks
<a name="how-table-connectors-userdef"></a>

You can use existing Apache Kafka connectors for sending data to and from other AWS services, such as Amazon MSK and Amazon S3. For interacting with other data sources and destinations, you can define your own sources and sinks. For more information, see [ User-defined Sources and Sinks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/) in the Apache Flink Documentation.

# Table API time attributes
<a name="how-table-timeattributes"></a>

Each record in a data stream has several timestamps that define when events related to the record occurred:
+ **Event Time**: A user-defined timestamp that defines when the event that created the record occurred.
+ **Ingestion Time**: The time when your application retrieved the record from the data stream.
+ **Processing Time**: The time when your application processed the record.

When the Apache Flink Table API creates windows based on record times, you define which of these timestamps it uses by using the `setStreamTimeCharacteristic` method. 

For more information about using timestamps with the Table API, see [ Time Attributes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) and [ Timely Stream Processing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/) in the Apache Flink Documentation.

# Use Python with Managed Service for Apache Flink
<a name="how-python"></a>

**Note**  
If you are developing a Python Flink application on a Mac with an Apple Silicon chip, you might encounter some [known issues](https://issues.apache.org/jira/browse/FLINK-26981) with the Python dependencies of PyFlink 1.15. In this case, we recommend running the Python interpreter in Docker. For step-by-step instructions, see [PyFlink 1.15 development on Apple Silicon Mac](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon).

Apache Flink version 2.2 includes support for creating applications using Python version 3.12; support for Python version 3.8 is removed. For more information, see [Flink Python Docs](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/). You create a Managed Service for Apache Flink application using Python by doing the following:
+ Create your Python application code as a text file with a `main` method.
+ Bundle your application code file and any Python or Java dependencies into a zip file, and upload it to an Amazon S3 bucket.
+ Create your Managed Service for Apache Flink application, specifying your Amazon S3 code location, application properties, and application settings.

At a high level, the Python Table API is a wrapper around the Java Table API. For information about the Python Table API, see the [ Table API Tutorial](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/) in the Apache Flink Documentation.

# Program your Managed Service for Apache Flink Python application
<a name="how-python-programming"></a>

You code your Managed Service for Apache Flink for Python application using the Apache Flink Python Table API. The Apache Flink engine translates Python Table API statements (running in the Python VM) into Java Table API statements (running in the Java VM). 

You use the Python Table API by doing the following:
+ Create a reference to the `StreamTableEnvironment`.
+ Create `table` objects from your source streaming data by executing queries on the `StreamTableEnvironment` reference.
+ Execute queries on your `table` objects to create output tables.
+ Write your output tables to your destinations using a `StatementSet`.

To get started using the Python Table API in Managed Service for Apache Flink, see [Get started with Amazon Managed Service for Apache Flink for Python](gs-python.md).

## Read and write streaming data
<a name="how-python-programming-readwrite"></a>

To read and write streaming data, you execute SQL queries on the table environment.

### Create a table
<a name="how-python-programming-readwrite-createtable"></a>

The following code example demonstrates a user-defined function that creates a SQL query. The SQL query creates a table that interacts with a Kinesis stream:

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### Read streaming data
<a name="how-python-programming-readwrite-read"></a>

The following code example demonstrates how to run the SQL query returned by the preceding `create_table` function on a table environment reference to create the input table that the application reads from:

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### Write streaming data
<a name="how-python-programming-readwrite-write"></a>

The following code example demonstrates how to write data to a destination Kinesis stream. It assumes that an output table was created with the `create_table` function from the preceding example, and it runs an `INSERT INTO` statement that copies records from the input table to the output table:

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```
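When an application writes to more than one destination, a `StatementSet` can group the `INSERT` statements so that they are submitted together. The following is a minimal sketch, assuming that `table_env` is a PyFlink `StreamTableEnvironment` in which the input and output tables already exist. The function name and its parameters are hypothetical.

```python
def insert_with_statement_set(table_env, input_table_name, output_table_names):
    """Queue one INSERT per output table and submit them as a single job.

    Assumption: `table_env` is a PyFlink StreamTableEnvironment and every
    table named here was created beforehand, for example with create_table.
    """
    statement_set = table_env.create_statement_set()
    for output_table_name in output_table_names:
        statement_set.add_insert_sql(
            "INSERT INTO {0} SELECT * FROM {1}".format(
                output_table_name, input_table_name
            )
        )
    # execute() submits every queued statement together.
    return statement_set.execute()
```

Because the table environment is passed in as an argument, the function can be exercised locally with a stand-in object before it runs against a real environment.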

## Read runtime properties
<a name="how-python-programming-properties"></a>

You can use runtime properties to configure your application without changing your application code.

You specify application properties for your application the same way as with a Managed Service for Apache Flink for Java application. You can specify runtime properties in the following ways:
+ Using the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) action.
+ Using the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.
+ Configuring your application by using the console.

You retrieve application properties in code by reading a JSON file called `application_properties.json` that the Managed Service for Apache Flink runtime creates.

The following code example demonstrates reading application properties from the `application_properties.json` file:

```
import json
import os

# Location of the properties file that the runtime creates.
file_path = '/etc/flink/application_properties.json'
if os.path.isfile(file_path):
    with open(file_path, 'r') as file:
        contents = file.read()
        properties = json.loads(contents)
```

The following user-defined function code example demonstrates retrieving a property group from the application properties object:

```
def property_map(properties, property_group_id):
   for prop in properties:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

The following code example demonstrates reading a property called `INPUT_STREAM_KEY` from a property group that the previous example returns:

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```
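The three steps above can be combined into one helper. The following is a minimal sketch, assuming the file holds a list of objects with `PropertyGroupId` and `PropertyMap` keys, as the preceding examples imply. The function name `load_property_group` and the choice to return an empty dictionary when the file or group is missing are assumptions made for illustration.

```python
import json
import os

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"


def load_property_group(property_group_id,
                        file_path=APPLICATION_PROPERTIES_FILE_PATH):
    """Return the PropertyMap for one property group.

    Hypothetical helper: returns an empty dict when the file does not
    exist (for example, during local development) or when no group
    matches the requested ID.
    """
    if not os.path.isfile(file_path):
        return {}
    with open(file_path, "r") as file:
        properties = json.load(file)
    for group in properties:
        if group["PropertyGroupId"] == property_group_id:
            return group["PropertyMap"]
    return {}
```

Accepting the file path as a parameter makes it possible to point the helper at a local copy of the properties file when testing outside the service.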

## Create your application's code package
<a name="how-python-programming-package"></a>

Once you have created your Python application, you bundle your code file and dependencies into a zip file.

Your zip file must contain a Python script with a `main` method, and can optionally contain the following:
+ Additional Python code files
+ User-defined Java code in JAR files
+ Java libraries in JAR files

**Note**  
Your application zip file must contain all of the dependencies for your application. You can't reference libraries from other sources for your application.
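One possible way to build the package is with the Python standard library. The following sketch zips an application directory so that archive paths start with the directory name (for example, `MyApplication/main.py`). The function name `build_code_package` is hypothetical, and the sketch assumes that the zip file is written outside the directory being packaged.

```python
import os
import zipfile


def build_code_package(source_dir, zip_path):
    """Zip every file under source_dir, keeping the top-level folder name.

    Assumption: zip_path is located outside source_dir, so the archive
    does not try to include itself.
    """
    source_dir = os.path.abspath(source_dir)
    parent_dir = os.path.dirname(source_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(source_dir):
            for name in sorted(files):
                full_path = os.path.join(root, name)
                # Store paths relative to the parent, e.g. MyApplication/main.py
                archive.write(full_path, os.path.relpath(full_path, parent_dir))
    return zip_path
```

Any tool that produces a standard zip file works equally well. The point is only that every dependency is inside the archive before you upload it.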

# Create your Managed Service for Apache Flink Python application
<a name="how-python-creating"></a>

## Specify your code files
<a name="how-python-creating-code"></a>

Once you have created your application's code package, you upload it to an Amazon S3 bucket. You then create your application using either the console or the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) action.

When you create your application using the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) action, you specify the code files and archives in your zip file using a special application property group called `kinesis.analytics.flink.run.options`. You can define the following types of files:
+ **python**: A text file containing a Python main method.
+ **jarfile**: A Java JAR file containing Java user-defined functions.
+ **pyFiles**: A Python resource file containing resources to be used by the application.
+ **pyArchives**: A zip file containing resource files for the application.

For more information about Apache Flink Python code file types, see [ Command-Line Interface](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/) in the Apache Flink Documentation.

**Note**  
Managed Service for Apache Flink does not support the `pyModule`, `pyExecutable`, or `pyRequirements` file types. All of the code, requirements, and dependencies must be in your zip file. You can't specify dependencies to be installed using pip. 

The following example JSON snippet demonstrates how to specify file locations within your application's zip file:

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```
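Before creating the application, it can help to check the `PropertyMap` for this group locally. The following sketch flags the unsupported file types listed in the preceding note. The function name `check_run_options` is hypothetical, and treating `python` as a mandatory key is an assumption based on the requirement that the package contain a script with a `main` method.

```python
SUPPORTED_RUN_OPTIONS = {"python", "jarfile", "pyFiles", "pyArchives"}
UNSUPPORTED_RUN_OPTIONS = {"pyModule", "pyExecutable", "pyRequirements"}


def check_run_options(property_map):
    """Return a list of problems found in a run-options PropertyMap."""
    problems = []
    # Assumption: the entry-point script must always be specified.
    if "python" not in property_map:
        problems.append("missing required key: python")
    for key in sorted(property_map):
        if key in UNSUPPORTED_RUN_OPTIONS:
            problems.append("unsupported file type: " + key)
        elif key not in SUPPORTED_RUN_OPTIONS:
            problems.append("unrecognized key: " + key)
    return problems
```

An empty list means that the map uses only the four file types described in this section.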

# Monitor your Managed Service for Apache Flink Python application
<a name="how-python-monitoring"></a>

You use your application's CloudWatch log to monitor your Managed Service for Apache Flink Python application.

Managed Service for Apache Flink logs the following messages for Python applications:
+ Messages written to the console using `print()` in the application's `main` method.
+ Messages sent in user-defined functions using the `logging` package. The following code example demonstrates writing to the application log from a user-defined function:

  ```
  import logging
  
  from pyflink.table import DataTypes
  from pyflink.table.udf import udf
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ Error messages thrown by the application.

  If the application throws an exception in the `main` function, it will appear in your application's logs.

  The following example demonstrates a log entry for an exception thrown from Python code:

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**Note**  
Due to performance issues, we recommend that you only use custom log messages during application development. 

## Query logs with CloudWatch Insights
<a name="how-python-monitoring-insights"></a>

The following CloudWatch Insights query searches for logs created by the Python entrypoint while executing the main function of your application:

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Use runtime properties in Managed Service for Apache Flink
<a name="how-properties"></a>

You can use *runtime properties* to configure your application without recompiling your application code. 

**Topics**
+ [Manage runtime properties using the console](#how-properties-console)
+ [Manage runtime properties using the CLI](#how-properties-cli)
+ [Access runtime properties in a Managed Service for Apache Flink application](#how-properties-access)

## Manage runtime properties using the console
<a name="how-properties-console"></a>

You can add, update, or remove runtime properties from your Managed Service for Apache Flink application using the AWS Management Console.

**Note**  
If you are using an earlier supported version of Apache Flink and want to upgrade your existing applications to Apache Flink 1.19.1, you can do so using in-place Apache Flink version upgrades. With in-place version upgrades, you retain application traceability against a single ARN across Apache Flink versions, including snapshots, logs, metrics, tags, Flink configurations, and more. You can use this feature in `RUNNING` and `READY` state. For more information, see [Use in-place version upgrades for Apache Flink](how-in-place-version-upgrades.md).

**Update Runtime Properties for a Managed Service for Apache Flink application**

1. Sign in to the AWS Management Console, and open the Amazon MSF console at https://console.aws.amazon.com/flink.

1. Choose your Managed Service for Apache Flink application. Choose **Application details**.

1. On the page for your application, choose **Configure**.

1. Expand the **Properties** section.

1. Use the controls in the **Properties** section to define a property group with key-value pairs. Use these controls to add, update, or remove property groups and runtime properties.

1. Choose **Update**.

## Manage runtime properties using the CLI
<a name="how-properties-cli"></a>

You can add, update, or remove runtime properties using the [AWS CLI](https://docs.aws.amazon.com/cli). 

This section includes example requests for API actions for configuring runtime properties for an application. For information about how to use a JSON file for input for an API action, see [Managed Service for Apache Flink API example code](api-examples.md).

**Note**  
Replace the sample account ID (*`012345678901`*) in the following examples with your account ID.

### Add runtime properties when creating an application
<a name="how-properties-create"></a>

The following example request for the [CreateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action adds two runtime property groups (`ProducerConfigProperties` and `ConsumerConfigProperties`) when you create an application:

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### Add and update runtime properties in an existing application
<a name="how-properties-update"></a>

The following example request for the [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action adds or updates runtime properties for an existing application:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**Note**  
If you use a key that has no corresponding runtime property in a property group, Managed Service for Apache Flink adds the key-value pair as a new property. If you use a key for an existing runtime property in a property group, Managed Service for Apache Flink updates the property value. 

### Remove runtime properties
<a name="how-properties-remove"></a>

The following example request for the [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action removes all runtime properties and property groups from an existing application:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**Important**  
If you omit an existing property group or an existing property key in a property group, that property group or property is removed.
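Because omitted groups and keys are removed, one cautious approach is to start from the application's current property groups and apply only the intended changes before sending the update. The following is a minimal sketch of that merge. The function name `merge_property_groups` and the shape of the `changes` argument are assumptions made for illustration, and the sketch does not call any AWS API.

```python
def merge_property_groups(current_groups, changes):
    """Return a complete PropertyGroups list with `changes` applied.

    current_groups: list of {"PropertyGroupId": ..., "PropertyMap": {...}}
    changes: dict that maps a group ID to the keys to add or overwrite
    The inputs are not modified.
    """
    merged = {
        group["PropertyGroupId"]: dict(group["PropertyMap"])
        for group in current_groups
    }
    for group_id, updates in changes.items():
        merged.setdefault(group_id, {}).update(updates)
    return [
        {"PropertyGroupId": group_id, "PropertyMap": property_map}
        for group_id, property_map in merged.items()
    ]
```

The returned list carries every existing group and key forward, so it can be used as the `PropertyGroups` value in an update request without dropping properties unintentionally.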

## Access runtime properties in a Managed Service for Apache Flink application
<a name="how-properties-access"></a>

You retrieve runtime properties in your Java application code using the static `KinesisAnalyticsRuntime.getApplicationProperties()` method, which returns a `Map<String, Properties>` object.

The following Java code example retrieves runtime properties for your application:

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

You retrieve a property group (as a `java.util.Properties` object) as follows:

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

You typically configure an Apache Flink source or sink by passing in the `Properties` object without needing to retrieve the individual properties. The following code example demonstrates how to create a Flink sink by passing in a `Properties` object retrieved from runtime properties:

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

For code examples, see [Examples for creating and working with Managed Service for Apache Flink applications](examples-collapsibles.md).

# Use Apache Flink connectors with Managed Service for Apache Flink
<a name="how-flink-connectors"></a>

Apache Flink connectors are software components that move data into and out of an Amazon Managed Service for Apache Flink application. Connectors are flexible integrations that let you read from files and directories. Connectors consist of complete modules for interacting with Amazon services and third-party systems.

Types of connectors include the following:
+ **Sources:** Provide data to your application from a Kinesis data stream, file, Apache Kafka topic, or other data source.
+ **Sinks:** Send data from your application to a Kinesis data stream, Firehose stream, Apache Kafka topic, or other data destinations.
+ **Asynchronous I/O:** Provides asynchronous access to a data source such as a database to enrich streams. 

Apache Flink connectors are stored in their own source repositories. The version and artifact for Apache Flink connectors change depending on the Apache Flink version you are using, and whether you are using the DataStream, Table, or SQL API. 

Amazon Managed Service for Apache Flink supports over 40 pre-built Apache Flink source and sink connectors. The following table provides a summary of the most popular connectors and their associated versions. You can also build custom sinks using the Async-sink framework. For more information, see [The Generic Asynchronous Base Sink](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/) in the Apache Flink documentation.

 To access the repository for Apache Flink AWS connectors, see [flink-connector-aws](https://github.com/apache/flink-connector-aws).

## Connectors for Flink 2.2
<a name="connectors-flink-2-2"></a>

When upgrading to Flink 2.2, you need to update your connector dependencies to versions that are compatible with the Flink 2.x runtime. Flink connectors are released independently from the Flink runtime, and not all connectors have a Flink 2.x-compatible release yet. The following table summarizes the availability of commonly used connectors in Amazon Managed Service for Apache Flink as of this writing:


**Connectors for Flink 2.2**  

| Connector | Flink 2.0+ version | Notes | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | Recommended for Flink 2.2 | 
| Kinesis Data Streams (source) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recommended for Flink 2.2 | 
| Kinesis Data Streams (sink) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recommended for Flink 2.2 | 
| FileSystem (S3, HDFS) | Bundled with Flink | Built into the Flink distribution — always available | 
| JDBC | Not yet released for 2.x | No Flink 2.x-compatible release available | 
| OpenSearch | Not yet released for 2.x | No Flink 2.x-compatible release available | 
| Elasticsearch | Not yet released for 2.x | Consider migrating to the OpenSearch connector | 
| Amazon Managed Service for Prometheus | Not yet released for 2.x | No Flink 2.x-compatible release at time of writing | 

If your application depends on a connector that does not yet have a Flink 2.2 release, you have two options: wait for the connector to release a compatible version, or evaluate whether you can replace it with an alternative (for example, using the JDBC catalog or a custom sink).

**Known issues**
+ Applications using the `KinesisStreamsSource` with EFO (Enhanced Fan-Out / SubscribeToShard) path introduced in connector v5.0.0 and v6.0.0 may fail when Kinesis streams undergo resharding. This is a known issue in the community. For more information, see [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648).
+ Applications using the `KinesisStreamsSource` with EFO (Enhanced Fan-Out / SubscribeToShard) path introduced in connector v5.0.0 and v6.0.0 together with `KinesisStreamsSink` may experience deadlocks if the Flink application is under backpressure, resulting in a complete stop of data processing in one or more TaskManagers. A force stop operation and a start app operation are needed to recover the app. This is a sub-case of the known issue in the community: [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071).

## Connectors for older Flink versions
<a name="connectors-older-versions"></a>


**Connectors for older Flink versions**  

| Connector | Flink version 1.15 | Flink version 1.18 | Flink version 1.19 | Flink version 1.20 | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream - Source - DataStream and Table API | flink-connector-kinesis, 1.15.4 | flink-connector-kinesis, 4.3.0-1.18 | flink-connector-kinesis, 5.0.0-1.19 | flink-connector-kinesis, 5.0.0-1.20 | 
| Kinesis Data Stream - Sink - DataStream and Table API | flink-connector-aws-kinesis-streams, 1.15.4 | flink-connector-aws-kinesis-streams, 4.3.0-1.18 | flink-connector-aws-kinesis-streams, 5.0.0-1.19 | flink-connector-aws-kinesis-streams, 5.0.0-1.20 | 
| Kinesis Data Streams - Source/Sink - SQL | flink-sql-connector-kinesis, 1.15.4 | flink-sql-connector-kinesis, 4.3.0-1.18 | flink-sql-connector-kinesis, 5.0.0-1.19 | flink-sql-connector-kinesis-streams, 5.0.0-1.20 | 
| Kafka - DataStream and Table API | flink-connector-kafka, 1.15.4 | flink-connector-kafka, 3.2.0-1.18 | flink-connector-kafka, 3.3.0-1.19 | flink-connector-kafka, 3.3.0-1.20 | 
| Kafka - SQL | flink-sql-connector-kafka, 1.15.4 | flink-sql-connector-kafka, 3.2.0-1.18 | flink-sql-connector-kafka, 3.3.0-1.19 | flink-sql-connector-kafka, 3.3.0-1.20 | 
| Firehose - DataStream and Table API | flink-connector-aws-kinesis-firehose, 1.15.4 | flink-connector-aws-firehose, 4.3.0-1.18 | flink-connector-aws-firehose, 5.0.0-1.19 | flink-connector-aws-firehose, 5.0.0-1.20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-firehose, 1.15.4 | flink-sql-connector-aws-firehose, 4.3.0-1.18 | flink-sql-connector-aws-firehose, 5.0.0-1.19 | flink-sql-connector-aws-firehose, 5.0.0-1.20 | 
| DynamoDB - DataStream and Table API | flink-connector-dynamodb, 3.0.0-1.15 | flink-connector-dynamodb, 4.3.0-1.18 | flink-connector-dynamodb, 5.0.0-1.19 | flink-connector-dynamodb, 5.0.0-1.20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb, 3.0.0-1.15 | flink-sql-connector-dynamodb, 4.3.0-1.18 | flink-sql-connector-dynamodb, 5.0.0-1.19 | flink-sql-connector-dynamodb, 5.0.0-1.20 | 
| OpenSearch - DataStream and Table API | - | flink-connector-opensearch, 1.2.0-1.18 | flink-connector-opensearch, 1.2.0-1.19 | flink-connector-opensearch, 1.2.0-1.19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-sql-connector-opensearch, 1.2.0-1.19 | flink-sql-connector-opensearch, 1.2.0-1.19 | 
| Amazon Managed Service for Prometheus DataStream | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-prometheus, 1.0.0-1.19 | flink-connector-prometheus, 1.0.0-1.20 | 
| Amazon SQS DataStream and Table API | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-sqs, 5.0.0-1.19 | flink-connector-sqs, 5.0.0-1.20 | 

To learn more about connectors in Amazon Managed Service for Apache Flink, see:
+ [DataStream API connectors](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Table API connectors](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### Known issues
<a name="connectors-known-issues"></a>

There is a known open source Apache Flink issue with the Apache Kafka connector in Apache Flink 1.15. This issue is resolved in later versions of Apache Flink. 

For more information, see [Known issues](flink-1-15-2.md#flink-1-15-known-issues). 

# Implement fault tolerance in Managed Service for Apache Flink
<a name="how-fault"></a>

Checkpointing is the method that is used for implementing fault tolerance in Amazon Managed Service for Apache Flink. A *checkpoint* is an up-to-date backup of a running application that is used to recover immediately from an unexpected application disruption or failover. 

For details on checkpointing in Apache Flink applications, see [Checkpoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/) in the Apache Flink Documentation.

A *snapshot* is a manually created and managed backup of application state. Snapshots let you restore your application to a previous state by calling [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html). For more information, see [Manage application backups using snapshots](how-snapshots.md).

If checkpointing is enabled for your application, then the service provides fault tolerance by creating and loading backups of application data in the event of unexpected application restarts. These unexpected application restarts could be caused by unexpected job restarts, instance failures, etc. This gives the application the same semantics as failure-free execution during these restarts. 

If snapshots are enabled for the application, and configured using the application's [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html), then the service provides exactly-once processing semantics during application updates, or during service-related scaling or maintenance.

## Configure checkpointing in Managed Service for Apache Flink
<a name="how-fault-configure"></a>

You can configure your application's checkpointing behavior. You can define whether it persists the checkpointing state, how often it saves its state to checkpoints, and the minimum interval between the end of one checkpoint operation and the beginning of another.

You configure the following settings using the [CreateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) or [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) API operations:
+ `CheckpointingEnabled`: Indicates whether checkpointing is enabled in the application.
+ `CheckpointInterval`: Contains the time in milliseconds between checkpoint (persistence) operations.
+ `ConfigurationType`: Set this value to `DEFAULT` to use the default checkpointing behavior. Set this value to `CUSTOM` to configure other values.
+ `MinPauseBetweenCheckpoints`: The minimum time in milliseconds between the end of one checkpoint operation and the start of another. Setting this value prevents the application from checkpointing continuously when a checkpoint operation takes longer than the `CheckpointInterval`.

**Note**  
The default checkpoint behavior is as follows:  
+ **CheckpointingEnabled:** true
+ **CheckpointInterval:** 60000
+ **MinPauseBetweenCheckpoints:** 5000

If **ConfigurationType** is set to `DEFAULT`, the preceding values are used, even if they are set to other values by using the AWS Command Line Interface or in the application code.

**Note**  
For Flink 1.15 onward, Managed Service for Apache Flink uses `stop-with-savepoint` during automatic snapshot creation, that is, when the application is updated, scaled, or stopped.
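The interaction between `ConfigurationType` and the other settings can be sketched in a few lines of Python. This is an illustration of the documented rule, not service code. The function name `effective_checkpoint_settings` is hypothetical, and two behaviors are assumptions: a missing `ConfigurationType` is treated as `DEFAULT`, and a `CUSTOM` configuration that omits a setting falls back to the default value.

```python
DEFAULT_CHECKPOINT_SETTINGS = {
    "CheckpointingEnabled": True,
    "CheckpointInterval": 60000,          # milliseconds
    "MinPauseBetweenCheckpoints": 5000,   # milliseconds
}


def effective_checkpoint_settings(checkpoint_configuration):
    """Return the settings that apply for a CheckpointConfiguration dict."""
    settings = dict(DEFAULT_CHECKPOINT_SETTINGS)
    # With DEFAULT, any other values in the configuration are ignored.
    if checkpoint_configuration.get("ConfigurationType", "DEFAULT") != "CUSTOM":
        return settings
    for key in DEFAULT_CHECKPOINT_SETTINGS:
        if key in checkpoint_configuration:
            settings[key] = checkpoint_configuration[key]
    return settings
```

For example, a configuration with `ConfigurationType` set to `DEFAULT` and `CheckpointInterval` set to 20000 still resolves to an interval of 60000, which is why custom values require `CUSTOM`.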

## Review checkpointing API examples
<a name="how-fault-examples"></a>

This section includes example requests for API actions for configuring checkpointing for an application. For information about how to use a JSON file for input for an API action, see [Managed Service for Apache Flink API example code](api-examples.md).

### Configure checkpointing for a new application
<a name="how-fault-examples-create-config"></a>

The following example request for the [CreateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action configures checkpointing when you are creating an application:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment": "FLINK-1_19",
   "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": {
      "ApplicationCodeConfiguration": {
         "CodeContent": {
            "S3ContentLocation": {
               "BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket",
               "FileKey": "myflink.jar",
               "ObjectVersion": "AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
         "CodeContentType": "ZIPFILE"
      },
      "FlinkApplicationConfiguration": {
         "CheckpointConfiguration": {
            "CheckpointingEnabled": true,
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
   }
}
```

### Disable checkpointing for a new application
<a name="how-fault-examples-create-disable"></a>

The following example request for the [CreateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action disables checkpointing when you are creating an application:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment": "FLINK-1_19",
   "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": {
      "ApplicationCodeConfiguration": {
         "CodeContent": {
            "S3ContentLocation": {
               "BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket",
               "FileKey": "myflink.jar",
               "ObjectVersion": "AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
         "CodeContentType": "ZIPFILE"
      },
      "FlinkApplicationConfiguration": {
         "CheckpointConfiguration": {
            "CheckpointingEnabled": false
         }
      }
   }
}
```

### Configure checkpointing for an existing application
<a name="how-fault-examples-update-config"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action configures checkpointing for an existing application:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### Disable checkpointing for an existing application
<a name="how-fault-examples-update-update-disable"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action disables checkpointing for an existing application:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```
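
If you build these requests from code instead of a JSON file, a small helper can keep the checkpointing fields consistent. The following Python sketch is one way to do that. The helper name `checkpoint_update_request` and its range checks are assumptions for illustration. The resulting dictionary is intended to be passed to an SDK call such as the `update_application` method of a boto3 `kinesisanalyticsv2` client.

```python
import json


def checkpoint_update_request(app_name, version_id, enabled,
                              interval_ms=20000, min_pause_ms=10000):
    """Build an UpdateApplication request that sets custom checkpointing."""
    # Defensive checks (an assumption of this sketch, not a service guarantee).
    if interval_ms < 1 or min_pause_ms < 0:
        raise ValueError("interval must be >= 1 ms and min pause >= 0 ms")
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": version_id,
        "ApplicationConfigurationUpdate": {
            "FlinkApplicationConfigurationUpdate": {
                "CheckpointConfigurationUpdate": {
                    # CUSTOM is needed for the other values to take effect.
                    "ConfigurationTypeUpdate": "CUSTOM",
                    "CheckpointingEnabledUpdate": enabled,
                    "CheckpointIntervalUpdate": interval_ms,
                    "MinPauseBetweenCheckpointsUpdate": min_pause_ms,
                }
            }
        },
    }


# Print the request body so that it can be compared with the JSON examples.
print(json.dumps(checkpoint_update_request("MyApplication", 1, True), indent=3))
```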

# Manage application backups using snapshots
<a name="how-snapshots"></a>

A *snapshot* is the Managed Service for Apache Flink implementation of an Apache Flink *Savepoint*. A snapshot is a backup of the application state that is triggered, created, and managed either by you or by the service. For information about Apache Flink Savepoints, see [Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) in the Apache Flink Documentation. Using snapshots, you can restart an application from a particular snapshot of the application state.

**Note**  
We recommend that your application create a snapshot several times a day to restart properly with correct state data. The correct frequency for your snapshots depends on your application's business logic. Taking frequent snapshots lets you recover more recent data, but increases cost and requires more system resources.

In Managed Service for Apache Flink, you manage snapshots using the following API actions:
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

For the per-application limit on the number of snapshots, see [Managed Service for Apache Flink and Studio notebook quota](limits.md). If your application reaches the limit on snapshots, then manually creating a snapshot fails with a `LimitExceededException`. 

Managed Service for Apache Flink never deletes snapshots. You must manually delete your snapshots using the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) action.
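
Because snapshots are never deleted for you, you might want to prune old ones on a schedule. The following Python sketch keeps the most recent snapshots and deletes the rest. It assumes that `client` behaves like a boto3 `kinesisanalyticsv2` client. The helper name `prune_snapshots` and the default of keeping 10 snapshots are illustrative choices, not service recommendations.

```python
def prune_snapshots(client, app_name, keep=10):
    """Delete all but the `keep` most recent READY snapshots; return deleted names."""
    snapshots, token = [], None
    while True:
        # Page through all snapshots, up to 50 per request.
        kwargs = {"ApplicationName": app_name, "Limit": 50}
        if token:
            kwargs["NextToken"] = token
        page = client.list_application_snapshots(**kwargs)
        snapshots.extend(page.get("SnapshotSummaries", []))
        token = page.get("NextToken")
        if not token:
            break

    # Only consider snapshots that finished successfully, newest first.
    ready = [s for s in snapshots if s["SnapshotStatus"] == "READY"]
    ready.sort(key=lambda s: s["SnapshotCreationTimestamp"], reverse=True)

    deleted = []
    for snap in ready[keep:]:
        client.delete_application_snapshot(
            ApplicationName=app_name,
            SnapshotName=snap["SnapshotName"],
            SnapshotCreationTimestamp=snap["SnapshotCreationTimestamp"],
        )
        deleted.append(snap["SnapshotName"])
    return deleted
```

Snapshots that are still being created or deleted are skipped, so the helper only removes snapshots that are safe to compare by creation time.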

To load a saved snapshot of application state when starting an application, use the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) parameter of the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) or [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action.

**Topics**
+ [Manage automatic snapshot creation](#how-fault-snapshot-update)
+ [Restore from a snapshot that contains incompatible state data](#how-fault-snapshot-restore)
+ [Review snapshot API examples](#how-fault-snapshot-examples)

## Manage automatic snapshot creation
<a name="how-fault-snapshot-update"></a>

If `SnapshotsEnabled` is set to `true` in the [ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html) for the application, Managed Service for Apache Flink automatically creates and uses snapshots when the application is updated, scaled, or stopped to provide exactly-once processing semantics.

**Note**  
Setting `ApplicationSnapshotConfiguration::SnapshotsEnabled` to `false` will lead to data loss during application updates.

**Note**  
Managed Service for Apache Flink triggers intermediate savepoints during snapshot creation. For Flink version 1.15 or greater, intermediate savepoints no longer commit any side effects. See [Triggering savepoints](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Automatically created snapshots have the following qualities:
+ The snapshot is managed by the service, but you can see the snapshot using the [ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html) action. Automatically created snapshots count against your snapshot limit.
+ If your application exceeds the snapshot limit, manually created snapshots will fail, but the Managed Service for Apache Flink service will still successfully create snapshots when the application is updated, scaled, or stopped. You must manually delete snapshots using the [DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) action before creating more snapshots manually.

## Restore from a snapshot that contains incompatible state data
<a name="how-fault-snapshot-restore"></a>

Because snapshots contain information about operators, restoring state data from a snapshot for an operator that has changed since the previous application version may have unexpected results. An application will fault if it attempts to restore state data from a snapshot that does not correspond to the current operator. The faulted application will be stuck in either the `STOPPING` or `UPDATING` state. 

To allow an application to restore from a snapshot that contains incompatible state data, set the `AllowNonRestoredState` parameter of the [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html) to `true` using the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.
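
As an illustration, the following Python sketch builds an `UpdateApplication` request that sets `AllowNonRestoredState`. The helper name `allow_non_restored_state_request` is hypothetical. The request places `FlinkRunConfiguration` under `RunConfigurationUpdate`, and the dictionary is intended to be passed to an SDK call such as the `update_application` method of a boto3 `kinesisanalyticsv2` client.

```python
import json


def allow_non_restored_state_request(app_name, version_id, allow=True):
    """Build an UpdateApplication request that sets AllowNonRestoredState."""
    return {
        "ApplicationName": app_name,
        # The version ID must match the application's current version.
        "CurrentApplicationVersionId": version_id,
        "RunConfigurationUpdate": {
            "FlinkRunConfiguration": {"AllowNonRestoredState": allow}
        },
    }


print(json.dumps(allow_non_restored_state_request("MyApplication", 1), indent=3))
```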

You will see the following behavior when an application is restored from an obsolete snapshot:
+ **Operator added:** If a new operator is added, the savepoint has no state data for the new operator. No fault will occur, and it is not necessary to set `AllowNonRestoredState`.
+ **Operator deleted:** If an existing operator is deleted, the savepoint has state data for the missing operator. A fault will occur unless `AllowNonRestoredState` is set to `true`.
+ **Operator modified:** If compatible changes are made, such as changing a parameter's type to a compatible type, the application can restore from the obsolete snapshot. An application that uses Apache Flink version 1.8 or later might be restorable from a snapshot with a different schema. An application that uses Apache Flink version 1.6 cannot be restored from such a snapshot. For more information about restoring from snapshots, see [Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) in the Apache Flink Documentation.

  For two-phase-commit sinks, we recommend using a system snapshot (SwS) instead of a user-created snapshot (`CreateApplicationSnapshot`). Managed Service for Apache Flink triggers intermediate savepoints during snapshot creation, and for Flink 1.15 onward, intermediate savepoints no longer commit any side effects. See [Triggering Savepoints](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).
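
The added and deleted operator cases can be summarized with a small model. The following Python sketch compares the operator IDs stored in a snapshot with the operator IDs in the new job. It is a simplified illustration, not how the service or Apache Flink is implemented: the function name `restore_outcome` and the result keys are hypothetical, and the sketch does not model the modified operator case.

```python
def restore_outcome(snapshot_operator_ids, job_operator_ids,
                    allow_non_restored_state=False):
    """Model the restore result for added and deleted operators."""
    snapshot_ops = set(snapshot_operator_ids)
    job_ops = set(job_operator_ids)
    # State in the snapshot that no operator in the new job can claim.
    orphaned = sorted(snapshot_ops - job_ops)
    # New operators that have no state in the snapshot and start empty.
    starts_empty = sorted(job_ops - snapshot_ops)

    if orphaned and not allow_non_restored_state:
        return {"status": "FAULT", "dropped_state": [], "starts_empty": []}
    return {"status": "RESTORED", "dropped_state": orphaned,
            "starts_empty": starts_empty}


# Operator added: no fault, and the new operator starts without state.
print(restore_outcome({"source", "window"}, {"source", "window", "sink"}))
# Operator deleted: a fault occurs unless AllowNonRestoredState is true.
print(restore_outcome({"source", "window"}, {"source"}))
print(restore_outcome({"source", "window"}, {"source"}, allow_non_restored_state=True))
```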

If you need to resume an application that is incompatible with existing savepoint data, we recommend that you skip restoring from the snapshot by setting the `ApplicationRestoreType` parameter of the [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) action to `SKIP_RESTORE_FROM_SNAPSHOT`.

For more information about how Apache Flink deals with incompatible state data, see [State Schema Evolution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/) in the *Apache Flink Documentation*.

## Review snapshot API examples
<a name="how-fault-snapshot-examples"></a>

This section includes example requests for API actions for using snapshots with an application. For information about how to use a JSON file for input for an API action, see [Managed Service for Apache Flink API example code](api-examples.md).

### Enable snapshots for an application
<a name="how-fault-savepoint-examples-enable"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action enables snapshots for an application:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": true
      }
   }
}
```

### Create a snapshot
<a name="how-fault-savepoint-examples-create"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html) action creates a snapshot of the current application state:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### List snapshots for an application
<a name="how-fault-snapshot-examples-list"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) action lists the first 50 snapshots for the application:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### List details for an application snapshot
<a name="how-fault-snapshot-examples-describe"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) action lists details for a specific application snapshot:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Delete a snapshot
<a name="how-fault-snapshot-examples-delete"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) action deletes a previously saved snapshot. You can get the `SnapshotCreationTimestamp` value using either [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) or [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html):

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0
}
```

### Restart an application using a named snapshot
<a name="how-fault-snapshot-examples-load-custom"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) action starts the application using the saved state from a specific snapshot:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### Restart an application using the most recent snapshot
<a name="how-fault-snapshot-examples-load-recent"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) action starts the application using the most recent snapshot:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### Restart an application using no snapshot
<a name="how-fault-snapshot-examples-load-none"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) action starts the application without loading application state, even if a snapshot is present:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```
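
The three restart requests differ only in `ApplicationRestoreType` and in whether `SnapshotName` is present. The following Python sketch builds any of them and rejects combinations that do not make sense. The helper name `start_request` and its validation rules are assumptions for illustration. The resulting dictionary is intended to be passed to an SDK call such as the `start_application` method of a boto3 `kinesisanalyticsv2` client.

```python
RESTORE_TYPES = {
    "SKIP_RESTORE_FROM_SNAPSHOT",
    "RESTORE_FROM_LATEST_SNAPSHOT",
    "RESTORE_FROM_CUSTOM_SNAPSHOT",
}


def start_request(app_name, restore_type, snapshot_name=None):
    """Build a StartApplication request for the given restore type."""
    if restore_type not in RESTORE_TYPES:
        raise ValueError(f"Unknown restore type: {restore_type}")

    restore = {"ApplicationRestoreType": restore_type}
    if restore_type == "RESTORE_FROM_CUSTOM_SNAPSHOT":
        # A named snapshot is only meaningful for the custom restore type.
        if not snapshot_name:
            raise ValueError("snapshot_name is required for a custom snapshot")
        restore["SnapshotName"] = snapshot_name
    elif snapshot_name:
        raise ValueError("snapshot_name is only used with a custom snapshot")

    return {
        "ApplicationName": app_name,
        "RunConfiguration": {"ApplicationRestoreConfiguration": restore},
    }


print(start_request("MyApplication", "RESTORE_FROM_CUSTOM_SNAPSHOT", "MyCustomSnapshot"))
```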

# Use in-place version upgrades for Apache Flink
<a name="how-in-place-version-upgrades"></a>

With in-place version upgrades for Apache Flink, you retain application traceability against a single ARN across Apache Flink versions. This includes snapshots, logs, metrics, tags, Flink configurations, resource limit increases, VPCs, and more. 

You can perform in-place version upgrades for Apache Flink to upgrade existing applications to a new Flink version in Amazon Managed Service for Apache Flink. To perform this task, you can use the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console.

**Note**  
You can't use in-place version upgrades for Apache Flink with Amazon Managed Service for Apache Flink Studio.

**Topics**
+ [Upgrade applications using in-place version upgrades for Apache Flink](upgrading-applications.md)
+ [Upgrade your application to a new Apache Flink version](upgrading-application-new-version.md)
+ [Roll back application upgrades](rollback.md)
+ [General best practices and recommendations for application upgrades](best-practices-recommendations.md)
+ [Precautions and known issues with application upgrades](precautions.md)
+ [Upgrading to Flink 2.2: Complete guide](flink-2-2-upgrade-guide.md)
+ [State compatibility guide for Flink 2.2 upgrades](state-compatibility.md)

# Upgrade applications using in-place version upgrades for Apache Flink
<a name="upgrading-applications"></a>

Before you begin, we recommend that you watch this video: [In-Place Version Upgrades](https://www.youtube.com/watch?v=f1qGGdaP2XI).

To perform in-place version upgrades for Apache Flink, you can use the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console. You can use this feature with any existing application in Managed Service for Apache Flink that is in the `READY` or `RUNNING` state. The feature extends the `UpdateApplication` API with the ability to change the Flink runtime.

## Before upgrading: Update your Apache Flink application
<a name="before-upgrading"></a>

When you write your Flink applications, you bundle them with their dependencies into an application JAR and upload the JAR to your Amazon S3 bucket. From there, Amazon Managed Service for Apache Flink runs the job in the new Flink runtime that you've selected. You might have to update your applications to achieve compatibility with the Flink runtime you want to upgrade to. There can be inconsistencies between Flink versions that cause the version upgrade to fail. Most commonly, this will be with connectors for sources (ingress) or destinations (sinks, egress) and Scala dependencies. Flink 1.15 and later versions in Managed Service for Apache Flink are Scala-agnostic, and your JAR must contain the version of Scala you plan to use.

**To update your application**

1. Read the advice from the Flink community on upgrading applications with state. See [Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

1. Read the list of known issues and limitations. See [Precautions and known issues with application upgrades](precautions.md).

1. Update your dependencies and test your applications locally. These dependencies typically are:

   1. The Flink runtime and API.

   1. Connectors recommended for the new Flink runtime. You can find these on [Release versions](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html) for the specific runtime you want to update to.

   1. Scala – Apache Flink is Scala-agnostic starting with and including Flink 1.15. You must include the Scala dependencies you want to use in your application JAR.

1. Build a new application JAR or zipfile and upload it to Amazon S3. We recommend that you use a different name from the previous JAR/zipfile, so that the previous artifact remains available if you need to roll back.

1. If you are running stateful applications, we strongly recommend that you take a snapshot of your current application. This lets you roll back statefully if you encounter issues during or after the upgrade. 
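
The last step can be scripted. The following Python sketch creates a snapshot and waits until it is ready before you continue with the upgrade. It assumes that `client` behaves like a boto3 `kinesisanalyticsv2` client and that the application is running. The helper name `snapshot_and_wait`, the timeout, and the polling interval are illustrative choices.

```python
import time


def snapshot_and_wait(client, app_name, snapshot_name,
                      timeout_s=600, poll_s=5, sleep=time.sleep):
    """Create a snapshot and block until its status is READY."""
    client.create_application_snapshot(
        ApplicationName=app_name, SnapshotName=snapshot_name)

    waited = 0
    while waited <= timeout_s:
        details = client.describe_application_snapshot(
            ApplicationName=app_name, SnapshotName=snapshot_name
        )["SnapshotDetails"]
        if details["SnapshotStatus"] == "READY":
            return details
        if details["SnapshotStatus"] == "FAILED":
            raise RuntimeError(f"Snapshot {snapshot_name} failed")
        # Still being created: wait and poll again.
        sleep(poll_s)
        waited += poll_s
    raise TimeoutError(f"Snapshot {snapshot_name} not READY after {timeout_s}s")
```

The `sleep` parameter exists only so that the wait can be replaced in tests. In normal use you can leave it at the default.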

# Upgrade your application to a new Apache Flink version
<a name="upgrading-application-new-version"></a>

You can upgrade your Flink application by using the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.

You can call the `UpdateApplication` API in multiple ways:
+ Use the existing **Configuration** workflow on the AWS Management Console.
  + Go to your app page on the AWS Management Console.
  + Choose **Configure**.
  + Select the new runtime and the snapshot that you want to start from, also known as restore configuration. Use the latest setting as the restore configuration to start the app from the latest snapshot. Point to the new upgraded application JAR/zip on Amazon S3.
+ Use the AWS CLI [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) action.
+ Use CloudFormation (CFN).
  + Update the [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment) field. Previously, CloudFormation deleted the application and created a new one, causing your snapshots and other app history to be lost. Now CloudFormation updates your RuntimeEnvironment in place and does not delete your application. 
+ Use the AWS SDK.
  + Consult the SDK documentation for the programming language of your choice. See [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html). 

You can perform the upgrade while the application is in `RUNNING` state or while the application is stopped in `READY` state. Amazon Managed Service for Apache Flink verifies the compatibility between the original runtime version and the target runtime version. This compatibility check runs when you perform [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) while in `RUNNING` state or at the next [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) if you upgrade while in `READY` state. 

## Upgrade an application in `RUNNING` state
<a name="upgrading-running"></a>

The following example shows upgrading an app in `RUNNING` state named `UpgradeTest` to Flink 1.18 in US East (N. Virginia) using the AWS CLI and starting the upgraded app from the latest snapshot. 

```
aws --region us-east-1 kinesisanalyticsv2 update-application \
  --application-name UpgradeTest \
  --runtime-environment-update "FLINK-1_18" \
  --application-configuration-update '{"ApplicationCodeConfigurationUpdate": {"CodeContentUpdate": {"S3ContentLocationUpdate": {"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
  --run-configuration-update '{"ApplicationRestoreConfiguration": {"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
  --current-application-version-id ${current_application_version}
```
+ If you enabled service snapshots and want to continue the application from the latest snapshot, Amazon Managed Service for Apache Flink verifies that the current `RUNNING` application's runtime is compatible with the selected target runtime.
+ If you have specified a snapshot from which to continue the target runtime, Amazon Managed Service for Apache Flink verifies that the target runtime is compatible with the specified snapshot. If the compatibility check fails, your update request is rejected and your application remains untouched in the `RUNNING` state.
+ If you choose to start your application without a snapshot, Amazon Managed Service for Apache Flink doesn't run any compatibility checks.
+ If your upgraded application fails or gets stuck in a transient `UPDATING` state, follow the instructions in the [Roll back application upgrades](rollback.md) section to return to the healthy state. 
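
If you prefer the AWS SDK to the AWS CLI, the same upgrade can be expressed as an `UpdateApplication` request. The following Python sketch looks up the current application version and builds that request. It assumes that `client` behaves like a boto3 `kinesisanalyticsv2` client. The helper name `upgrade_request` is hypothetical, and the runtime and file key are the example values used above.

```python
def upgrade_request(client, app_name, runtime, file_key):
    """Build an UpdateApplication request for an in-place runtime upgrade."""
    # The request must carry the application's current version ID.
    detail = client.describe_application(
        ApplicationName=app_name)["ApplicationDetail"]
    return {
        "ApplicationName": app_name,
        "CurrentApplicationVersionId": detail["ApplicationVersionId"],
        "RuntimeEnvironmentUpdate": runtime,
        "ApplicationConfigurationUpdate": {
            "ApplicationCodeConfigurationUpdate": {
                "CodeContentUpdate": {
                    "S3ContentLocationUpdate": {"FileKeyUpdate": file_key}
                }
            }
        },
        # Continue the upgraded application from the latest snapshot.
        "RunConfigurationUpdate": {
            "ApplicationRestoreConfiguration": {
                "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
        },
    }


# Example use with a real client (not executed here):
#   request = upgrade_request(client, "UpgradeTest", "FLINK-1_18", "flink_1_18_app.jar")
#   client.update_application(**request)
```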

**Process flow for running state applications**

![\[The following diagram represents the recommended workflow to upgrade the application while running. We assume that the application is stateful and that you enabled snapshots. For this workflow, on update, you restore the application from the latest snapshot that was automatically taken by Amazon Managed Service for Apache Flink before updating.\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/in-place-update-while-running.png)


## Upgrade an application in `READY` state
<a name="upgrading-ready"></a>

The following example shows upgrading an app in `READY` state named `UpgradeTest` to Flink 1.18 in US East (N. Virginia) using the AWS CLI. There is no specified snapshot to start the app because the application is not running. You can specify a snapshot when you issue the start application request.

```
aws --region us-east-1 kinesisanalyticsv2 update-application \
  --application-name UpgradeTest \
  --runtime-environment-update "FLINK-1_18" \
  --application-configuration-update '{"ApplicationCodeConfigurationUpdate": {"CodeContentUpdate": {"S3ContentLocationUpdate": {"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
  --current-application-version-id ${current_application_version}
```
+ You can update the runtime of your applications in `READY` state to any Flink version. Amazon Managed Service for Apache Flink does not run any checks until you start your application.
+ Amazon Managed Service for Apache Flink only runs compatibility checks against the snapshot you selected to start the app. These are basic compatibility checks following the [Flink Compatibility Table](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table). They only check the Flink version with which the snapshot was taken and the Flink version you are targeting. If the Flink runtime of the selected snapshot is incompatible with the app's new runtime, the start request might be rejected.

**Process flow for ready state applications**

![\[The following diagram represents the recommended workflow to upgrade the application while in ready state. We assume that the application is stateful and that you enabled snapshots. For this workflow, on update, you restore the application from the latest snapshot that was automatically taken by Amazon Managed Service for Apache Flink when the application was stopped.\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/in-place-update-while-ready.png)


# Roll back application upgrades
<a name="rollback"></a>

If you have issues with your application or find inconsistencies in your application code between Flink versions, you can roll back using the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console. The following examples show what rolling back looks like in different failure scenarios.

## Runtime upgrade succeeded, the application is in `RUNNING` state, but the job is failing and continuously restarting
<a name="succeeded-restarting"></a>

Assume you are trying to upgrade a stateful application named `TestApplication` from Flink 1.15 to Flink 1.18 in US East (N. Virginia). However, the upgraded Flink 1.18 application is failing to start or is constantly restarting, even though the application is in `RUNNING` state. This is a common failure scenario. To avoid further downtime, we recommend that you roll back your application immediately to the previous running version (Flink 1.15), and diagnose the issue later.

To roll back the application to the previous running version, use the [rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI command or the [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API action. This API action rolls back the changes you've made that resulted in the latest version. Then it restarts your application using the latest successful snapshot. 

We strongly recommend that you take a snapshot with your existing app before you attempt to upgrade. This will help to avoid data loss or having to reprocess data. 

In this failure scenario, CloudFormation will not roll back the application for you. You must update the CloudFormation template to point to the previous runtime and to the previous code to force CloudFormation to update the application. Otherwise, CloudFormation assumes that your application has been updated when it transitions to the `RUNNING` state.

## Rolling back an application that is stuck in `UPDATING`
<a name="stuck-updating"></a>

If your application gets stuck in the `UPDATING` or `AUTOSCALING` state after an upgrade attempt, you can use the [rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI command or the [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API action to roll back the application to the version before the stuck `UPDATING` or `AUTOSCALING` state. This API action rolls back the changes that caused the application to get stuck in the transient `UPDATING` or `AUTOSCALING` state.
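
The following Python sketch shows one way to automate this check. It reads the application status and requests a rollback only if the application is in one of the stuck states. It assumes that `client` behaves like a boto3 `kinesisanalyticsv2` client. The helper name `rollback_if_stuck` is hypothetical, and the sketch does not decide how long an application must stay in a state before it counts as stuck.

```python
STUCK_STATES = ("UPDATING", "AUTOSCALING")


def rollback_if_stuck(client, app_name, stuck_states=STUCK_STATES):
    """Roll back the application if its status is one of `stuck_states`.

    Returns True if a rollback was requested, otherwise False.
    """
    detail = client.describe_application(
        ApplicationName=app_name)["ApplicationDetail"]
    if detail["ApplicationStatus"] not in stuck_states:
        return False

    # The rollback request must carry the application's current version ID.
    client.rollback_application(
        ApplicationName=app_name,
        CurrentApplicationVersionId=detail["ApplicationVersionId"],
    )
    return True
```

To cover the scenario where the upgrade succeeded but the job keeps restarting, you could pass `stuck_states=("RUNNING",)` after you confirm that the job is unhealthy.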

# General best practices and recommendations for application upgrades
<a name="best-practices-recommendations"></a>
+ Test the new job/runtime without state on a non-production environment before attempting a production upgrade.
+ Consider testing the stateful upgrade with a non-production application first.
+ Make sure that your new job graph has a compatible state with the snapshot you will be using to start your upgraded application.
  + Make sure that the types stored in operator states stay the same. If the type has changed, Apache Flink can't restore the operator state.
  + Make sure that the Operator IDs you set using the `uid` method remain the same. Apache Flink has a strong recommendation for assigning unique IDs to operators. For more information, see [Assigning Operator IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids) in the Apache Flink documentation.

    If you don't assign IDs to your operators, Flink automatically generates them. In that case, the IDs might depend on the program structure and, if the structure changes, can cause compatibility issues. Flink uses Operator IDs to match state in the snapshot to operators. Changing Operator IDs results in either the application not starting, or the state stored in the snapshot being dropped and the new operator starting without state.
  + Don't change the key used to store the keyed state.
  + Don't modify the input type of stateful operators like window or join. This implicitly changes the type of the internal state of the operator, causing a state incompatibility.

# Precautions and known issues with application upgrades
<a name="precautions"></a>

## Kafka Commit on checkpointing fails repeatedly after a broker restart
<a name="apache-kafka-connector"></a>

There is a known open source Apache Flink issue with the Apache Kafka connector in Flink version 1.15 caused by a critical open source Kafka Client bug in Kafka Client 2.8.1. For more information, see [Kafka Commit on checkpointing fails repeatedly after a broker restart](https://issues.apache.org/jira/browse/FLINK-28060) and [KafkaConsumer is unable to recover connection to group coordinator after commitOffsetAsync exception](https://issues.apache.org/jira/browse/KAFKA-13840).

To avoid this issue, we recommend that you use Apache Flink 1.18 or later in Amazon Managed Service for Apache Flink.

## Known limitations of state compatibility
<a name="state-precautions"></a>
+ If you are using the Table API, Apache Flink doesn't guarantee state compatibility between Flink versions. For more information, see [Stateful Upgrades and Evolution](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution) in the Apache Flink documentation.
+ Flink 1.6 state is not compatible with Flink 1.18. The API rejects your request if you try to upgrade from 1.6 to 1.18 or later with state. Instead, first upgrade to 1.8, 1.11, 1.13, or 1.15 and take a snapshot, and then upgrade to 1.18 or later. For more information, see [Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/) in the Apache Flink documentation.

## Known issues with the Flink Kinesis Connector
<a name="kinesis-connector-precautions"></a>
+ If you are using Flink 1.11 or earlier and using the `amazon-kinesis-connector-flink` connector for Enhanced-fan-out (EFO) support, you must take extra steps for a stateful upgrade to Flink 1.13 or later. This is because of the change in the package name of the connector. For more information, see [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink).

  The `amazon-kinesis-connector-flink` connector for Flink 1.11 and earlier uses the packaging `software.amazon.kinesis`, whereas the Kinesis connector for Flink 1.13 and later uses `org.apache.flink.streaming.connectors.kinesis`. Use this tool to support your migration: [amazon-kinesis-connector-flink-state-migrator](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator).
+ If you are using Flink 1.13 or earlier with `FlinkKinesisProducer` and upgrading to Flink 1.15 or later, for a stateful upgrade you must continue to use `FlinkKinesisProducer` in Flink 1.15 or later, instead of the newer `KinesisStreamsSink`. However, if you already have a custom `uid` set on your sink, you should be able to switch to `KinesisStreamsSink` because `FlinkKinesisProducer` doesn't keep state. Flink will treat it as the same operator because a custom `uid` is set.

## Flink applications written in Scala
<a name="scala-precautions"></a>
+ As of Flink 1.15, Apache Flink doesn't include Scala in the runtime. You must include the version of Scala you want to use and other Scala dependencies in your code JAR/zip when upgrading to Flink 1.15 or later. For more information, see [Amazon Managed Service for Apache Flink for Apache Flink 1.15.2 release](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html).
+ If your application uses Scala and you are upgrading it from Flink 1.11 or earlier (Scala 2.11) to Flink 1.13 (Scala 2.12), make sure that your code uses Scala 2.12. Otherwise, your Flink 1.13 application may fail to find Scala 2.11 classes in the Flink 1.13 runtime.

## Things to consider when downgrading Flink applications
<a name="downgrading-precautions"></a>
+ Downgrading Flink applications is possible, but limited to cases where the application previously ran with the older Flink version. For a stateful downgrade, Managed Service for Apache Flink requires you to use a snapshot taken with a matching or earlier Flink version.
+ If you are updating your runtime from Flink 1.13 or later to Flink 1.11 or earlier, and if your app uses the HashMap state backend, your application will continuously fail.

# Upgrading to Flink 2.2: Complete guide
<a name="flink-2-2-upgrade-guide"></a>

This guide provides step-by-step instructions for upgrading your Amazon Managed Service for Apache Flink application from Flink 1.x to Flink 2.2. This is a major version upgrade with breaking changes that require careful planning and testing.

**Major version upgrade is uni-directional**  
The Upgrade operation can move your application from Flink 1.x to 2.2 with state preservation, but you cannot move back from 2.2 to 1.x with 2.2 state. If your application becomes unhealthy after upgrading, use the Rollback API to return to the 1.x version with your original 1.x state from the latest snapshot.

## Prerequisites
<a name="upgrade-guide-prerequisites"></a>

Before beginning your upgrade:
+ Review [Breaking changes and deprecations](flink-2-2.md#flink-2-2-breaking-changes)
+ Review [State compatibility guide for Flink 2.2 upgrades](state-compatibility.md)
+ Ensure you have a non-production environment for testing
+ Document your current application configuration and dependencies

## Understanding your migration paths
<a name="upgrade-guide-migration-paths"></a>

Your upgrade experience depends on your application's compatibility with Flink 2.2. Understanding these paths helps you prepare appropriately and set realistic expectations.

**Path 1: Compatible binary and application state**

**What to expect:**
+ Invoke the Upgrade operation
+ Complete the migration to 2.2 with the application status transitioning: `RUNNING` → `UPDATING` → `RUNNING`
+ Preserve all application state without data loss or reprocessing
+ Same experience as minor version migrations

Best for: Stateless applications or applications using compatible serialization (Avro, compatible Protobuf schemas, POJOs without collections)

**Path 2: Binary incompatibilities**

**What to expect:**
+ Invoke the Upgrade operation
+ The operation fails and surfaces the binary incompatibility through the Operations API and logs
+ With auto-rollback enabled: Applications automatically roll back within minutes without your intervention
+ With auto-rollback disabled: Applications remain in running state without data processing; you manually roll back to older version
+ Once the binary is fixed, use the [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) for an experience similar to Path 1

Best for: Applications using removed APIs that are detected during Flink job startup

**Path 3: Incompatible application state**

**What to expect:**
+ Invoke the Upgrade operation
+ Migration appears to succeed initially
+ Applications enter restart loops within seconds as state restoration fails
+ Detect failures through CloudWatch Metrics showing continuous restarts
+ Manually invoke the Rollback operation
+ Return to production within minutes after initiating rollback
+ Review [State migration](state-compatibility.md#state-compat-migration) for your application

Best for: Applications with state serialization incompatibilities (POJOs with collections, certain Kryo-serialized state)

**Note**  
We highly recommend that you create a replica of your production application and test each of the following upgrade phases on the replica before you follow the same steps for your production application.

## Phase 1: Preparation
<a name="upgrade-guide-phase-1"></a>

**Update application code**

Update your application code to be compatible with Flink 2.2:
+ **Update Flink dependencies** to version 2.2.0 in your `pom.xml` or `build.gradle`
+ **Update connector dependencies** to Flink 2.2-compatible versions (see [Connector availability](flink-2-2.md#flink-2-2-connectors))
+ **Remove deprecated API usage**:
  + Replace DataSet API with DataStream API or Table API/SQL
  + Replace legacy `SourceFunction`/`SinkFunction` with FLIP-27 Source and FLIP-143 Sink APIs
  + Replace Scala API usage with Java API
+ **Update to Java 17**

**Upload updated application code**
+ Build your application JAR with Flink 2.2 dependencies
+ Upload to Amazon S3 with a **different file name** than your current JAR (for example, `my-app-flink-2.2.jar`)
+ Note the S3 bucket and key for use in the upgrade step

## Phase 2: Enable auto-rollback
<a name="upgrade-guide-phase-2"></a>

Auto-rollback allows Amazon Managed Service for Apache Flink to automatically revert to the previous version if the upgrade fails.

**Check auto-rollback status**

*AWS Management Console:*

1. Navigate to your application

1. Choose **Configuration**

1. Under **Application settings**, verify **System rollback** is enabled

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**Enable auto-rollback (if not enabled)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## Phase 3: Take snapshot (optional)
<a name="upgrade-guide-phase-3"></a>

If automatic snapshots are enabled for your application, you can skip this step. Otherwise, take a snapshot to save your application's state before upgrading.

**Take snapshot from running application**

*AWS Management Console:*

1. Navigate to your application

1. Choose **Snapshots**

1. Choose **Create snapshot**

1. Enter a snapshot name (for example, `pre-flink-2.2-upgrade`)

1. Choose **Create**

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**Verify snapshot creation**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

Wait until `SnapshotStatus` is `READY` before proceeding.
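
The wait can be scripted as a simple polling loop. The following Python sketch is one possible shape, not a prescribed implementation: `wait_for_snapshot` and its parameters are hypothetical names, and the status lookup is injected as a callable so that the loop can be exercised without calling AWS.

```python
import time


def wait_for_snapshot(get_status, timeout_s=600, poll_s=10, sleep=time.sleep):
    """Poll until the snapshot status is READY.

    get_status: zero-argument callable returning the current SnapshotStatus string.
    Returns the number of seconds waited. Raises on FAILED or on timeout.
    """
    waited = 0
    while True:
        status = get_status()
        if status == "READY":
            return waited
        if status == "FAILED":
            raise RuntimeError("snapshot creation failed")
        if waited >= timeout_s:
            raise TimeoutError(f"snapshot still {status} after {waited} seconds")
        sleep(poll_s)
        waited += poll_s


# Example with a canned sequence of statuses instead of a real API call.
statuses = iter(["CREATING", "CREATING", "READY"])
print(wait_for_snapshot(lambda: next(statuses), poll_s=1, sleep=lambda s: None))
```

With the AWS SDK for Python, `get_status` could wrap a `describe_application_snapshot` call and read `SnapshotDetails.SnapshotStatus` from the response, assuming the response shape documented for the `DescribeApplicationSnapshot` action.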

## Phase 4: Upgrade application
<a name="upgrade-guide-phase-4"></a>

You can upgrade your Flink application by using the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.

You can call the `UpdateApplication` API in multiple ways:
+ **Use the AWS Management Console.**
  + Go to your app page on the AWS Management Console.
  + Choose **Configure**.
  + Select the new runtime and the snapshot that you want to start from, also known as restore configuration. Use the latest setting as the restore configuration to start the app from the latest snapshot. Point to the new upgraded application JAR/zip on Amazon S3.
+ **Use the AWS CLI** [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) action.
+ **Use CloudFormation.**
  + Update the `RuntimeEnvironment` field. Previously, CloudFormation deleted the application and created a new one, causing your snapshots and other app history to be lost. Now CloudFormation updates your `RuntimeEnvironment` in place and does not delete your application.
+ **Use the AWS SDK.**
  + Consult the SDK documentation for the programming language of your choice. See [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

You can perform the upgrade while the application is in `RUNNING` state or while the application is stopped in `READY` state. Amazon Managed Service for Apache Flink validates the compatibility between the original runtime version and the target runtime version. This compatibility check runs when you perform `UpdateApplication` while in `RUNNING` state or at the next `StartApplication` if you upgrade while in `READY` state.

**Upgrade from RUNNING state**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**Upgrade from READY state**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```
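
If you call the API from an SDK instead of the CLI, the same request can be assembled as a dictionary. The following Python sketch mirrors the CLI options above; `build_upgrade_request` is a hypothetical helper name, and the application name, version ID, and file key are placeholders.

```python
import json


def build_upgrade_request(application_name, current_version_id, file_key,
                          runtime="FLINK-2_2"):
    """Build UpdateApplication parameters equivalent to the CLI commands above."""
    return {
        "ApplicationName": application_name,
        "CurrentApplicationVersionId": current_version_id,
        "RuntimeEnvironmentUpdate": runtime,
        "ApplicationConfigurationUpdate": {
            "ApplicationCodeConfigurationUpdate": {
                "CodeContentUpdate": {
                    "S3ContentLocationUpdate": {"FileKeyUpdate": file_key}
                }
            }
        },
    }


request = build_upgrade_request("MyApplication", 5, "my-app-flink-2.2.jar")
print(json.dumps(request, indent=2))
```

With the AWS SDK for Python, the dictionary could then be passed as keyword arguments, for example `boto3.client("kinesisanalyticsv2").update_application(**request)`. Building the request separately makes it easy to review or log before it is sent.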

## Phase 5: Monitor upgrade
<a name="upgrade-guide-phase-5"></a>

**Compatibility check**
+ Use the Operations API to check the status of the upgrade. If there are binary incompatibilities or issues with job startup, the upgrade operation fails and the cause is recorded in the logs.
+ If the upgrade operation succeeded but the application is stuck in restart loops, either the state is incompatible with the new Flink version or there is a problem with the updated code. To learn how to identify state incompatibility issues, review [State compatibility guide for Flink 2.2 upgrades](state-compatibility.md).

**Monitor application health**

*Application state:*
+ Application status should transition: `RUNNING` → `UPDATING` → `RUNNING`
+ Check the runtime of the application. If it is 2.2, the upgrade operation was successful.
+ If your application is in `RUNNING` but still on the older runtime, auto-rollback kicked in. Operations API will show operation as `FAILED`. Check logs to find the exception for failure.
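
The checks above can be condensed into a small decision function. The following Python sketch is illustrative only: `classify_upgrade` and its return labels are hypothetical, and the inputs are assumed to be values you have already read from the application description and the Operations API.

```python
def classify_upgrade(application_status, runtime_environment, operation_status,
                     target_runtime="FLINK-2_2"):
    """Map the observed application state to an upgrade outcome."""
    if application_status == "UPDATING":
        return "in progress"
    if application_status == "RUNNING" and runtime_environment == target_runtime:
        return "upgraded"
    if application_status == "RUNNING" and operation_status == "FAILED":
        # Still on the older runtime with a failed operation: auto-rollback ran.
        return "rolled back"
    return "needs investigation"


print(classify_upgrade("RUNNING", "FLINK-2_2", "SUCCESSFUL"))
print(classify_upgrade("RUNNING", "FLINK-1_20", "FAILED"))
```

An `upgraded` result only confirms the runtime version. An application on the new runtime can still be in a restart loop, so the metrics that follow remain the deciding signal for application health.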

In addition, monitor these metrics in CloudWatch:

*Restart metric:*
+ `numRestarts`: Monitor for unexpected restarts — the upgrade is successful if `numRestarts` is zero and `uptime` or `runningTime` is increasing.

*Checkpoint metrics:*
+ `lastCheckpointDuration`: Should be similar to pre-upgrade values
+ `numberOfFailedCheckpoints`: Should remain at 0

## Phase 6: Validate application behavior
<a name="upgrade-guide-phase-6"></a>

After the application is running on Flink 2.2:

**Functional validation**
+ Verify data is being read from sources
+ Verify data is being written to sinks
+ Verify business logic produces expected results
+ Compare output with pre-upgrade baseline

**Performance validation**
+ Monitor latency metrics (end-to-end processing time)
+ Monitor throughput metrics (records per second)
+ Monitor checkpoint duration and size
+ Monitor memory and CPU utilization

**Run for 24\+ hours**

Allow the application to run for at least 24 hours in production to ensure:
+ No memory leaks
+ Stable checkpoint behavior
+ No unexpected restarts
+ Consistent throughput

## Phase 7: Rollback procedures
<a name="upgrade-guide-phase-7"></a>

If the upgrade fails or the application is running but unhealthy, roll back to the previous version.

**Automatic rollback**

If auto-rollback is enabled and the upgrade fails during startup, Amazon Managed Service for Apache Flink automatically reverts to the previous version.

**Manual rollback**

If the application is running but unhealthy, use the `RollbackApplication` API:

*AWS Management Console:*

1. Navigate to your application

1. Choose **Actions** → **Roll back**

1. Confirm the rollback

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**What happens during rollback:**
+ Application stops
+ Runtime reverts to previous Flink version
+ Application code reverts to previous JAR
+ Application restarts from the last successful snapshot taken **before** the upgrade

**Important**  
+ You cannot restore a Flink 2.2 snapshot on Flink 1.x.
+ Rollback uses the snapshot taken before the upgrade.
+ Always take a snapshot before upgrading (Phase 3).

## Next steps
<a name="upgrade-guide-next-steps"></a>

For questions or issues during the upgrade, see [Troubleshoot Managed Service for Apache Flink](troubleshooting.md) or contact AWS Support.

# State compatibility guide for Flink 2.2 upgrades
<a name="state-compatibility"></a>

When upgrading from Flink 1.x to Flink 2.2, state compatibility issues may prevent your application from restoring from snapshots. This guide helps you identify potential compatibility issues and provides migration strategies.

## Understanding state compatibility changes
<a name="state-compat-understanding"></a>

Amazon Managed Service for Apache Flink 2.2 introduces several serialization changes that affect state compatibility. The following are the major ones:
+ **Kryo Version Upgrade**: Apache Flink 2.2 upgrades the bundled Kryo serializer from version 2 to version 5. Because Kryo v5 uses a different binary encoding format than Kryo v2, any operator state that was serialized via Kryo in a Flink 1.x savepoint cannot be restored in Flink 2.2.
+ **Java Collections Serialization**: In Flink 1.x, Java collections (such as `HashMap`, `ArrayList`, and `HashSet`) within POJOs were serialized using Kryo. Flink 2.2 introduces collection-specific optimized serializers that are incompatible with the Kryo-serialized state from 1.x. Applications using Java collections with POJO or Kryo serializers in 1.x cannot restore this state in Flink 2.2. See Flink [documentation](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) for more details on data types and serialization.
+ **Kinesis Connector Compatibility**: Kinesis Data Streams (KDS) connector versions earlier than 5.0 maintain state that is not compatible with the Flink 2.2 Kinesis connector version 6.0. You must migrate to connector version 5.0 or later before you upgrade.

## Serialization compatibility reference
<a name="state-compat-reference"></a>

Review all state declarations in your application and match serialization types to the table below. If any state type is incompatible, see the [State migration](#state-compat-migration) section before proceeding with your upgrade.


**Serialization compatibility reference**  

| Serialization Type | Compatible? | Details | 
| --- | --- | --- | 
| Avro (SpecificRecord, GenericRecord) | Yes | Uses its own binary format independent of Kryo. Ensure you are using Flink's native Avro type information, not Avro registered as a Kryo serializer. | 
| Protobuf | Yes | Uses its own binary encoding independent of Kryo. Verify schema changes follow backward-compatible evolution rules. | 
| POJOs without collections | Yes | Handled by Flink's POJO serializer — but only if the class meets all POJO criteria: public class, public no-arg constructor, all fields either public or accessible via getters/setters, and all field types themselves serializable by Flink. A POJO that violates any of these silently falls back to Kryo and becomes incompatible. | 
| Custom TypeSerializers | Yes | Compatible only if your serializer does not delegate to Kryo internally. | 
| SQL and Table API state | Yes (with caveat) | Uses Flink's internal serializers. However, Apache Flink does not guarantee state compatibility between major versions for Table API applications. Test in a non-production environment first. | 
| POJOs with Java collections (HashMap, ArrayList, HashSet) | No | In Flink 1.x, collections within POJOs were serialized via Kryo v2. Flink 2.2 introduces dedicated collection serializers whose binary format is incompatible with the Kryo v2 format. | 
| Scala case classes | No | Serialized via Kryo in Flink 1.x. The Kryo v2 to v5 upgrade changes the binary format. | 
| Java records | No | Typically fall back to Kryo serialization in Flink 1.x. Verify by testing with `disableGenericTypes()`. | 
| Third-party library types | No | Types without a registered custom serializer fall back to Kryo. The Kryo v2 to v5 binary format change breaks compatibility. | 
| Any type using Kryo fallback | No | If Flink cannot handle a type with a built-in or registered serializer, it falls back to Kryo. All Kryo-serialized state from 1.x is incompatible with 2.2. | 

## Diagnostic methods
<a name="state-compat-diagnostics"></a>

You can identify state compatibility issues either proactively, by looking at application logs before the upgrade, or by inspecting logs after the [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) operation.

**Identify Kryo fallback in your application**

You can use the following regex pattern in your logs to identify Kryo fallback in your application:

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

Sample log:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```
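
To apply the pattern to exported log text, you can collect the affected class names with a short script. The following Python sketch is one way to do this. Two details differ from the pattern shown above and are choices made for this example: Python spells named groups as `(?P<name>...)`, and the whitespace after the class name is matched with `\s+` so that wrapped log lines like the sample still match. `find_kryo_fallback_classes` and `com.example.Order` are hypothetical names.

```python
import re

# Same pattern as above, adapted to Python's named-group syntax.
KRYO_FALLBACK = re.compile(
    r"Class class (?P<className>\S+)\s+cannot be used as a POJO type"
)


def find_kryo_fallback_classes(log_text):
    """Return the sorted, de-duplicated class names that fall back to GenericType."""
    return sorted({m.group("className") for m in KRYO_FALLBACK.finditer(log_text)})


sample = (
    "Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber\n"
    "cannot be used as a POJO type because not all fields are valid POJO fields,\n"
    "and must be processed as GenericType.\n"
    "Class class com.example.Order cannot be used as a POJO type because ...\n"
)
for name in find_kryo_fallback_classes(sample):
    print(name)
```

Each class that the script reports is a candidate for Kryo-serialized state and is worth checking against the serialization compatibility reference.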

If the upgrade fails using the UpdateApplication API, the following exceptions might signal that you are encountering serializer-based state incompatibility:

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## Pre-upgrade checklist
<a name="state-compat-checklist"></a>
+ Review all state declarations in your application
+ Check for POJOs with collections (`HashMap`, `ArrayList`, `HashSet`)
+ Verify serialization methods for each state type
+ Create a replica of your production application and test state compatibility by using the UpdateApplication API on the replica
+ If state is incompatible, select a strategy from [State migration](#state-compat-migration)
+ Enable auto-rollback in your production Flink application configuration

## State migration
<a name="state-compat-migration"></a>

**Rebuild complete state**

Best for applications where state can be rebuilt from source data.

To rebuild the state:

1. Stop the Flink 1.x application

1. Upgrade to Flink 2.x with updated code

1. Start with `SKIP_RESTORE_FROM_SNAPSHOT`

1. Allow application to rebuild state

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## Best practices
<a name="state-compat-best-practices"></a>

1. **Always use Avro or Protobuf for complex state** — These provide schema evolution and are Kryo-independent

1. **Avoid collections in POJOs** — Use Flink's native `ListState` and `MapState` instead

1. **Test state restoration locally** — Before production upgrade, test with actual snapshots

1. **Take snapshots frequently** — Especially before major version upgrades

1. **Enable auto-rollback** — Configure your Managed Service for Apache Flink application to automatically roll back on failure

1. **Document your state types** — Maintain documentation of all state types and their serialization methods

1. **Monitor checkpoint sizes** — Growing checkpoint sizes may indicate serialization issues

## Next steps
<a name="state-compat-next-steps"></a>

**Plan your upgrade**: See [Upgrading to Flink 2.2: Complete guide](flink-2-2-upgrade-guide.md).

For questions or issues during migration, see [Troubleshoot Managed Service for Apache Flink](troubleshooting.md) or contact AWS Support.

# Implement application scaling in Managed Service for Apache Flink
<a name="how-scaling"></a>

You can configure the parallel execution of tasks and the allocation of resources for Amazon Managed Service for Apache Flink to implement scaling. For information about how Apache Flink schedules parallel instances of tasks, see [Parallel Execution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/) in the Apache Flink Documentation.

**Topics**
+ [Configure application parallelism and ParallelismPerKPU](#how-parallelism)
+ [Allocate Kinesis Processing Units](#how-scaling-kpus)
+ [Update your application's parallelism](#how-scaling-howto)
+ [Use automatic scaling in Managed Service for Apache Flink](how-scaling-auto.md)
+ [maxParallelism considerations](#how-scaling-auto-max-parallelism)

## Configure application parallelism and ParallelismPerKPU
<a name="how-parallelism"></a>

You configure the parallel execution for your Managed Service for Apache Flink application tasks (such as reading from a source or executing an operator) using the following [ApplicationConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html) properties: 
+ `Parallelism` — Use this property to set the default Apache Flink application parallelism. All operators, sources, and sinks execute with this parallelism unless they are overridden in the application code. The default is `1`, and the default maximum is `256`.
+ `ParallelismPerKPU` — Use this property to set the number of parallel tasks that can be scheduled per Kinesis Processing Unit (KPU) of your application. The default is `1`, and the maximum is `8`. For applications that have blocking operations (for example, I/O), a higher value of `ParallelismPerKPU` leads to full utilization of KPU resources.

**Note**  
The limit for `Parallelism` is equal to `ParallelismPerKPU` times the limit for KPUs (which has a default of 64). The KPUs limit can be increased by requesting a limit increase. For instructions on how to request a limit increase, see "To request a limit increase" in [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

For information about setting task parallelism for a specific operator, see [Setting the Parallelism: Operator](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level) in the Apache Flink Documentation.

## Allocate Kinesis Processing Units
<a name="how-scaling-kpus"></a>

Managed Service for Apache Flink provisions capacity as KPUs. A single KPU provides you with 1 vCPU and 4 GB of memory. For every KPU allocated, 50 GB of running application storage is also provided. 

Managed Service for Apache Flink calculates the KPUs that are needed to run your application using the `Parallelism` and `ParallelismPerKPU` properties, as follows:

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```
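
As a worked example of this formula, the following Python sketch computes the allocated KPUs for a few settings. It rests on two assumptions that go beyond the formula as written: when `Parallelism` is not an exact multiple of `ParallelismPerKPU`, the result is rounded up to a whole KPU, and the billed total adds the one orchestration KPU described in the note later in this section. The function names are hypothetical.

```python
import math


def allocated_kpus(parallelism, parallelism_per_kpu):
    """Allocated KPUs = Parallelism / ParallelismPerKPU, rounded up (assumption)."""
    if parallelism < 1:
        raise ValueError("Parallelism must be at least 1")
    if not 1 <= parallelism_per_kpu <= 8:
        raise ValueError("ParallelismPerKPU must be between 1 and 8")
    return math.ceil(parallelism / parallelism_per_kpu)


def billed_kpus(parallelism, parallelism_per_kpu):
    """Allocated KPUs plus the additional KPU charged for orchestration."""
    return allocated_kpus(parallelism, parallelism_per_kpu) + 1


for parallelism, per_kpu in [(4, 4), (10, 4), (64, 1)]:
    print(parallelism, per_kpu,
          allocated_kpus(parallelism, per_kpu),
          billed_kpus(parallelism, per_kpu))
```

For instance, `Parallelism` 4 with `ParallelismPerKPU` 4 fits in a single KPU, whereas the same parallelism with `ParallelismPerKPU` 1 needs four.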

Managed Service for Apache Flink quickly gives your applications resources in response to spikes in throughput or processing activity. It removes resources from your application gradually after the activity spike has passed. To disable the automatic allocation of resources, set the `AutoScalingEnabled` value to `false`, as described later in [Update your application's parallelism](#how-scaling-howto). 

The default limit for KPUs for your application is 64. For instructions on how to request an increase to this limit, see "To request a limit increase" in [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

**Note**  
An additional KPU is charged for orchestration purposes. For more information, see [Managed Service for Apache Flink pricing](https://aws.amazon.com/kinesis/data-analytics/pricing/).

## Update your application's parallelism
<a name="how-scaling-howto"></a>

This section contains sample requests for API actions that set an application's parallelism. For more examples and instructions for how to use request blocks with API actions, see [Managed Service for Apache Flink API example code](api-examples.md).

The following example request for the [CreateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action sets parallelism when you are creating an application:

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment": "FLINK-1_18",
   "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": {
      "ApplicationCodeConfiguration": {
         "CodeContent": {
            "S3ContentLocation": {
               "BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket",
               "FileKey": "myflink.jar",
               "ObjectVersion": "AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
         "CodeContentType": "ZIPFILE"
      },
      "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
            "AutoScalingEnabled": true,
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

The following example request for the [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action sets parallelism for an existing application:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": {
      "FlinkApplicationConfigurationUpdate": {
         "ParallelismConfigurationUpdate": {
            "AutoScalingEnabledUpdate": true,
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

The following example request for the [UpdateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action disables automatic scaling for an existing application:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": {
      "FlinkApplicationConfigurationUpdate": {
         "ParallelismConfigurationUpdate": {
            "AutoScalingEnabledUpdate": false
         }
      }
   }
}
```

# Use automatic scaling in Managed Service for Apache Flink
<a name="how-scaling-auto"></a>

Managed Service for Apache Flink elastically scales your application’s parallelism to accommodate the data throughput of your source and your operator complexity for most scenarios. Automatic scaling is enabled by default. Managed Service for Apache Flink monitors the resource (CPU) usage of your application, and elastically scales your application's parallelism up or down accordingly:
+ Your application scales up (increases parallelism) if the maximum of the CloudWatch metric `containerCPUUtilization` is 75 percent or above for 15 minutes. That means the `ScaleUp` action is initiated when there are 15 consecutive datapoints, each with a 1-minute period, equal to or over 75 percent. A `ScaleUp` action doubles the `CurrentParallelism` of your application. `ParallelismPerKPU` is not modified. As a consequence, the number of allocated KPUs also doubles. 
+ Your application scales down (decreases parallelism) when your CPU usage remains below 10 percent for six hours. That means the `ScaleDown` action is initiated when there are 360 consecutive datapoints, each with a 1-minute period, below 10 percent. A `ScaleDown` action halves (rounded up) the parallelism of the application. `ParallelismPerKPU` is not modified, and the number of allocated KPUs also halves (rounded up). 

**Note**  
The maximum of `containerCPUUtilization` over a 1-minute period can be used to find the correlation with a datapoint used for a scaling action, but it doesn't necessarily reflect the exact moment when the action is initiated.

Managed Service for Apache Flink will not reduce your application's `CurrentParallelism` value to less than your application's `Parallelism` setting.
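
The scaling rules above can be summarized in a short sketch. It only illustrates the documented thresholds and is not the service's implementation. The function name `next_parallelism` and its arguments are hypothetical, and the sketch ignores the KPU quota and any `maxParallelism` limit.

```python
import math

# Thresholds and windows taken from the rules described above.
SCALE_UP_CPU_PERCENT = 75.0
SCALE_UP_MINUTES = 15
SCALE_DOWN_CPU_PERCENT = 10.0
SCALE_DOWN_MINUTES = 360


def next_parallelism(cpu_datapoints, current_parallelism, configured_parallelism):
    """Return the parallelism that the documented rules would produce.

    cpu_datapoints: per-minute maximum containerCPUUtilization values, oldest first.
    configured_parallelism: the application's Parallelism setting (the floor).
    """
    recent = cpu_datapoints[-SCALE_UP_MINUTES:]
    if len(recent) == SCALE_UP_MINUTES and all(p >= SCALE_UP_CPU_PERCENT for p in recent):
        # ScaleUp doubles the current parallelism.
        return current_parallelism * 2

    recent = cpu_datapoints[-SCALE_DOWN_MINUTES:]
    if len(recent) == SCALE_DOWN_MINUTES and all(p < SCALE_DOWN_CPU_PERCENT for p in recent):
        # ScaleDown halves the parallelism (rounded up), but never goes
        # below the application's Parallelism setting.
        halved = math.ceil(current_parallelism / 2)
        return max(halved, configured_parallelism)

    return current_parallelism
```

For example, with 15 datapoints at 80 percent and a current parallelism of 4, the sketch returns 8. With 360 datapoints at 5 percent, a current parallelism of 5, and a `Parallelism` setting of 2, it returns 3.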

When the Managed Service for Apache Flink service is scaling your application, the application is in the `AUTOSCALING` status. You can check your current application status using the [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html) or [ListApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplications.html) actions. While the service is scaling your application, the only valid API action you can use is [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) with the `Force` parameter set to `true`.
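
As a minimal sketch, you could check for the `AUTOSCALING` status with the AWS SDK for Python (Boto3). The helper name `is_autoscaling` is hypothetical, and the `client` argument is assumed to be a `kinesisanalyticsv2` client.

```python
def is_autoscaling(client, application_name):
    """Return True while the service is scaling the application."""
    response = client.describe_application(ApplicationName=application_name)
    return response["ApplicationDetail"]["ApplicationStatus"] == "AUTOSCALING"


# Example usage (requires AWS credentials and the boto3 package):
#   import boto3
#   client = boto3.client("kinesisanalyticsv2")
#   if is_autoscaling(client, "MyApplication"):
#       print("Scaling in progress; only a forced stop is accepted.")
```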

You can use the `AutoScalingEnabled` property (part of [FlinkApplicationConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html)) to enable or disable auto scaling behavior. Your AWS account is charged for the KPUs that Managed Service for Apache Flink provisions. The number of KPUs is a function of your application's `parallelism` and `parallelismPerKPU` settings. An activity spike increases your Managed Service for Apache Flink costs.

For information about pricing, see [Amazon Managed Service for Apache Flink pricing](https://aws.amazon.com/kinesis/data-analytics/pricing/). 

Note the following about application scaling:
+ Automatic scaling is enabled by default.
+ Scaling doesn't apply to Studio notebooks. However, if you deploy a Studio notebook as an application with durable state, then scaling will apply to the deployed application.
+ Your application has a default limit of 64 KPUs. For more information, see [Managed Service for Apache Flink and Studio notebook quota](limits.md).
+ When autoscaling updates application parallelism, the application experiences downtime. To avoid this downtime, do the following:
  + Disable automatic scaling
  + Configure your application's `parallelism` and `parallelismPerKPU` with the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action. For more information about setting your application's parallelism settings, see [Update your application's parallelism](how-scaling.md#how-scaling-howto).
  + Periodically monitor your application's resource usage to verify that your application has the correct parallelism settings for its workload. For information about monitoring allocation resource usage, see [Metrics and dimensions in Managed Service for Apache Flink](metrics-dimensions.md).
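
As an illustration of the first two steps, the following sketch builds an `UpdateApplication` request that disables automatic scaling and sets fixed parallelism values in one call. The helper name `fixed_parallelism_update` is hypothetical. The request keys mirror the `ParallelismConfigurationUpdate` examples shown earlier in this guide.

```python
def fixed_parallelism_update(application_name, current_version_id,
                             parallelism, parallelism_per_kpu):
    """Build UpdateApplication arguments that turn off automatic scaling."""
    return {
        "ApplicationName": application_name,
        "CurrentApplicationVersionId": current_version_id,
        "ApplicationConfigurationUpdate": {
            "FlinkApplicationConfigurationUpdate": {
                "ParallelismConfigurationUpdate": {
                    "ConfigurationTypeUpdate": "CUSTOM",
                    "AutoScalingEnabledUpdate": False,
                    "ParallelismUpdate": parallelism,
                    "ParallelismPerKPUUpdate": parallelism_per_kpu,
                }
            }
        },
    }


# Example usage (requires AWS credentials and the boto3 package):
#   import boto3
#   client = boto3.client("kinesisanalyticsv2")
#   client.update_application(**fixed_parallelism_update("MyApplication", 4, 8, 2))
```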

## Implement custom autoscaling
<a name="how-scaling-custom-autoscaling"></a>

If you want finer-grained control over autoscaling, or want to use trigger metrics other than `containerCPUUtilization`, you can use this example: 
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  This example illustrates how to scale your Managed Service for Apache Flink application using a different CloudWatch metric from the Apache Flink application, including metrics from Amazon MSK and Amazon Kinesis Data Streams, used as sources or sinks.

For additional information, see [Enhanced monitoring and automatic scaling for Apache Flink](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/).

## Implement scheduled autoscaling
<a name="how-scaling-scheduled-autoscaling"></a>

If your workload follows a predictable profile over time, you might prefer to scale your Apache Flink application preemptively. This scales your application at a scheduled time, as opposed to scaling reactively based on a metric. To set up scaling up and down at fixed hours of the day, you can use this example:
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## maxParallelism considerations
<a name="how-scaling-auto-max-parallelism"></a>

The maximum parallelism that a Flink job can scale to is limited by the *minimum* `maxParallelism` across all operators of the job. For example, if you have a simple job with only a source and a sink, and the source has a `maxParallelism` of 16 and the sink has a `maxParallelism` of 8, the application can't scale beyond a parallelism of 8.
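
The rule can be expressed in one line. The following sketch uses a hypothetical dictionary that maps operator names to their `maxParallelism` values. It is not a Flink API.

```python
def job_max_parallelism(operator_max_parallelism):
    """Return the job-wide limit: the smallest maxParallelism of any operator."""
    return min(operator_max_parallelism.values())


# The source-and-sink example from the text above.
limits = {"source": 16, "sink": 8}
print(job_max_parallelism(limits))  # prints 8
```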

To learn how the default `maxParallelism` of an operator is calculated and how to override the default, refer to [Setting the Maximum Parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism) in the Apache Flink documentation.

As a basic rule, be aware that if you don't define `maxParallelism` for any operator and you start your application with parallelism less than or equal to 128, all operators will have a `maxParallelism` of 128.

**Note**  
The job's maximum parallelism is the upper limit of parallelism for scaling your application while retaining its state.   
If you modify `maxParallelism` of an existing application, the application won't be able to restart from a previous snapshot taken with the old `maxParallelism`. You can only restart the application without a snapshot.   
If you plan to scale your application to a parallelism greater than 128, you must explicitly set the `maxParallelism` in your application.
+ Autoscaling logic will prevent scaling a Flink job to a parallelism that will exceed maximum parallelism of the job.
+ If you use a custom autoscaling or scheduled scaling, configure them so that they don't exceed the maximum parallelism of the job.
+ If you manually scale your application beyond maximum parallelism, the application fails to start.
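
If you write your own scaling logic, one way to respect this limit is to cap every computed target before you submit it. The following is a minimal sketch. The function name `capped_scale_up` and the doubling step are assumptions for illustration, not part of any AWS or Flink API.

```python
def capped_scale_up(current_parallelism, job_max_parallelism, factor=2):
    """Return a scale-up target that never exceeds the job's maximum parallelism."""
    target = current_parallelism * factor
    return min(target, job_max_parallelism)


print(capped_scale_up(32, 128))  # prints 64
print(capped_scale_up(96, 128))  # prints 128
```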

# Add tags to Managed Service for Apache Flink applications
<a name="how-tagging"></a>



This section describes how to add key-value metadata tags to Managed Service for Apache Flink applications. These tags can be used for the following purposes:
+ Determining billing for individual Managed Service for Apache Flink applications. For more information, see [Using Cost Allocation Tags](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html) in the *Billing and Cost Management Guide*.
+ Controlling access to application resources based on tags. For more information, see [Controlling Access Using Tags](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html) in the *AWS Identity and Access Management User Guide*.
+ User-defined purposes. You can define application functionality based on the presence of user tags.

Note the following information about tagging:
+ The maximum number of application tags includes system tags. The maximum number of user-defined application tags is 50.
+ If an action includes a tag list that has duplicate `Key` values, the service throws an `InvalidArgumentException`.
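
You might want to check a tag list locally before you send it. The following sketch applies the two rules above on the client side. The function name `validate_tags` is hypothetical, and the sketch raises a plain `ValueError` rather than the service's `InvalidArgumentException`.

```python
MAX_USER_TAGS = 50  # limit on user-defined tags stated above


def validate_tags(tags):
    """Check a list of {"Key": ..., "Value": ...} dictionaries before sending it."""
    keys = [tag["Key"] for tag in tags]
    duplicates = sorted({key for key in keys if keys.count(key) > 1})
    if duplicates:
        raise ValueError("Duplicate tag keys: {}".format(", ".join(duplicates)))
    if len(tags) > MAX_USER_TAGS:
        raise ValueError("Too many tags: {} (maximum {})".format(len(tags), MAX_USER_TAGS))
    return tags
```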

**Topics**
+ [Add tags when an application is created](how-tagging-create.md)
+ [Add or update tags for an existing application](how-tagging-add.md)
+ [List tags for an application](how-tagging-list.md)
+ [Remove tags from an application](how-tagging-remove.md)

# Add tags when an application is created
<a name="how-tagging-create"></a>

You add tags when creating an application using the `Tags` parameter of the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) action.

The following example request shows the `Tags` node for a `CreateApplication` request:

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# Add or update tags for an existing application
<a name="how-tagging-add"></a>

You add tags to an application using the [TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html) action. You cannot add tags to an application using the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.

To update an existing tag, add a tag with the same key as the existing tag.

The following example request for the `TagResource` action adds new tags or updates existing tags:

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```
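
The same request can be sent with the AWS SDK for Python (Boto3). In this sketch, the helper name `upsert_tags` is hypothetical, and `client` is assumed to be a `kinesisanalyticsv2` client. Passing the tags as a dictionary guarantees that the request contains no duplicate keys.

```python
def upsert_tags(client, resource_arn, tags):
    """Add new tags or overwrite existing ones.

    tags: a dict that maps tag keys to tag values.
    Returns the tag list that was sent.
    """
    tag_list = [{"Key": key, "Value": value} for key, value in tags.items()]
    client.tag_resource(ResourceARN=resource_arn, Tags=tag_list)
    return tag_list


# Example usage (requires AWS credentials and the boto3 package):
#   import boto3
#   client = boto3.client("kinesisanalyticsv2")
#   upsert_tags(client, "<your application ARN>", {"NewTagKey": "NewTagValue"})
```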

# List tags for an application
<a name="how-tagging-list"></a>

To list existing tags, you use the [ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html) action.

The following example request for the `ListTagsForResource` action lists tags for an application:

```
{
   "ResourceARN": "arn:aws:kinesisanalytics:us-west-2:012345678901:application/MyApplication"
}
```
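
As a sketch, the following helpers build an application ARN and return the tags as a dictionary. Both function names are hypothetical. The ARN layout assumes the standard `aws` partition, and `client` is assumed to be a Boto3 `kinesisanalyticsv2` client.

```python
def application_arn(region, account_id, application_name):
    """Build an application ARN (assumes the standard "aws" partition)."""
    return "arn:aws:kinesisanalytics:{}:{}:application/{}".format(
        region, account_id, application_name
    )


def tags_as_dict(client, resource_arn):
    """Return the application's tags as a {key: value} dictionary."""
    response = client.list_tags_for_resource(ResourceARN=resource_arn)
    return {tag["Key"]: tag.get("Value", "") for tag in response.get("Tags", [])}


# Example usage (requires AWS credentials and the boto3 package):
#   import boto3
#   client = boto3.client("kinesisanalyticsv2")
#   arn = application_arn("us-west-2", "012345678901", "MyApplication")
#   print(tags_as_dict(client, arn))
```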

# Remove tags from an application
<a name="how-tagging-remove"></a>

To remove tags from an application, you use the [UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html) action.

The following example request for the `UntagResource` action removes tags from an application:

```
{
   "ResourceARN": "arn:aws:kinesisanalytics:us-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Use CloudFormation with Managed Service for Apache Flink
<a name="lambda-cfn-flink"></a>

The following exercise shows how to start a Flink application created with CloudFormation using a Lambda function in the same stack. 

## Before you begin
<a name="before-you-begin"></a>

Before you begin this exercise, follow the steps on creating a Flink application using CloudFormation at [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html).

## Write a Lambda function
<a name="write-lambda-function"></a>

To start a Flink application after creation or update, we use the kinesisanalyticsv2 [start-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html) API. The call is triggered by a CloudFormation event after the Flink application is created. We’ll discuss how to set up the stack to trigger the Lambda function later in this exercise, but first we focus on the Lambda function declaration and its code. We use the `python3.8` runtime in this example. 

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

In the preceding code, Lambda processes incoming CloudFormation events, filters out everything besides `Create` and `Update`, gets the application state, and starts the application if the state is `READY`. To get the application state, you must create the Lambda role, as shown following.
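
To unit test the filtering logic without calling AWS, you could isolate it as a pure function. The following sketch mirrors the two checks in the handler. The function name `decide_action` and the sample event values are hypothetical.

```python
def decide_action(event, application_status):
    """Return "start" only for Create or Update events on a READY application."""
    if event["RequestType"] not in ("Create", "Update"):
        return "noop"
    if application_status != "READY":
        return "noop"
    return "start"


# A trimmed-down custom resource event with illustrative values only.
sample_event = {
    "RequestType": "Create",
    "ResourceProperties": {
        "ApplicationName": "MyApplication",
        "Region": "us-east-1",
    },
}

print(decide_action(sample_event, "READY"))    # prints start
print(decide_action(sample_event, "RUNNING"))  # prints noop
```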

## Create a Lambda role
<a name="create-lambda-role"></a>

You create a role for Lambda to successfully “talk” to the application and write logs. This role uses default managed policies, but you might want to narrow it down to using custom policies.

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Note that the Lambda resources will be created after creation of the Flink application in the same stack because they depend on it.

## Invoke the Lambda function
<a name="invoking-lambda-function"></a>

Now all that is left is to invoke the Lambda function. You do this by using a [custom resource](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html).

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

This is all you need to start your Flink application using Lambda. You are now ready to create your own stack or use the full example below to see how all those steps work in practice.

## Review an extended example
<a name="lambda-cfn-flink-full-example"></a>

The following example is a slightly extended version of the previous steps, with additional `RunConfiguration` adjustments made through [template parameters](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html). This is a working stack for you to try. Be sure to read the accompanying notes: 

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - kinesisanalytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::KinesisAnalyticsV2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisAnalyticsFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```
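
The handler in the stack builds the `RunConfiguration` from the custom resource properties. The handler reads these properties as strings and compares `AllowNonRestoredState` to `'true'`. The following sketch extracts that mapping into a standalone function so that you can test it locally. The function name `build_run_configuration` is hypothetical.

```python
def build_run_configuration(properties):
    """Translate custom resource properties into a RunConfiguration dictionary."""
    run_configuration = {
        "FlinkRunConfiguration": {
            # Compare against the string 'true', as the handler does.
            "AllowNonRestoredState": properties["AllowNonRestoredState"] == "true"
        },
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": properties["ApplicationRestoreType"]
        },
    }
    # SnapshotName is only added when it is specified.
    if properties.get("SnapshotName", "") != "":
        run_configuration["ApplicationRestoreConfiguration"]["SnapshotName"] = properties["SnapshotName"]
    return run_configuration
```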

Again, you might want to adjust the roles for Lambda as well as for the application itself.

Before creating the stack above, don’t forget to specify your parameters.

parameters.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

Replace `YOUR_BUCKET_ARN` and `YOUR_JAR` with your own values. You can follow this [guide](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html) to create an Amazon S3 bucket and an application jar.

Now create the stack (replace `YOUR_REGION` with a Region of your choice, for example, `us-east-1`):

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManagedServiceForApacheFlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

You can now navigate to [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation) and view the progress. After the stack is created, you should see your Flink application in the `Starting` state. It might take a few minutes until the application reaches the `Running` state. 
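
Because the `start_application` call in the Lambda function doesn't wait for the application to run, you might want to poll the status yourself. The following is a minimal sketch. The function name `wait_for_status`, the timeout, and the polling interval are assumptions, and `client` is assumed to be a Boto3 `kinesisanalyticsv2` client.

```python
import time


def wait_for_status(client, application_name, target="RUNNING",
                    timeout_seconds=600, poll_seconds=15, sleep=time.sleep):
    """Poll DescribeApplication until the target status is reached.

    Returns True if the status was reached, or False after the timeout.
    """
    waited = 0
    while True:
        response = client.describe_application(ApplicationName=application_name)
        if response["ApplicationDetail"]["ApplicationStatus"] == target:
            return True
        if waited >= timeout_seconds:
            return False
        sleep(poll_seconds)
        waited += poll_seconds


# Example usage (requires AWS credentials and the boto3 package):
#   import boto3
#   client = boto3.client("kinesisanalyticsv2", region_name="us-east-1")
#   wait_for_status(client, "CFNTestFlinkApplication")
```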

For more information, see the following:
+ [Four ways to retrieve any AWS service property using AWS CloudFormation (Part 1 of 3)](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/).
+ [Walkthrough: Looking up Amazon Machine Image IDs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html).

# Use the Apache Flink Dashboard with Managed Service for Apache Flink
<a name="how-dashboard"></a>

You can use your application's Apache Flink Dashboard to monitor your Managed Service for Apache Flink application's health. Your application's dashboard shows the following information:
+ Resources in use, including Task Managers and Task Slots. 
+ Information about Jobs, including those that are running, completed, canceled, and failed. 

For information about Apache Flink Task Managers, Task Slots, and Jobs, see [Apache Flink Architecture](https://flink.apache.org/what-is-flink/flink-architecture/) on the Apache Flink website. 

Note the following about using the Apache Flink Dashboard with Managed Service for Apache Flink applications:
+ The Apache Flink Dashboard for Managed Service for Apache Flink applications is read-only. You can't make changes to your Managed Service for Apache Flink application using the Apache Flink Dashboard.
+ The Apache Flink Dashboard is not compatible with Microsoft Internet Explorer.

## Access your application's Apache Flink Dashboard
<a name="how-dashboard-accessing"></a>

You can access your application's Apache Flink Dashboard either through the Managed Service for Apache Flink console, or by requesting a secure URL endpoint using the CLI.

### Access your application's Apache Flink Dashboard using the Managed Service for Apache Flink console
<a name="how-dashboard-accessing-console"></a>

To access your application's Apache Flink Dashboard from the console, choose **Apache Flink Dashboard** on your application's page.

**Note**  
When you open the dashboard from the Managed Service for Apache Flink console, the URL that the console generates will be valid for 12 hours.

### Access your application's Apache Flink Dashboard using the Managed Service for Apache Flink CLI
<a name="how-dashboard-accessing-cli"></a>

You can use the Managed Service for Apache Flink CLI to generate a URL to access your application dashboard. The URL that you generate is valid for a specified amount of time.

**Note**  
If you don't access the generated URL within three minutes, it will no longer be valid.

You generate your dashboard URL using the [CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html) action. You specify the following parameters for the action: 
+ The application name
+ The time in seconds that the URL will be valid
+ The URL type, which you specify as `FLINK_DASHBOARD_URL`
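
As a sketch, the same action can be called with the AWS SDK for Python (Boto3). The helper name `dashboard_url` is hypothetical, `client` is assumed to be a `kinesisanalyticsv2` client, and the 1800-second duration is only an illustrative value.

```python
def dashboard_url(client, application_name, valid_seconds=1800):
    """Request a presigned URL for the application's Apache Flink Dashboard."""
    response = client.create_application_presigned_url(
        ApplicationName=application_name,
        UrlType="FLINK_DASHBOARD_URL",
        SessionExpirationDurationInSeconds=valid_seconds,
    )
    return response["AuthorizedUrl"]


# Example usage (requires AWS credentials and the boto3 package):
#   import boto3
#   client = boto3.client("kinesisanalyticsv2")
#   print(dashboard_url(client, "MyApplication"))
```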