

# Logging and monitoring in Amazon Managed Service for Apache Flink
<a name="monitoring-overview"></a>

Monitoring is an important part of maintaining the reliability, availability, and performance of Managed Service for Apache Flink applications. You should collect monitoring data from all of the parts of your AWS solution so that you can more easily debug a multipoint failure if one occurs. 

Before you start monitoring Managed Service for Apache Flink, you should create a monitoring plan that includes answers to the following questions:
+ What are your monitoring goals?
+ What resources will you monitor?
+ How often will you monitor these resources?
+ What monitoring tools will you use?
+ Who will perform the monitoring tasks?
+ Who should be notified when something goes wrong?

The next step is to establish a baseline for normal Managed Service for Apache Flink performance in your environment. You do this by measuring performance at various times and under different load conditions. As you monitor Managed Service for Apache Flink, you can store historical monitoring data. You can then compare it with current performance data, identify normal performance patterns and performance anomalies, and devise methods to address issues.

**Topics**
+ [

# Logging in Managed Service for Apache Flink
](logging.md)
+ [

# Monitoring in Managed Service for Apache Flink
](monitoring.md)
+ [

# Set up application logging in Managed Service for Apache Flink
](cloudwatch-logs.md)
+ [

# Analyze logs with CloudWatch Logs Insights
](cloudwatch-logs-reading.md)
+ [

# Metrics and dimensions in Managed Service for Apache Flink
](metrics-dimensions.md)
+ [

# Write custom messages to CloudWatch Logs
](cloudwatch-logs-writing.md)
+ [

# Log Managed Service for Apache Flink API calls with AWS CloudTrail
](logging-using-cloudtrail.md)

# Logging in Managed Service for Apache Flink
<a name="logging"></a>

Logging is important for production applications to understand errors and failures. However, the logging subsystem needs to collect and forward log entries to CloudWatch Logs While some logging is fine and desirable, extensive logging can overload the service and cause the Flink application to fall behind. Logging exceptions and warnings is certainly a good idea. But you cannot generate a log message for each and every message that is processed by the Flink application. Flink is optimized for high throughout and low latency, the logging subsystem is not. In case it is really required to generate log output for every processed message, use an additional DataStream inside the Flink application and a proper sink to send the data to Amazon S3 or CloudWatch. Do not use the Java logging system for this purpose. Moreover, Managed Service for Apache Flink’ `Debug Monitoring Log Level` setting generates a large amount of traffic, which can create backpressure. You should only use it while actively investigating issues with the application.

## Query logs with CloudWatch Logs Insights
<a name="logging-querying"></a>

CloudWatch Logs Insights is a powerful service to query log at scale. Customers should leverage its capabilities to quickly search through logs to identify and mitigate errors during operational events.

 The following query looks for exceptions in all task manager logs and orders them according to the time they occurred.

```
fields @timestamp, @message
| filter isPresent(throwableInformation.0) or isPresent(throwableInformation) or @message like /(Error|Exception)/
| sort @timestamp desc
```

For other useful queries, see [Example Queries](https://docs.aws.amazon.com/managed-flink/latest/java/cloudwatch-logs-reading.html#cloudwatch-logs-reading-examples).

# Monitoring in Managed Service for Apache Flink
<a name="monitoring"></a>

When running streaming applications in production, you set out to execute the application continuously and indefinitely. It is crucial to implement monitoring and proper alarming of all components not only the Flink application. Otherwise you risk to miss emerging problems early on and only realize an operational event once it is fully unravelling and much harder to mitigate. General things to monitor include:
+ Is the source ingesting data?
+ Is data read from the source (from the perspective of the source)?
+ Is the Flink application receiving data?
+ Is the Flink application able to keep up or is it falling behind?
+ Is the Flink application persisting data into the sink (from the application perspective)?
+ Is the sink receiving data?

More specific metrics should then be considered for the Flink application. This [CloudWatch dashboard](https://github.com/aws-samples/kda-metrics-dashboard) provides a good starting point. For more information on what metrics to monitor for production applications, see [Use CloudWatch Alarms with Amazon Managed Service for Apache Flink](monitoring-metrics-alarms.md). These metrics include:
+ **records\$1lag\$1max** and **millisbehindLatest** – If the application is consuming from Kinesis or Kafka, these metrics indicate if the application is falling behind and needs to be scaled in order to keep up with the current load. This is a good generic metric that is easy to track for all kinds of applications. But it can only be used for reactive scaling, i.e., when the application has already fallen behind.
+ **cpuUtilization** and **heapMemoryUtilization** – These metrics give a good indication of the overall resource utilization of the application and can be used for proactive scaling unless the application is I/O bound.
+ **downtime** – A downtime greater than zero indicates that the application has failed. If the value is larger than 0, the application is not processing any data.
+ **lastCheckpointSize** and *lastCheckpointDuration* – These metrics monitor how much data is stored in state and how long it takes to take a checkpoint. If checkpoints grow or take long, the application is continuously spending time on checkpointing and has less cycles for actual processing. At some points, checkpoints may grow too large or take so long that they fail. In addition to monitoring absolute values, customers should also considering monitoring the change rate with `RATE(lastCheckpointSize)` and `RATE(lastCheckpointDuration)`.
+ **numberOfFailedCheckpoints** – This metric counts the number of failed checkpoints since the application started. Depending on the application, it can be tolerable if checkpoints fail occasionally. But if checkpoints are regularly failing, the application is likely unhealthy and needs further attention. We recommend monitoring `RATE(numberOfFailedCheckpoints)` to alarm on the gradient and not on absolute values.

# Set up application logging in Managed Service for Apache Flink
<a name="cloudwatch-logs"></a>

By adding an Amazon CloudWatch logging option to your Managed Service for Apache Flink application, you can monitor for application events or configuration problems.

This topic describes how to configure your application to write application events to a CloudWatch Logs stream. A CloudWatch logging option is a collection of application settings and permissions that your application uses to configure the way it writes application events to CloudWatch Logs. You can add and configure a CloudWatch logging option using either the AWS Management Console or the AWS Command Line Interface (AWS CLI).

Note the following about adding a CloudWatch logging option to your application:
+ When you add a CloudWatch logging option using the console, Managed Service for Apache Flink creates the CloudWatch log group and log stream for you and adds the permissions your application needs to write to the log stream. 
+ When you add a CloudWatch logging option using the API, you must also create the application's log group and log stream, and add the permissions your application needs to write to the log stream.

## Set up CloudWatch logging using the console
<a name="cloudwatch-logs-console"></a>

When you enable CloudWatch logging for your application in the console, a CloudWatch log group and log stream is created for you. Also, your application's permissions policy is updated with permissions to write to the stream. 

Managed Service for Apache Flink creates a log group named using the following convention, where *ApplicationName* is your application's name.

```
/aws/kinesis-analytics/ApplicationName
```

Managed Service for Apache Flink creates a log stream in the new log group with the following name.

```
kinesis-analytics-log-stream
```

You set the application monitoring metrics level and monitoring log level using the **Monitoring log level** section of the **Configure application** page. For information about application log levels, see [Control application monitoring levels](#cloudwatch_levels).

## Set up CloudWatch logging using the CLI
<a name="cloudwatch-logs-api"></a>

To add a CloudWatch logging option using the AWS CLI, you complete the following: 
+ Create a CloudWatch log group and log stream.
+ Add a logging option when you create an application by using the [CreateApplication](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action, or add a logging option to an existing application using the [AddApplicationCloudWatchLoggingOption](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_AddApplicationCloudWatchLoggingOption.html) action.
+ Add permissions to your application's policy to write to the logs.

### Create a CloudWatch log group and log stream
<a name="cloudwatch-logs-api-create"></a>

You create a CloudWatch log group and stream using either the CloudWatch Logs console or the API. For information about creating a CloudWatch log group and log stream, see [Working with Log Groups and Log Streams](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html).

### Work with application CloudWatch logging options
<a name="adding_cloudwatch"></a>

Use the following API actions to add a CloudWatch log option to a new or existing application or change a log option for an existing application. For information about how to use a JSON file for input for an API action, see [Managed Service for Apache Flink API example code](api-examples.md).

#### Add a CloudWatch log option when creating an application
<a name="add_cloudwatch_create"></a>

The following example demonstrates how to use the `CreateApplication` action to add a CloudWatch log option when you create an application. In the example, replace *Amazon Resource Name (ARN) of the CloudWatch Log stream to add to the new application* with your own information. For more information about the action, see [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html).

```
{
    "ApplicationName": "test",
    "ApplicationDescription": "test-application-description",
    "RuntimeEnvironment": "FLINK-1_15",
    "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation":{
                              "BucketARN": "arn:aws:s3:::amzn-s3-demo-bucket",
                              "FileKey": "myflink.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        }
    },
    "CloudWatchLoggingOptions": [{
      "LogStreamARN": "<Amazon Resource Name (ARN) of the CloudWatch log stream to add to the new application>"
	}]
}
```

#### Add a CloudWatch log option to an existing application
<a name="add_to_existing_app"></a>

The following example demonstrates how to use the `AddApplicationCloudWatchLoggingOption` action to add a CloudWatch log option to an existing application. In the example, replace each *user input placeholder* with your own information. For more information about the action, see [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_AddApplicationCloudWatchLoggingOption.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_AddApplicationCloudWatchLoggingOption.html).

```
{
   "ApplicationName": "<Name of the application to add the log option to>",
   "CloudWatchLoggingOption": { 
      "LogStreamARN": "<ARN of the log stream to add to the application>"
   },
   "CurrentApplicationVersionId": <Version of the application to add the log to>
}
```

#### Update an existing CloudWatch log option
<a name="update_existing"></a>

The following example demonstrates how to use the `UpdateApplication` action to modify an existing CloudWatch log option. In the example, replace each *user input placeholder* with your own information. For more information about the action, see [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html).

```
{
   "ApplicationName": "<Name of the application to update the log option for>",
   "CloudWatchLoggingOptionUpdates": [ 
         { 
            "CloudWatchLoggingOptionId": "<ID of the logging option to modify>",
            "LogStreamARNUpdate": "<ARN of the new log stream to use>"
         }
      ],
   "CurrentApplicationVersionId": <ID of the application version to modify>
}
```

#### Delete a CloudWatch log option from an application
<a name="delete-log"></a>

The following example demonstrates how to use the `DeleteApplicationCloudWatchLoggingOption` action to delete an existing CloudWatch log option. In the example, replace each *user input placeholder* with your own information. For more information about the action, see [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationCloudWatchLoggingOption.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationCloudWatchLoggingOption.html).

```
{
   "ApplicationName": "<Name of application to delete log option from>",
   "CloudWatchLoggingOptionId": "<ID of the application log option to delete>",
   "CurrentApplicationVersionId": <Version of the application to delete the log option from>
}
```

#### Set the application logging level
<a name="cloudwatch-level"></a>

To set the level of application logging, use the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_MonitoringConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_MonitoringConfiguration.html) parameter of the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action or the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_MonitoringConfigurationUpdate.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_MonitoringConfigurationUpdate.html) parameter of the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action. 

For information about application log levels, see [Control application monitoring levels](#cloudwatch_levels).

##### Set the application logging level when creating an application
<a name="cloudwatch-level-create"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action sets the application log level to `INFO`.

```
{
   "ApplicationName": "MyApplication",                    
   "ApplicationDescription": "My Application Description",
   "ApplicationConfiguration": {
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "CodeContentType":"ZIPFILE"
      },
      "FlinkApplicationConfiguration": 
         "MonitoringConfiguration": { 
            "ConfigurationType": "CUSTOM",
            "LogLevel": "INFO"
         }
      },
   "RuntimeEnvironment": "FLINK-1_15",
   "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole"
}
```

##### Update the application logging level
<a name="cloudwatch-level-update"></a>

The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action sets the application log level to `INFO`.

```
{
   "ApplicationConfigurationUpdate": {
      "FlinkApplicationConfigurationUpdate": { 
         "MonitoringConfigurationUpdate": { 
            "ConfigurationTypeUpdate": "CUSTOM",
            "LogLevelUpdate": "INFO"
         }
      }
   }
}
```

### Add permissions to write to the CloudWatch log stream
<a name="enable_putlogevents"></a>

Managed Service for Apache Flink needs permissions to write misconfiguration errors to CloudWatch. You can add these permissions to the AWS Identity and Access Management (IAM) role that Managed Service for Apache Flink assumes.

For more information about using an IAM role for Managed Service for Apache Flink, see [Identity and Access Management for Amazon Managed Service for Apache Flink](security-iam.md).

#### Trust policy
<a name="enable_putlogevents_trust_policy"></a>

To grant Managed Service for Apache Flink permissions to assume an IAM role, you can attach the following trust policy to the service execution role.

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

#### Permissions policy
<a name="enable_putlogevents_permissions_policy"></a>

To grant permissions to an application to write log events to CloudWatch from a Managed Service for Apache Flink resource, you can use the following IAM permissions policy. Provide the correct Amazon Resource Names (ARNs) for your log group and stream.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt0123456789000",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams"
            ],
            "Resource": [
                "arn:aws:logs:us-east-1:123456789012:log-group:my-log-group:log-stream:my-log-stream*",
                "arn:aws:logs:us-east-1:123456789012:log-group:my-log-group:*",
                "arn:aws:logs:us-east-1:123456789012:log-group:*"
            ]
        }
    ]
}
```

------

## Control application monitoring levels
<a name="cloudwatch_levels"></a>

You control the generation of application log messages using the application's *Monitoring Metrics Level* and *Monitoring Log Level*.

The application's monitoring metrics level controls the granularity of log messages. Monitoring metrics levels are defined as follows:
+ **Application**: Metrics are scoped to the entire application.
+ **Task**: Metrics are scoped to each task. For information about tasks, see [Implement application scaling in Managed Service for Apache Flink](how-scaling.md).
+ **Operator**: Metrics are scoped to each operator. For information about operators, see [Transform data using operators in Managed Service for Apache Flink with the DataStream API](how-operators.md).
+ **Parallelism**: Metrics are scoped to application parallelism. You can only set this metrics level using the [ MonitoringConfigurationUpdate](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfigurationUpdate.html) parameter of the [ UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API. You cannot set this metrics level using the console. For information about parallelism, see [Implement application scaling in Managed Service for Apache Flink](how-scaling.md).

The application's monitoring log level controls the verbosity of the application's log. Monitoring log levels are defined as follows:
+ **Error**: Potential catastrophic events of the application.
+ **Warn**: Potentially harmful situations of the application.
+ **Info**: Informational and transient failure events of the application. We recommend that you use this logging level.
+ **Debug**: Fine-grained informational events that are most useful to debug an application. *Note*: Only use this level for temporary debugging purposes. 

## Apply logging best practices
<a name="cloudwatch_bestpractices"></a>

We recommend that your application use the **Info** logging level. We recommend this level to ensure that you see Apache Flink errors, which are logged at the **Info** level rather than the **Error** level.

We recommend that you use the **Debug** level only temporarily while investigating application issues. Switch back to the **Info** level when the issue is resolved. Using the **Debug** logging level will significantly affect your application's performance.

Excessive logging can also significantly impact application performance. We recommend that you do not write a log entry for every record processed, for example. Excessive logging can cause severe bottlenecks in data processing and can lead to back pressure in reading data from the sources.

## Perform logging troubleshooting
<a name="cloudwatch_troubleshooting"></a>

If application logs are not being written to the log stream, verify the following: 
+ Verify that your application's IAM role and policies are correct. Your application's policy needs the following permissions to access your log stream:
  + `logs:PutLogEvents`
  + `logs:DescribeLogGroups`
  + `logs:DescribeLogStreams`

  For more information, see [Add permissions to write to the CloudWatch log stream](#enable_putlogevents).
+ Verify that your application is running. To check your application's status, view your application's page in the console, or use the [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html) or [ListApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplications.html) actions.
+ Monitor CloudWatch metrics such as `downtime` to diagnose other application issues. For information about reading CloudWatch metrics, see [Metrics and dimensions in Managed Service for Apache Flink](metrics-dimensions.md).

## Use CloudWatch Logs Insights
<a name="cloudwatch_next"></a>

After you have enabled CloudWatch logging in your application, you can use CloudWatch Logs Insights to analyze your application logs. For more information, see [Analyze logs with CloudWatch Logs Insights](cloudwatch-logs-reading.md).

# Analyze logs with CloudWatch Logs Insights
<a name="cloudwatch-logs-reading"></a>

After you've added a CloudWatch logging option to your application as described in the previous section, you can use CloudWatch Logs Insights to query your log streams for specific events or errors.

CloudWatch Logs Insights enables you to interactively search and analyze your log data in CloudWatch Logs. 

For information on getting started with CloudWatch Logs Insights, see [Analyze Log Data with CloudWatch Logs Insights](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html).

## Run a sample query
<a name="cloudwatch-logs-reading-run"></a>

This section describes how to run a sample CloudWatch Logs Insights query.

**Prerequisites**
+ Existing log groups and log streams set up in CloudWatch Logs.
+ Existing logs stored in CloudWatch Logs.

If you use services such as AWS CloudTrail, Amazon Route 53, or Amazon VPC, you've probably already set up logs from those services to go to CloudWatch Logs. For more information about sending logs to CloudWatch Logs, see [Getting Started with CloudWatch Logs](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_GettingStarted.html).

Queries in CloudWatch Logs Insights return either a set of fields from log events, or the result of a mathematical aggregation or other operation performed on log events. This section demonstrates a query that returns a list of log events.

**To run a CloudWatch Logs Insights sample query**

1. Open the CloudWatch console at [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. In the navigation pane, choose **Insights**.

   

1. The query editor near the top of the screen contains a default query that returns the 20 most recent log events. Above the query editor, select a log group to query.

   

   When you select a log group, CloudWatch Logs Insights automatically detects fields in the data in the log group and displays them in **Discovered fields** in the right pane. It also displays a bar graph of log events in this log group over time. This bar graph shows the distribution of events in the log group that matches your query and time range, not just the events displayed in the table.

1. Choose **Run query**.

   The results of the query appear. In this example, the results are the most recent 20 log events of any type.

1. To see all of the fields for one of the returned log events, choose the arrow to the left of that log event.

For more information about how to run and modify CloudWatch Logs Insights queries, see [Run and Modify a Sample Query](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_AnalyzeLogData_RunSampleQuery.html).

## Review example queries
<a name="cloudwatch-logs-reading-examples"></a>

This section contains CloudWatch Logs Insights example queries for analyzing Managed Service for Apache Flink application logs. These queries search for several example error conditions, and serve as templates for writing queries that find other error conditions. 

**Note**  
Replace the Region (*us-west-2*), Account ID (*012345678901*) and application name (*YourApplication*) in the following query examples with your application's Region and your Account ID.

**Topics**
+ [

### Analyze operations: Distribution of tasks
](#cloudwatch-logs-reading-tm)
+ [

### Analyze operations: Change in parallelism
](#cloudwatch-logs-reading-auto)
+ [

### Analyze errors: Access denied
](#cloudwatch-logs-reading-access)
+ [

### Analyze errors: Source or sink not found
](#cloudwatch-logs-reading-con)
+ [

### Analyze errors: Application task-related failures
](#cloudwatch-logs-reading-apps)

### Analyze operations: Distribution of tasks
<a name="cloudwatch-logs-reading-tm"></a>

The following CloudWatch Logs Insights query returns the number of tasks the Apache Flink Job Manager distributes between Task Managers. You need to set the query's time frame to match one job run so that the query doesn't return tasks from previous jobs. For more information about Parallelism, see [Implement application scaling](how-scaling.md). 

```
fields @timestamp, message
| filter message like /Deploying/
| parse message " to flink-taskmanager-*" as @tmid
| stats count(*) by @tmid
| sort @timestamp desc
| limit 2000
```

The following CloudWatch Logs Insights query returns the subtasks assigned to each Task Manager. The total number of subtasks is the sum of every task's parallelism. Task parallelism is derived from operator parallelism, and is the same as the application's parallelism by default, unless you change it in code by specifying `setParallelism`. For more information about setting operator parallelism, see [ Setting the Parallelism: Operator Level](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/parallel.html#operator-level) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

```
fields @timestamp, @tmid, @subtask
| filter message like /Deploying/
| parse message "Deploying * to flink-taskmanager-*" as @subtask, @tmid
| sort @timestamp desc
| limit 2000
```

For more information about task scheduling, see [Jobs and Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.15/internals/job_scheduling.html) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

### Analyze operations: Change in parallelism
<a name="cloudwatch-logs-reading-auto"></a>

The following CloudWatch Logs Insights query returns changes to an application's parallelism (for example, due to automatic scaling). This query also returns manual changes to the application's parallelism. For more information about automatic scaling, see [Use automatic scaling in Managed Service for Apache Flink](how-scaling-auto.md).

```
fields @timestamp, @parallelism
| filter message like /property: parallelism.default, /
| parse message "default, *" as @parallelism
| sort @timestamp asc
```

### Analyze errors: Access denied
<a name="cloudwatch-logs-reading-access"></a>

The following CloudWatch Logs Insights query returns `Access Denied` logs.

```
fields @timestamp, @message, @messageType
| filter applicationARN like /arn:aws:kinesisanalyticsus-west-2:012345678901:application\/YourApplication/
| filter @message like /AccessDenied/
| sort @timestamp desc
```

### Analyze errors: Source or sink not found
<a name="cloudwatch-logs-reading-con"></a>

The following CloudWatch Logs Insights query returns `ResourceNotFound` logs. `ResourceNotFound` logs result if a Kinesis source or sink is not found.

```
fields @timestamp,@message
| filter applicationARN like /arn:aws:kinesisanalyticsus-west-2:012345678901:application\/YourApplication/
| filter @message like /ResourceNotFoundException/
| sort @timestamp desc
```

### Analyze errors: Application task-related failures
<a name="cloudwatch-logs-reading-apps"></a>

The following CloudWatch Logs Insights query returns an application's task-related failure logs. These logs result if an application's status switches from `RUNNING` to `RESTARTING`.

```
fields @timestamp,@message
| filter applicationARN like /arn:aws:kinesisanalyticsus-west-2:012345678901:application\/YourApplication/
| filter @message like /switched from RUNNING to RESTARTING/
| sort @timestamp desc
```

For applications using Apache Flink version 1.8.2 and prior, task-related failures will result in the application status switching from `RUNNING` to `FAILED` instead. When using Apache Flink 1.8.2 and prior, use the following query to search for application task-related failures:

```
fields @timestamp,@message
| filter applicationARN like /arn:aws:kinesisanalyticsus-west-2:012345678901:application\/YourApplication/
| filter @message like /switched from RUNNING to FAILED/
| sort @timestamp desc
```

# Metrics and dimensions in Managed Service for Apache Flink
<a name="metrics-dimensions"></a>

When your Managed Service for Apache Flink processes a data source, Managed Service for Apache Flink reports the following metrics and dimensions to Amazon CloudWatch.

**Flink 2.2 metric changes**  
Flink 2.2 introduces metric changes that may affect your monitoring and alarms. Review the following changes before upgrading:  
The `fullRestarts` metric has been removed. Use `numRestarts` instead.
The `uptime` and `downtime` metrics are deprecated and will be removed in a future release. Migrate to the new state-specific metrics.
The `bytesRequestedPerFetch` metric for Kinesis Data Streams connector 6.0.0 has been removed.

## Application metrics
<a name="metrics-dimensions-jobs"></a>


| Metric | Unit | Description | Level | Usage Notes | 
| --- | --- | --- | --- | --- | 
| backPressuredTimeMsPerSecond\$1 | Milliseconds | The time (in milliseconds) this task or operator is back pressured per second. | Task, Operator, Parallelism | \$1Available for Managed Service for Apache Flink applications running Flink version 1.13 only. These metrics can be useful in identifying bottlenecks in an application. | 
| busyTimeMsPerSecond\$1 | Milliseconds | The time (in milliseconds) this task or operator is busy (neither idle nor back pressured) per second. Can be NaN, if the value could not be calculated. | Task, Operator, Parallelism | \$1Available for Managed Service for Apache Flink applications running Flink version 1.13 only. These metrics can be useful in identifying bottlenecks in an application. | 
| cpuUtilization | Percentage | Overall percentage of CPU utilization across task managers. For example, if there are five task managers, Managed Service for Apache Flink publishes five samples of this metric per reporting interval. | Application | You can use this metric to monitor minimum, average, and maximum CPU utilization in your application. The CPUUtilization metric only accounts for CPU usage of the TaskManager JVM process running inside the container.  | 
| containerCPUUtilization | Percentage | Overall percentage of CPU utilization across task manager containers in Flink application cluster. For example, if there are five task managers, correspondingly there are five TaskManager containers and Managed Service for Apache Flink publishes 2 \$1 five samples of this metric per 1 minute reporting interval. | Application | It is calculated per container as: *Total CPU time (in seconds) consumed by container \$1 100 / Container CPU limit (in CPUs/seconds)* The `CPUUtilization` metric only accounts for CPU usage of the TaskManager JVM process running inside the container. There are other components running outside the JVM within the same container. The `containerCPUUtilization` metric gives you a more complete picture, including all processes in terms of CPU exhaustion at the container and failures resulting from that.  | 
| containerMemoryUtilization | Percentage | Overall percentage of memory utilization across task manager containers in Flink application cluster. For example, if there are five task managers, correspondingly there are five TaskManager containers and Managed Service for Apache Flink publishes 2 \$1 five samples of this metric per 1 minute reporting interval. | Application | It is calculated per container as: *Container memory usage (bytes) \$1 100 / Container memory limit as per pod deployment spec (in bytes)* The `HeapMemoryUtilization` and `ManagedMemoryUtilzations` metrics only account for specific memory metrics like Heap Memory Usage of TaskManager JVM or Managed Memory (memory usage outside JVM for native processes like [RocksDB State Backend](https://flink.apache.org/2021/01/18/rocksdb.html#:~:text=Conclusion-,The%20RocksDB%20state%20backend%20(i.e.%2C%20RocksDBStateBackend)%20is%20one%20of,with%20exactly%2Donce%20processing%20guarantees.)). The `containerMemoryUtilization` metric gives you a more complete picture by including the working set memory, which is a better tracker of total memory exhaustion. Upon its exhaustion, it will result in `Out of Memory Error` for the TaskManager pod.  | 
| containerDiskUtilization | Percentage | Overall percentage of disk utilization across task manager containers in Flink application cluster. For example, if there are five task managers, correspondingly there are five TaskManager containers and Managed Service for Apache Flink publishes 2 \$1 five samples of this metric per 1 minute reporting interval. | Application | It is calculated per container as: *Disk usage in bytes \$1 100 / Disk Limit for container in bytes* For containers, it represents utilization of the filesystem on which root volume of the container is set up.  | 
| currentInputWatermark | Milliseconds | The last watermark this application/operator/task/thread has received | Application, Operator, Task, Parallelism | This record is only emitted for dimensions with two inputs. This is the minimum value of the last received watermarks. | 
| currentOutputWatermark | Milliseconds | The last watermark this application/operator/task/thread has emitted | Application, Operator, Task, Parallelism |  | 
| downtime [DEPRECATED] | Milliseconds | For jobs currently in a failing/recovering situation, the time elapsed during this outage. | Application | This metric measures the time elapsed while a job is failing or recovering. This metric returns 0 for running jobs and -1 for completed jobs. If this metric is not 0 or -1, this indicates that the Apache Flink job for the application failed to run. **Deprecated in Flink 2.2.** Use `restartingTime`, `cancellingTime`, and/or `failingTime` instead. | 
| failingTime | Milliseconds | The time (in milliseconds) that the application has spent in a failing state. Use this metric to monitor application failures and trigger alerts. | Application, Flow | Available from Flink 2.2. Replaces part of the deprecated downtime metric. | 
| heapMemoryUtilization | Percentage | Overall heap memory utilization across task managers. For example, if there are five task managers, Managed Service for Apache Flink publishes five samples of this metric per reporting interval. | Application | You can use this metric to monitor minimum, average, and maximum heap memory utilization in your application. The HeapMemoryUtilization only accounts for specific memory metrics like Heap Memory Usage of TaskManager JVM. | 
| idleTimeMsPerSecond\$1 | Milliseconds | The time (in milliseconds) this task or operator is idle (has no data to process) per second. Idle time excludes back pressured time, so if the task is back pressured it is not idle. | Task, Operator, Parallelism | \$1Available for Managed Service for Apache Flink applications running Flink version 1.13 only. These metrics can be useful in identifying bottlenecks in an application. | 
| lastCheckpointSize | Bytes | The total size of the last checkpoint | Application | You can use this metric to determine running application storage utilization. If this metric is increasing in value, this may indicate that there is an issue with your application, such as a memory leak or bottleneck. | 
| lastCheckpointDuration | Milliseconds | The time it took to complete the last checkpoint | Application | This metric measures the time it took to complete the most recent checkpoint. If this metric is increasing in value, this may indicate that there is an issue with your application, such as a memory leak or bottleneck. In some cases, you can troubleshoot this issue by disabling checkpointing. | 
| managedMemoryUsed\$1 | Bytes | The amount of managed memory currently used. | Application, Operator, Task, Parallelism | \$1Available for Managed Service for Apache Flink applications running Flink version 1.13 only. This relates to memory managed by Flink outside the Java heap. It is used for the RocksDB state backend, and is also available to applications. | 
| managedMemoryTotal\$1 | Bytes | The total amount of managed memory. | Application, Operator, Task, Parallelism | \$1Available for Managed Service for Apache Flink applications running Flink version 1.13 only. This relates to memory managed by Flink outside the Java heap. It is used for the RocksDB state backend, and is also available to applications. The `ManagedMemoryUtilzations` metric only accounts for specific memory metrics like Managed Memory (memory usage outside JVM for native processes like [RocksDB State Backend](https://flink.apache.org/2021/01/18/rocksdb.html#:~:text=Conclusion-,The%20RocksDB%20state%20backend%20(i.e.%2C%20RocksDBStateBackend)%20is%20one%20of,with%20exactly%2Donce%20processing%20guarantees.)) | 
| managedMemoryUtilization\$1 | Percentage | Derived by managedMemoryUsed/managedMemoryTotal | Application, Operator, Task, Parallelism | \$1Available for Managed Service for Apache Flink applications running Flink version 1.13 only. This relates to memory managed by Flink outside the Java heap. It is used for the RocksDB state backend, and is also available to applications. | 
| numberOfFailedCheckpoints | Count | The number of times checkpointing has failed. | Application | You can use this metric to monitor application health and progress. Checkpoints may fail due to application problems, such as throughput or permissions issues.  | 
| numRecordsIn\$1 | Count | The total number of records this application, operator, or task has received. | Application, Operator, Task, Parallelism | \$1To apply the SUM statistic over a period of time (second/minute): [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html) The metric's Level specifies whether this metric measures the total number of records the entire application, a specific operator, or a specific task has received. | 
| numRecordsInPerSecond\$1 | Count/Second | The total number of records this application, operator or task has received per second. | Application, Operator, Task, Parallelism | \$1To apply the SUM statistic over a period of time (second/minute): [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html) The metric's Level specifies whether this metric measures the total number of records the entire application, a specific operator, or a specific task has received per second. | 
| numRecordsOut\$1 | Count | The total number of records this application, operator or task has emitted. | Application, Operator, Task, Parallelism |  \$1To apply the SUM statistic over a period of time (second/minute): [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html) The metric's Level specifies whether this metric measures the total number of records the entire application, a specific operator, or a specific task has emitted. | 
| numLateRecordsDropped\$1 | Count | Application, Operator, Task, Parallelism |  | \$1To apply the SUM statistic over a period of time (second/minute): [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html) The number of records this operator or task has dropped due to arriving late. | 
| numRecordsOutPerSecond\$1 | Count/Second | The total number of records this application, operator or task has emitted per second. | Application, Operator, Task, Parallelism |  \$1To apply the SUM statistic over a period of time (second/minute): [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html) The metric's Level specifies whether this metric measures the total number of records the entire application, a specific operator, or a specific task has emitted per second. | 
| oldGenerationGCCount | Count | The total number of old garbage collection operations that have occurred across all task managers.  | Application |  | 
| oldGenerationGCTime | Milliseconds | The total time spent performing old garbage collection operations.  | Application | You can use this metric to monitor sum, average, and maximum garbage collection time. | 
| threadsCount | Count | The total number of live threads used by the application.  | Application | This metric measures the number of threads used by the application code. This is not the same as application parallelism. | 
| cancellingTime | Milliseconds | The time (in milliseconds) that the application has spent in a cancelling state. Use this metric to monitor application cancellation operations. | Application, Flow | Available from Flink 2.2. Replaces part of the deprecated downtime metric. | 
| restartingTime | Milliseconds | The time (in milliseconds) that the application has spent in a restarting state. Use this metric to monitor application restart behavior. | Application, Flow | Available from Flink 2.2. Replaces part of the deprecated downtime metric. | 
| runningTime | Milliseconds | The time (in milliseconds) that the application has been running without interruption. Replaces the deprecated uptime metric. | Application, Flow | Available from Flink 2.2. Use as a direct replacement for the deprecated uptime metric. | 
| uptime [DEPRECATED] | Milliseconds | The time that the job has been running without interruption. | Application | You can use this metric to determine if a job is running successfully. This metric returns -1 for completed jobs. **Deprecated in Flink 2.2.** Use `runningTime` instead. | 
| jobmanagerFileDescriptorsMax | Count | The maximum number of file descriptors available to the JobManager. | Application, Flow, Host | Use this metric to monitor file descriptor capacity. | 
| jobmanagerFileDescriptorsOpen | Count | The current number of open file descriptors for the JobManager. | Application, Flow, Host | Use this metric to monitor file descriptor usage and detect potential resource exhaustion. | 
| taskmanagerFileDescriptorsMax | Count | The maximum number of file descriptors available to each TaskManager. | Application, Flow, Host, tm\$1id | Use this metric to monitor file descriptor capacity. | 
| taskmanagerFileDescriptorsOpen | Count | The current number of open file descriptors for each TaskManager. | Application, Flow, Host, tm\$1id | Use this metric to monitor file descriptor usage and detect potential resource exhaustion. | 
| KPUs\$1 | Count | The total number of KPUs used by the application. | Application | \$1This metric receives one sample per billing period (one hour). To visualize the number of KPUs over time, use MAX or AVG over a period of at least one (1) hour. The KPU count includes the `orchestration` KPU. For more information, see [Managed Service for Apache Flink Pricing](https://aws.amazon.com/managed-service-apache-flink/pricing/). | 

**Flink 2.2 metric migration guidance**  
**Migration from fullRestarts:** The `fullRestarts` metric has been removed in Flink 2.2. Use the `numRestarts` metric instead. The `numRestarts` metric provides equivalent functionality and can be used as a direct replacement in CloudWatch alarms without requiring threshold adjustments.  
**Migration from uptime:** The `uptime` metric is deprecated in Flink 2.2 and will be removed in a future release. Use the `runningTime` metric instead. The `runningTime` metric provides equivalent functionality and can be used as a direct replacement in CloudWatch alarms without requiring threshold adjustments.  
**Migration from downtime:** The `downtime` metric is deprecated in Flink 2.2 and will be removed in a future release. Depending on what you want to monitor, use one or more of the following metrics:  
`restartingTime`: Monitor time spent restarting the application
`cancellingTime`: Monitor time spent cancelling the application
`failingTime`: Monitor time spent in a failing state

## Kinesis Data Streams connector metrics
<a name="metrics-dimensions-stream"></a>

AWS emits all records for Kinesis Data Streams in addition to the following:


| Metric | Unit | Description | Level | Usage Notes | 
| --- | --- | --- | --- | --- | 
| millisbehindLatest | Milliseconds | The number of milliseconds the consumer is behind the head of the stream, indicating how far behind current time the consumer is. | Application (for Stream), Parallelism (for ShardId) | [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/metrics-dimensions.html)  | 

**Note**  
The `bytesRequestedPerFetch` metric has been removed in Flink AWS connector version 6.0.0 (the only connector version compatible with Flink 2.2). The only Kinesis Data Streams connector metric available in Flink 2.2 is `millisBehindLatest`.

## Amazon MSK connector metrics
<a name="metrics-dimensions-msk"></a>

AWS emits all records for Amazon MSK in addition to the following:


| Metric | Unit | Description | Level | Usage Notes | 
| --- | --- | --- | --- | --- | 
| currentoffsets | N/A | The consumer's current read offset, for each partition. A particular partition's metric can be specified by topic name and partition id. | Application (for Topic), Parallelism (for PartitionId) |  | 
| commitsFailed | N/A | The total number of offset commit failures to Kafka, if offset committing and checkpointing are enabled.  | Application, Operator, Task, Parallelism | Committing offsets back to Kafka is only a means to expose consumer progress, so a commit failure does not affect the integrity of Flink's checkpointed partition offsets. | 
| commitsSucceeded | N/A | The total number of successful offset commits to Kafka, if offset committing and checkpointing are enabled.  | Application, Operator, Task, Parallelism |  | 
| committedoffsets | N/A | The last successfully committed offsets to Kafka, for each partition. A particular partition's metric can be specified by topic name and partition id. | Application (for Topic), Parallelism (for PartitionId) |  | 
| records\$1lag\$1max | Count | The maximum lag in terms of number of records for any partition in this window | Application, Operator, Task, Parallelism |  | 
| bytes\$1consumed\$1rate | Bytes | The average number of bytes consumed per second for a topic | Application, Operator, Task, Parallelism |  | 

## Apache Zeppelin metrics
<a name="metrics-dimensions-zeppelin"></a>

For Studio notebooks, AWS emits the following metrics at the application level: `KPUs`, `cpuUtilization`, `heapMemoryUtilization`, `oldGenerationGCTime`, `oldGenerationGCCount`, and `threadCount`. In addition, it emits the metrics shown in the following table, also at the application level.


****  

| Metric | Unit | Description | Prometheus name | 
| --- | --- | --- | --- | 
| zeppelinCpuUtilization | Percentage | Overall percentage of CPU utilization in the Apache Zeppelin server. | process\$1cpu\$1usage | 
| zeppelinHeapMemoryUtilization | Percentage | Overall percentage of heap memory utilization for the Apache Zeppelin server. | jvm\$1memory\$1used\$1bytes | 
| zeppelinThreadCount | Count | The total number of live threads used by the Apache Zeppelin server. | jvm\$1threads\$1live\$1threads | 
| zeppelinWaitingJobs | Count | The number of queued Apache Zeppelin jobs waiting for a thread. | jetty\$1threads\$1jobs | 
| zeppelinServerUptime | Seconds | The total time that the server has been up and running. | process\$1uptime\$1seconds | 

# View CloudWatch metrics
<a name="metrics-dimensions-viewing"></a>

You can view CloudWatch metrics for your application using the Amazon CloudWatch console or the AWS CLI.

**To view metrics using the CloudWatch console**

1. Open the CloudWatch console at [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. In the navigation pane, choose **Metrics**.

1. In the **CloudWatch Metrics by Category** pane for Managed Service for Apache Flink, choose a metrics category.

1. In the upper pane, scroll to view the full list of metrics.

**To view metrics using the AWS CLI**
+ At a command prompt, use the following command.

  ```
  1. aws cloudwatch list-metrics --namespace "AWS/KinesisAnalytics" --region region
  ```

# Set CloudWatch metrics reporting levels
<a name="cloudwatch-logs-levels"></a>

You can control the level of application metrics that your application creates. Managed Service for Apache Flink supports the following metrics levels:
+ **Application:** The application only reports the highest level of metrics for each application. Managed Service for Apache Flink metrics are published at the Application level by default.
+ **Task:** The application reports task-specific metric dimensions for metrics defined with the Task metric reporting level, such as number of records in and out of the application per second.
+ **Operator:** The application reports operator-specific metric dimensions for metrics defined with the Operator metric reporting level, such as metrics for each filter or map operation.
+ **Parallelism:** The application reports `Task` and `Operator` level metrics for each execution thread. This reporting level is not recommended for applications with a Parallelism setting above 64 due to excessive costs. 
**Note**  
You should only use this metric level for troubleshooting because of the amount of metric data that the service generates. You can only set this metric level using the CLI. This metric level is not available in the console.

The default level is **Application**. The application reports metrics at the current level and all higher levels. For example, if the reporting level is set to **Operator**, the application reports **Application**, **Task**, and **Operator** metrics.

You set the CloudWatch metrics reporting level using the `MonitoringConfiguration` parameter of the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action, or the `MonitoringConfigurationUpdate` parameter of the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action. The following example request for the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action sets the CloudWatch metrics reporting level to **Task**:

```
{
   "ApplicationName": "MyApplication",  
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "MonitoringConfigurationUpdate": { 
            "ConfigurationTypeUpdate": "CUSTOM",
            "MetricsLevelUpdate": "TASK"
         }
      }
   }
}
```

You can also configure the logging level using the `LogLevel` parameter of the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) action or the `LogLevelUpdate` parameter of the [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) action. You can use the following log levels:
+ `ERROR`: Logs potentially recoverable error events.
+ `WARN`: Logs warning events that might lead to an error.
+ `INFO`: Logs informational events.
+ `DEBUG`: Logs general debugging events. 

For more information about Log4j logging levels, see [Custom Log Levels](https://logging.apache.org/log4j/2.x/manual/customloglevels.html) in the [Apache Log4j](https://logging.apache.org/log4j/2.x/) documentation.

# Use custom metrics with Amazon Managed Service for Apache Flink
<a name="monitoring-metrics-custom"></a>

Managed Service for Apache Flink exposes 19 metrics to CloudWatch, including metrics for resource usage and throughput. In addition, you can create your own metrics to track application-specific data, such as processing events or accessing external resources.

**Topics**
+ [

## How it works
](#monitoring-metrics-custom-howitworks)
+ [

## View examples for creating a mapping class
](#monitoring-metrics-custom-examples)
+ [

## View custom metrics
](#monitoring-metrics-custom-examples-viewing)

## How it works
<a name="monitoring-metrics-custom-howitworks"></a>

Custom metrics in Managed Service for Apache Flink use the Apache Flink metric system. Apache Flink metrics have the following attributes:
+ **Type:** A metric's type describes how it measures and reports data. Available Apache Flink metric types include Count, Gauge, Histogram, and Meter. For more information about Apache Flink metric types, see [Metric Types](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#metric-types).
**Note**  
AWS CloudWatch Metrics does not support the Histogram Apache Flink metric type. CloudWatch can only display Apache Flink metrics of the Count, Gauge, and Meter types.
+ **Scope:** A metric's scope consists of its identifier and a set of key-value pairs that indicate how the metric will be reported to CloudWatch. A metric's identifier consists of the following:
  + A system scope, which indicates the level at which the metric is reported (e.g. Operator).
  + A user scope, that defines attributes such as user variables or the metric group names. These attributes are defined using [https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-) or [https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-).

  For more information about metric scope, see [Scope](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#scope).

For more information about Apache Flink metrics, see [Metrics](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

To create a custom metric in your Managed Service for Apache Flink, you can access the Apache Flink metric system from any user function that extends `RichFunction` by calling [https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--). This method returns a [MetricGroup](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/metrics/MetricGroup.html) object you can use to create and register custom metrics. Managed Service for Apache Flink reports all metrics created with the group key `KinesisAnalytics` to CloudWatch. Custom metrics that you define have the following characteristics:
+ Your custom metric has a metric name and a group name. These names must consist of alphanumeric characters according to [Prometheus naming rules](https://prometheus.io/docs/instrumenting/writing_exporters/#naming).
+ Attributes that you define in user scope (except for the `KinesisAnalytics` metric group) are published as CloudWatch dimensions.
+ Custom metrics are published at the `Application` level by default.
+ Dimensions (Task/ Operator/ Parallelism) are added to the metric based on the application's monitoring level. You set the application's monitoring level using the [MonitoringConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfiguration.html) parameter of the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) action, or the or [MonitoringConfigurationUpdate](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfigurationUpdate.html) parameter of the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.

## View examples for creating a mapping class
<a name="monitoring-metrics-custom-examples"></a>

The following code examples demonstrate how to create a mapping class that creates and increments a custom metric, and how to implement the mapping class in your application by adding it to a `DataStream` object.

### Record count custom metric
<a name="monitoring-metrics-custom-examples-recordcount"></a>

The following code example demonstrates how to create a mapping class that creates a metric that counts records in a data stream (the same functionality as the `numRecordsIn` metric):

```
    private static class NoOpMapperFunction extends RichMapFunction<String, String> {
        private transient int valueToExpose = 0;
        private final String customMetricName;
 
        public NoOpMapperFunction(final String customMetricName) {
            this.customMetricName = customMetricName;
        }
 
        @Override
        public void open(Configuration config) {
            getRuntimeContext().getMetricGroup()
                    .addGroup("KinesisAnalytics")
                    .addGroup("Program", "RecordCountApplication")
                    .addGroup("NoOpMapperFunction")
                    .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose);
        }
 
        @Override
        public String map(String value) throws Exception {
            valueToExpose++;
            return value;
        }
    }
```

In the preceding example, the `valueToExpose` variable is incremented for each record that the application processes. 

After defining your mapping class, you then create an in-application stream that implements the map:

```
DataStream<String> noopMapperFunctionAfterFilter =
    kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));
```

For the complete code for this application, see [Record Count Custom Metric Application](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/RecordCount).

### Word count custom metric
<a name="monitoring-metrics-custom-examples-wordcount"></a>

The following code example demonstrates how to create a mapping class that creates a metric that counts words in a data stream:

```
private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
     
            private transient Counter counter;
     
            @Override
            public void open(Configuration config) {
                this.counter = getRuntimeContext().getMetricGroup()
                        .addGroup("KinesisAnalytics")
                        .addGroup("Service", "WordCountApplication")
                        .addGroup("Tokenizer")
                        .counter("TotalWords");
            }
     
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>>out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
     
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        counter.inc();
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
```

In the preceding example, the `counter` variable is incremented for each word that the application processes. 

After defining your mapping class, you then create an in-application stream that implements the map:

```
// Split up the lines in pairs (2-tuples) containing: (word,1), and
// group by the tuple field "0" and sum up tuple field "1"
DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1);
     
// Serialize the tuple to string format, and publish the output to kinesis sink
wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());
```

For the complete code for this application, see [Word Count Custom Metric Application](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/WordCount).

## View custom metrics
<a name="monitoring-metrics-custom-examples-viewing"></a>

Custom metrics for your application appear in the CloudWatch Metrics console in the **AWS/KinesisAnalytics** dashboard, under the **Application** metric group. 

# Use CloudWatch Alarms with Amazon Managed Service for Apache Flink
<a name="monitoring-metrics-alarms"></a>

Using Amazon CloudWatch metric alarms, you watch a CloudWatch metric over a time period that you specify. The alarm performs one or more actions based on the value of the metric or expression relative to a threshold over a number of time periods. An example of an action is sending a notification to an Amazon Simple Notification Service (Amazon SNS) topic. 

For more information about CloudWatch alarms, see [Using Amazon CloudWatch Alarms](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html).

## Review recommended alarms
<a name="monitoring-metrics-alarms-recommended"></a>

This section contains the recommended alarms for monitoring Managed Service for Apache Flink applications.

The table describes the recommended alarms and has the following columns:
+ **Metric Expression:** The metric or metric expression to test against the threshold.
+ **Statistic:** The statistic used to check the metric—for example, **Average**.
+ **Threshold:** Using this alarm requires you to determine a threshold that defines the limit of expected application performance. You need to determine this threshold by monitoring your application under normal conditions.
+ **Description:** Causes that might trigger this alarm, and possible solutions for the condition.


| Metric Expression | Statistic | Threshold | Description | 
| --- |--- |--- |--- |
| downtime > 0 | Average | 0 |  A downtime greater than zero indicates that the application has failed. If the value is larger than 0, the application is not processing any data. Recommended for all applications. The Downtime metric measures the duration of an outage. A downtime greater than zero indicates that the application has failed. For troubleshooting, see [Application is restarting](troubleshooting-rt-restarts.md). | 
| RATE (numberOfFailedCheckpoints) > 0 | Average | 0 | This metric counts the number of failed checkpoints since the application started. Depending on the application, it can be tolerable if checkpoints fail occasionally. But if checkpoints are regularly failing, the application is likely unhealthy and needs further attention. We recommend monitoring RATE(numberOfFailedCheckpoints) to alarm on the gradient and not on absolute values. Recommended for all applications. Use this metric to monitor application health and checkpointing progress. The application saves state data to checkpoints when it's healthy. Checkpointing can fail due to timeouts if the application isn't making progress in processing the input data. For troubleshooting, see [Checkpointing is timing out](troubleshooting-chk-timeout.md). | 
| Operator.numRecordsOutPerSecond < threshold | Average | The minimum number of records emitted from the application during normal conditions.  | Recommended for all applications. Falling below this threshold can indicate that the application isn't making expected progress on the input data. For troubleshooting, see [Throughput is too slow](troubleshooting-rt-throughput.md). | 
| records\$1lag\$1max\$1millisbehindLatest > threshold | Maximum | The maximum expected latency during normal conditions. | If the application is consuming from Kinesis or Kafka, these metrics indicate if the application is falling behind and needs to be scaled in order to keep up with the current load. This is a good generic metric that is easy to track for all kinds of applications. But it can only be used for reactive scaling, i.e., when the application has already fallen behind. Recommended for all applications. Use the records\$1lag\$1max metric for a Kafka source, or the millisbehindLatest for a Kinesis stream source. Rising above this threshold can indicate that the application isn't making expected progress on the input data. For troubleshooting, see [Throughput is too slow](troubleshooting-rt-throughput.md). | 
| lastCheckpointDuration > threshold | Maximum | The maximum expected checkpoint duration during normal conditions. | Monitors how much data is stored in state and how long it takes to take a checkpoint. If checkpoints grow or take long, the application is continuously spending time on checkpointing and has less cycles for actual processing. At some points, checkpoints may grow too large or take so long that they fail. In addition to monitoring absolute values, customers should also considering monitoring the change rate with RATE(lastCheckpointSize) and RATE(lastCheckpointDuration). If the lastCheckpointDuration continuously increases, rising above this threshold can indicate that the application isn't making expected progress on the input data, or that there are problems with application health such as backpressure. For troubleshooting, see [Unbounded state growth](troubleshooting-rt-stateleaks.md). | 
| lastCheckpointSize > threshold | Maximum | The maximum expected checkpoint size during normal conditions. | Monitors how much data is stored in state and how long it takes to take a checkpoint. If checkpoints grow or take long, the application is continuously spending time on checkpointing and has less cycles for actual processing. At some points, checkpoints may grow too large or take so long that they fail. In addition to monitoring absolute values, customers should also considering monitoring the change rate with RATE(lastCheckpointSize) and RATE(lastCheckpointDuration). If the lastCheckpointSize continuously increases, rising above this threshold can indicate that the application is accumulating state data. If the state data becomes too large, the application can run out of memory when recovering from a checkpoint, or recovering from a checkpoint might take too long. For troubleshooting, see [Unbounded state growth](troubleshooting-rt-stateleaks.md). | 
| heapMemoryUtilization > threshold | Maximum | This gives a good indication of the overall resource utilization of the application and can be used for proactive scaling unless the application is I/O bound. The maximum expected heapMemoryUtilization size during normal conditions, with a recommended value of 90 percent. | You can use this metric to monitor the maximum memory utilization of task managers across the application. If the application reaches this threshold, you need to provision more resources. You do this by enabling automatic scaling or increasing the application parallelism. For more information about increasing resources, see [Implement application scaling](how-scaling.md). | 
| cpuUtilization > threshold | Maximum | This gives a good indication of the overall resource utilization of the application and can be used for proactive scaling unless the application is I/O bound. The maximum expected cpuUtilization size during normal conditions, with a recommended value of 80 percent. | You can use this metric to monitor the maximum CPU utilization of task managers across the application. If the application reaches this threshold, you need to provision more resources You do this by enabling automatic scaling or increasing the application parallelism. For more information about increasing resources, see [Implement application scaling](how-scaling.md). | 
| threadsCount > threshold | Maximum | The maximum expected threadsCount size during normal conditions. | You can use this metric to watch for thread leaks in task managers across the application. If this metric reaches this threshold, check your application code for threads being created without being closed. | 
| (oldGarbageCollectionTime \$1 100)/60\$1000 over 1 min period') > threshold | Maximum | The maximum expected oldGarbageCollectionTime duration. We recommend setting a threshold such that typical garbage collection time is 60 percent of the specified threshold, but the correct threshold for your application will vary. | If this metric is continually increasing, this can indicate that there is a memory leak in task managers across the application. | 
| RATE(oldGarbageCollectionCount)  > threshold | Maximum | The maximum expected oldGarbageCollectionCount under normal conditions. The correct threshold for your application will vary. | If this metric is continually increasing, this can indicate that there is a memory leak in task managers across the application. | 
| Operator.currentOutputWatermark - Operator.currentInputWatermark  > threshold | Minimum | The minimum expected watermark increment under normal conditions. The correct threshold for your application will vary. | If this metric is continually increasing, this can indicate that either the application is processing increasingly older events, or that an upstream subtask has not sent a watermark in an increasingly long time. | 

# Write custom messages to CloudWatch Logs
<a name="cloudwatch-logs-writing"></a>

You can write custom messages to your Managed Service for Apache Flink application's CloudWatch log. You do this by using the Apache [https://logging.apache.org/log4j/](https://logging.apache.org/log4j/) library or the [https://www.slf4j.org/](https://www.slf4j.org/) library.

**Topics**
+ [

## Write to CloudWatch logs using Log4J
](#cloudwatch-logs-writing-log4j)
+ [

## Write to CloudWatch logs using SLF4J
](#cloudwatch-logs-writing-slf4j)

## Write to CloudWatch logs using Log4J
<a name="cloudwatch-logs-writing-log4j"></a>

1. Add the following dependencies to your application's `pom.xml` file:

   ```
   <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
       <version>2.6.1</version>
   </dependency>
   <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-core</artifactId>
       <version>2.6.1</version>
   </dependency>
   ```

1. Include the object from the library:

   ```
   import org.apache.logging.log4j.Logger;
   ```

1. Instantiate the `Logger` object, passing in your application class:

   ```
   private static final Logger log = LogManager.getLogger.getLogger(YourApplicationClass.class);
   ```

1. Write to the log using `log.info`. A large number of messages are written to the application log. To make your custom messages easier to filter, use the `INFO` application log level.

   ```
   log.info("This message will be written to the application's CloudWatch log");
   ```

The application writes a record to the log with a message similar to the following:

```
{
  "locationInformation": "com.amazonaws.services.managed-flink.StreamingJob.main(StreamingJob.java:95)", 
  "logger": "com.amazonaws.services.managed-flink.StreamingJob", 
  "message": "This message will be written to the application's CloudWatch log", 
  "threadName": "Flink-DispatcherRestEndpoint-thread-2", 
  "applicationARN": "arn:aws:kinesisanalyticsus-east-1:123456789012:application/test", 
  "applicationVersionId": "1", "messageSchemaVersion": "1", 
  "messageType": "INFO" 
}
```

## Write to CloudWatch logs using SLF4J
<a name="cloudwatch-logs-writing-slf4j"></a>

1. Add the following dependency to your application's `pom.xml` file:

   ```
   <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <version>1.7.7</version>
       <scope>runtime</scope>
   </dependency>
   ```

1. Include the objects from the library:

   ```
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   ```

1. Instantiate the `Logger` object, passing in your application class:

   ```
   private static final Logger log = LoggerFactory.getLogger(YourApplicationClass.class);
   ```

1. Write to the log using `log.info`. A large number of messages are written to the application log. To make your custom messages easier to filter, use the `INFO` application log level.

   ```
   log.info("This message will be written to the application's CloudWatch log");
   ```

The application writes a record to the log with a message similar to the following:

```
{
  "locationInformation": "com.amazonaws.services.managed-flink.StreamingJob.main(StreamingJob.java:95)", 
  "logger": "com.amazonaws.services.managed-flink.StreamingJob", 
  "message": "This message will be written to the application's CloudWatch log", 
  "threadName": "Flink-DispatcherRestEndpoint-thread-2", 
  "applicationARN": "arn:aws:kinesisanalyticsus-east-1:123456789012:application/test", 
  "applicationVersionId": "1", "messageSchemaVersion": "1", 
  "messageType": "INFO" 
}
```

# Log Managed Service for Apache Flink API calls with AWS CloudTrail
<a name="logging-using-cloudtrail"></a>

Managed Service for Apache Flink is integrated with AWS CloudTrail, a service that provides a record of actions taken by a user, role, or an AWS service in Managed Service for Apache Flink. CloudTrail captures all API calls for Managed Service for Apache Flink as events. The calls captured include calls from the Managed Service for Apache Flink console and code calls to the Managed Service for Apache Flink API operations. If you create a trail, you can enable continuous delivery of CloudTrail events to an Amazon S3 bucket, including events for Managed Service for Apache Flink. If you don't configure a trail, you can still view the most recent events in the CloudTrail console in **Event history**. Using the information collected by CloudTrail, you can determine the request that was made to Managed Service for Apache Flink, the IP address from which the request was made, who made the request, when it was made, and additional details. 

To learn more about CloudTrail, see the [AWS CloudTrail User Guide](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/).

## Managed Service for Apache Flink information in CloudTrail
<a name="service-name-info-in-cloudtrail"></a>

CloudTrail is enabled on your AWS account when you create the account. When activity occurs in Managed Service for Apache Flink, that activity is recorded in a CloudTrail event along with other AWS service events in **Event history**. You can view, search, and download recent events in your AWS account. For more information, see [Viewing Events with CloudTrail Event History](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/view-cloudtrail-events.html). 

For an ongoing record of events in your AWS account, including events for Managed Service for Apache Flink, create a trail. A *trail* enables CloudTrail to deliver log files to an Amazon S3 bucket. By default, when you create a trail in the console, the trail applies to all AWS Regions. The trail logs events from all Regions in the AWS partition and delivers the log files to the Amazon S3 bucket that you specify. Additionally, you can configure other AWS services to further analyze and act upon the event data collected in CloudTrail logs. For more information, see the following: 
+ [Overview for Creating a Trail](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-create-and-update-a-trail.html)
+ [CloudTrail Supported Services and Integrations](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-aws-service-specific-topics.html#cloudtrail-aws-service-specific-topics-integrations)
+ [Configuring Amazon SNS Notifications for CloudTrail](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/getting_notifications_top_level.html)
+ [Receiving CloudTrail Log Files from Multiple Regions](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/receive-cloudtrail-log-files-from-multiple-regions.html) and [Receiving CloudTrail Log Files from Multiple Accounts](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-receive-logs-from-multiple-accounts.html)

All Managed Service for Apache Flink actions are logged by CloudTrail and are documented in the [Managed Service for Apache Flink API reference](/managed-flink/latest/apiv2/Welcome.html). For example, calls to the `[CreateApplication](/managed-flink/latest/apiv2/API_CreateApplication.html)` and ` [UpdateApplication](/managed-flink/latest/apiv2/API_UpdateApplication.html)` actions generate entries in the CloudTrail log files. 

Every event or log entry contains information about who generated the request. The identity information helps you determine the following: 
+ Whether the request was made with root or AWS Identity and Access Management (IAM) user credentials.
+ Whether the request was made with temporary security credentials for a role or federated user.
+ Whether the request was made by another AWS service.

For more information, see the [CloudTrail userIdentity Element](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference-user-identity.html).

## Understand Managed Service for Apache Flink log file entries
<a name="understanding-service-name-entries"></a>

A trail is a configuration that enables delivery of events as log files to an Amazon S3 bucket that you specify. CloudTrail log files contain one or more log entries. An event represents a single request from any source and includes information about the requested action, the date and time of the action, request parameters, and so on. CloudTrail log files aren't an ordered stack trace of the public API calls, so they don't appear in any specific order. 

The following example shows a CloudTrail log entry that demonstrates the [AddApplicationCloudWatchLoggingOption](/managed-flink/latest/apiv2/API_AddApplicationCloudWatchLoggingOption.html) and [DescribeApplication](/managed-flink/latest/apiv2/API_DescribeApplication.html) actions.

```
{
    "Records": [
        {
            "eventVersion": "1.05",
            "userIdentity": {
                "type": "IAMUser",
                "principalId": "EX_PRINCIPAL_ID",
                "arn": "arn:aws:iam::012345678910:user/Alice",
                "accountId": "012345678910",
                "accessKeyId": "EXAMPLE_KEY_ID",
                "userName": "Alice"
            },
            "eventTime": "2019-03-07T01:19:47Z",
            "eventSource": "kinesisanlaytics.amazonaws.com",
            "eventName": "AddApplicationCloudWatchLoggingOption",
            "awsRegion": "us-east-1",
            "sourceIPAddress": "127.0.0.1",
            "userAgent": "aws-sdk-java/unknown-version Linux/x.xx",
            "requestParameters": {
                "applicationName": "cloudtrail-test",
                "currentApplicationVersionId": 1,
                "cloudWatchLoggingOption": {
                    "logStreamARN": "arn:aws:logs:us-east-1:012345678910:log-group:cloudtrail-test:log-stream:flink-cloudwatch"
                }
            },
            "responseElements": {
                "cloudWatchLoggingOptionDescriptions": [
                    {
                        "cloudWatchLoggingOptionId": "2.1",
                        "logStreamARN": "arn:aws:logs:us-east-1:012345678910:log-group:cloudtrail-test:log-stream:flink-cloudwatch"
                    }
                ],
                "applicationVersionId": 2,
                "applicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678910:application/cloudtrail-test"
            },
            "requestID": "18dfb315-4077-11e9-afd3-67f7af21e34f",
            "eventID": "d3c9e467-db1d-4cab-a628-c21258385124",
            "eventType": "AwsApiCall",
            "apiVersion": "2018-05-23",
            "recipientAccountId": "012345678910"
        },
        {
            "eventVersion": "1.05",
            "userIdentity": {
                "type": "IAMUser",
                "principalId": "EX_PRINCIPAL_ID",
                "arn": "arn:aws:iam::012345678910:user/Alice",
                "accountId": "012345678910",
                "accessKeyId": "EXAMPLE_KEY_ID",
                "userName": "Alice"
            },
            "eventTime": "2019-03-12T02:40:48Z",
            "eventSource": "kinesisanlaytics.amazonaws.com",
            "eventName": "DescribeApplication",
            "awsRegion": "us-east-1",
            "sourceIPAddress": "127.0.0.1",
            "userAgent": "aws-sdk-java/unknown-version Linux/x.xx",
            "requestParameters": {
                "applicationName": "sample-app"
            },
            "responseElements": null,
            "requestID": "3e82dc3e-4470-11e9-9d01-e789c4e9a3ca",
            "eventID": "90ffe8e4-9e47-48c9-84e1-4f2d427d98a5",
            "eventType": "AwsApiCall",
            "apiVersion": "2018-05-23",
            "recipientAccountId": "012345678910"
        }
    ]
}
```