

# Use a Studio notebook with Managed Service for Apache Flink
<a name="how-notebook"></a>

Studio notebooks for Managed Service for Apache Flink allow you to interactively query data streams in real time, and to easily build and run stream processing applications using standard SQL, Python, and Scala. With a few clicks in the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds. 

A notebook is a web-based development environment. With notebooks, you get a simple interactive development experience combined with the advanced capabilities provided by Apache Flink. Studio notebooks are powered by [Apache Zeppelin](https://zeppelin.apache.org/) and use [Apache Flink](https://flink.apache.org/) as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. 

Apache Zeppelin provides your Studio notebooks with a complete suite of analytics tools, including the following:
+ Data Visualization
+ Exporting data to files
+ Controlling the output format for easier analysis

To get started using Managed Service for Apache Flink and Apache Zeppelin, see [Tutorial: Create a Studio notebook in Managed Service for Apache Flink](example-notebook.md). For more information about Apache Zeppelin, see the [Apache Zeppelin documentation](http://zeppelin.apache.org).

With a notebook, you model queries using the Apache Flink [Table API & SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/) in SQL, Python, or Scala, or the [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) in Scala. With a few clicks, you can then promote the Studio notebook to a continuously running, non-interactive Managed Service for Apache Flink stream-processing application for your production workloads.

**Topics**
+ [Use the correct Studio notebook Runtime version](studio-notebook-versions.md)
+ [Create a Studio notebook](how-zeppelin-creating.md)
+ [Perform an interactive analysis of streaming data](how-zeppelin-interactive.md)
+ [Deploy as an application with durable state](how-notebook-durable.md)
+ [IAM permissions](how-zeppelin-iam.md)
+ [Use connectors and dependencies](how-zeppelin-connectors.md)
+ [User-defined functions](how-zeppelin-udf.md)
+ [Enable checkpointing](how-zeppelin-checkpoint.md)
+ [Upgrade Studio Runtime](upgrading-studio-runtime.md)
+ [Work with AWS Glue](how-zeppelin-glue.md)
+ [Examples and tutorials for Studio notebooks in Managed Service for Apache Flink](how-zeppelin-examples.md)
+ [Troubleshoot Studio notebooks for Managed Service for Apache Flink](how-zeppelin-troubleshooting.md)
+ [Create custom IAM policies for Managed Service for Apache Flink Studio notebooks](how-zeppelin-appendix-iam.md)

# Use the correct Studio notebook Runtime version
<a name="studio-notebook-versions"></a>

With Amazon Managed Service for Apache Flink Studio, you can query data streams in real time and build and run stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by [Apache Zeppelin](https://zeppelin.apache.org/) and use [Apache Flink](https://flink.apache.org/) as the stream processing engine. 

**Note**  
We will deprecate Studio Runtime with **Apache Flink version 1.11 on November 5, 2024**. Starting from this date, you will not be able to run new notebooks or create new applications using this version. We recommend that you upgrade to the latest runtime (Apache Flink 1.15 and Apache Zeppelin 0.10) before that time. For guidance on how to upgrade your notebook, see [Upgrade Studio Runtime](upgrading-studio-runtime.md).


**Studio Runtime**  

| Apache Flink version | Apache Zeppelin version | Python version | Status | 
| --- | --- | --- | --- | 
| 1.15 | 0.10 | 3.8 | Recommended | 
| 1.13 | 0.9 | 3.8 | Supported until October 16, 2024 | 
| 1.11 | 0.9 | 3.7 | Deprecating on November 5, 2024 | 

# Create a Studio notebook
<a name="how-zeppelin-creating"></a>

A Studio notebook contains queries or programs written in SQL, Python, or Scala that run on streaming data and return analytic results. You create your application using either the console or the CLI, and provide queries for analyzing the data from your data source.

Your application has the following components:
+ A data source, such as an Amazon MSK cluster, a Kinesis data stream, or an Amazon S3 bucket.
+ An AWS Glue database. This database contains tables, which store your data source and destination schemas and endpoints. For more information, see [Work with AWS Glue](how-zeppelin-glue.md).
+ Your application code. Your code implements your analytics query or program.
+ Your application settings and runtime properties. For information about application settings and runtime properties, see the following topics in the [Developer Guide for Apache Flink Applications](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html):
  + **Application Parallelism and Scaling:** You use your application's **Parallelism** setting to control the number of queries that your application can execute simultaneously. Your queries can also take advantage of increased parallelism if they have multiple paths of execution, such as in the following circumstances:
    + When processing multiple shards of a Kinesis data stream
    + When partitioning data using the `KeyBy` operator
    + When using multiple window operators

    For more information about application scaling, see [Application Scaling in Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html).
  + **Logging and Monitoring:** For information about application logging and monitoring, see [Logging and Monitoring in Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html).
  + Your application uses checkpoints and savepoints for fault tolerance. Checkpoints and savepoints are not enabled by default for Studio notebooks.

You can create your Studio notebook using either the AWS Management Console or the AWS CLI. 

When creating the application from the console, you have the following options:
+ In the Amazon MSK console, choose your cluster, then choose **Process data in real time**.
+ In the Kinesis Data Streams console, choose your data stream, then on the **Applications** tab, choose **Process data in real time**.
+ In the Managed Service for Apache Flink console, choose the **Studio** tab, then choose **Create Studio notebook**.

# Perform an interactive analysis of streaming data
<a name="how-zeppelin-interactive"></a>

You use a serverless notebook powered by Apache Zeppelin to interact with your streaming data. Your notebook can have multiple notes, and each note can have one or more paragraphs where you can write your code.

The following example SQL query shows how to retrieve data from a data source:

```
%flink.ssql(type=update)
select * from stock;
```

For more examples of Flink Streaming SQL queries, see [Examples and tutorials for Studio notebooks in Managed Service for Apache Flink](how-zeppelin-examples.md), and [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/) in the Apache Flink documentation.

You can use Flink SQL queries in the Studio notebook to query streaming data. You can also use Python (Table API) and Scala (Table and DataStream APIs) to write programs that query your streaming data interactively. You can view the results of your queries or programs, update them in seconds, and re-run them to view updated results.

## Flink interpreters
<a name="how-zeppelin-interactive-interpreters"></a>

You specify which language Managed Service for Apache Flink uses to run your application by using an *interpreter*. You can use the following interpreters with Managed Service for Apache Flink:


| Name | Class | Description | 
| --- |--- |--- |
| %flink | FlinkInterpreter | Creates ExecutionEnvironment, StreamExecutionEnvironment, BatchTableEnvironment, and StreamTableEnvironment, and provides a Scala environment | 
| %flink.pyflink | PyFlinkInterpreter | Provides a Python environment | 
| %flink.ipyflink | IPyFlinkInterpreter | Provides an IPython environment | 
| %flink.ssql | FlinkStreamSqlInterpreter | Provides a streaming SQL environment | 
| %flink.bsql | FlinkBatchSqlInterpreter | Provides a batch SQL environment | 

For more information about Flink interpreters, see [Flink interpreter for Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html).

If you are using `%flink.pyflink` or `%flink.ipyflink` as your interpreter, you need to use the `ZeppelinContext` to visualize the results within the notebook.
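
As a minimal sketch of this, the following PyFlink paragraph uses the Zeppelin context variable `z` to render a table result with Zeppelin's built-in visualizations. The table name `stock` and its columns are illustrative assumptions; a matching table must already exist in your catalog, as in the SQL example above.

```
%flink.pyflink

# Illustrative sketch: the table "stock" and its columns are assumed
# to already exist in your catalog.
from pyflink.table.expressions import col

stock = st_env.from_path("stock")
# z is the ZeppelinContext; z.show renders results with Zeppelin's
# built-in visualizations (charts, tables) instead of plain text.
z.show(stock.select(col("ticker"), col("price")), stream_type="update")
```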

For more PyFlink specific examples, see [Query your data streams interactively using Managed Service for Apache Flink Studio and Python](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/).

## Apache Flink table environment variables
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin provides access to table environment resources using environment variables. 

You access Scala table environment resources with the following variables:


| Variable | Resource | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | StreamTableEnvironment for blink planner | 

You access Python table environment resources with the following variables:


| Variable | Resource | 
| --- |--- |
| s_env | StreamExecutionEnvironment | 
| st_env | StreamTableEnvironment for blink planner | 

For more information about using table environments, see [Concepts and Common API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/) in the Apache Flink documentation. 
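
As a brief sketch, the following PyFlink paragraph uses the preset variables directly; you don't create these environments yourself. The calls shown only inspect the environments, and what they print depends on your notebook's configuration and catalog.

```
%flink.pyflink

# st_env is the preset StreamTableEnvironment; s_env is the preset
# StreamExecutionEnvironment. Both are provided by the interpreter.
print(s_env.get_parallelism())   # inspect the execution environment
print(st_env.list_tables())      # list tables registered in the catalog
```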

# Deploy as an application with durable state
<a name="how-notebook-durable"></a>

You can build your code and export it to Amazon S3, and you can promote the code that you wrote in your note to a continuously running stream processing application. There are two modes of running an Apache Flink application on Managed Service for Apache Flink: interactive and streaming. With a Studio notebook, you develop your code interactively, view the results of your code in real time, and visualize them within your note. After you deploy a note to run in streaming mode, Managed Service for Apache Flink creates an application for you that runs continuously, reads data from your sources, writes to your destinations, maintains long-running application state, and scales automatically based on the throughput of your source streams. 

**Note**  
The S3 bucket to which you export your application code must be in the same Region as your Studio notebook.

You can only deploy a note from your Studio notebook if it meets the following criteria:
+ Paragraphs must be ordered sequentially. When you deploy your application, all paragraphs within a note will be executed sequentially (left-to-right, top-to-bottom) as they appear in your note. You can check this order by choosing **Run All Paragraphs** in your note.
+ Your code is a combination of Python and SQL or Scala and SQL. We do not support Python and Scala together at this time for deploy-as-application.
+ Your note should have only the following interpreters: `%flink`, `%flink.ssql`, `%flink.pyflink`, `%flink.ipyflink`, `%md`.
+ The use of the [Zeppelin context](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html) object `z` is not supported. Methods that return nothing will do nothing except log a warning. Other methods will raise Python exceptions or fail to compile in Scala.
+ A note must result in a single Apache Flink job. 
+ Notes with [dynamic forms](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html) are unsupported for deploying as an application.
+ %md ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) paragraphs will be skipped in deploying as an application, as these are expected to contain human-readable documentation that is unsuitable for running as part of the resulting application.
+ Paragraphs disabled for running within Zeppelin will be skipped when deploying as an application. Even if a disabled paragraph uses an incompatible interpreter, for example, `%flink.ipyflink` in a note with `%flink` and `%flink.ssql` interpreters, it will be skipped while deploying the note as an application, and will not result in an error.
+ There must be at least one paragraph present with source code (Flink SQL, PyFlink, or Flink Scala) that is enabled for running for the application deployment to succeed.
+ Setting parallelism in the interpreter directive within a paragraph (for example, `%flink.ssql(parallelism=32)`) will be ignored in applications deployed from a note. Instead, you can update the deployed application through the AWS Management Console, the AWS Command Line Interface, or the AWS API to change the `Parallelism` and/or `ParallelismPerKPU` settings according to the level of parallelism your application requires, or you can enable autoscaling for your deployed application.
+ If you are deploying as an application with durable state, your VPC must have internet access. If your VPC does not have internet access, see [Deploy as an application with durable state in a VPC with no internet access](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet). 

## Scala/Python criteria
<a name="how-notebook-durable-scala"></a>
+ In your Scala or Python code, use the [Blink planner](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure) (`senv`, `stenv` for Scala; `s_env`, `st_env` for Python) and not the older "Flink" planner (`stenv_2` for Scala, `st_env_2` for Python). The Apache Flink project recommends the use of the Blink planner for production use cases, and this is the default planner in Zeppelin and in Flink.
+ Your Python paragraphs must not use [shell invocations/assignments](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment) using `!` or [IPython magic commands](https://ipython.readthedocs.io/en/stable/interactive/magics.html) like `%timeit` or `%conda` in notes meant to be deployed as applications.
+ You can't use Scala case classes as parameters of functions passed to higher-order dataflow operators like `map` and `filter`. For information about Scala case classes, see [CASE CLASSES](https://docs.scala-lang.org/overviews/scala-book/case-classes.html) in the Scala documentation.

## SQL criteria
<a name="how-notebook-durable-sql"></a>
+ Simple SELECT statements are not permitted, because there is no equivalent to a paragraph's output section where the data can be delivered.
+ In any given paragraph, DDL statements (`USE`, `CREATE`, `ALTER`, `DROP`, `SET`, `RESET`) must precede DML (`INSERT`) statements. This is because DML statements in a paragraph must be submitted together as a single Flink job.
+ There should be at most one paragraph that contains DML statements. This is because, for the deploy-as-application feature, we only support submitting a single job to Flink.
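
As a sketch of these criteria, a deployable note might contain a single paragraph like the following, where the DDL precedes the one `INSERT` job. The table names, columns, and connector options are illustrative, and the source table `stock` is assumed to already be defined in your AWS Glue database.

```
%flink.ssql(type=update)

CREATE TABLE stock_output (
    ticker VARCHAR(6),
    price  DOUBLE
) WITH (
    'connector' = 'kinesis',
    'stream' = 'OutputStream',
    'aws.region' = 'us-east-1',
    'format' = 'json'
);

INSERT INTO stock_output
SELECT ticker, price FROM stock;
```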

For more information and an example, see [Translate, redact and analyze streaming data using SQL functions with Amazon Managed Service for Apache Flink, Amazon Translate, and Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/).

# Review IAM permissions for Studio notebooks
<a name="how-zeppelin-iam"></a>

Managed Service for Apache Flink creates an IAM role for you when you create a Studio notebook through the AWS Management Console. It also associates with that role a policy that allows the following access:


| Service | Access  | 
| --- | --- | 
| CloudWatch Logs | List | 
| Amazon EC2 | List | 
| AWS Glue | Read, Write | 
| Managed Service for Apache Flink | Read | 
| Managed Service for Apache Flink V2 | Read | 
| Amazon S3 | Read, Write | 

# Use connectors and dependencies
<a name="how-zeppelin-connectors"></a>

Connectors enable you to read and write data across various technologies. Managed Service for Apache Flink bundles three default connectors with your Studio notebook. You can also use custom connectors. For more information about connectors, see [Table & SQL Connectors](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/) in the Apache Flink documentation.

## Default connectors
<a name="zeppelin-default-connectors"></a>

If you use the AWS Management Console to create your Studio notebook, Managed Service for Apache Flink includes the following custom connectors by default: `flink-sql-connector-kinesis`, `flink-connector-kafka_2.12`, and `aws-msk-iam-auth`. To create a Studio notebook through the console without these custom connectors, choose the **Create with custom settings** option. Then, when you get to the **Configurations** page, clear the checkboxes next to the connectors that you don't want to include.

If you use the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API to create your Studio notebook, the `flink-sql-connector-kinesis` and `flink-connector-kafka` connectors aren't included by default. To add them, specify them as a `MavenReference` in the `CustomArtifactsConfiguration` data type as shown in the following examples.

The `aws-msk-iam-auth` connector is the connector to use with Amazon MSK that includes the feature to automatically authenticate with IAM. 

**Note**  
The connector versions shown in the following example are the only versions that we support.

For the Kinesis connector:

```
"CustomArtifactsConfiguration": [{
   "ArtifactType": "DEPENDENCY_JAR",
   "MavenReference": {
      "GroupId": "org.apache.flink",
      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"
   }
}]
```

For authenticating with Amazon MSK through IAM:

```
"CustomArtifactsConfiguration": [{
   "ArtifactType": "DEPENDENCY_JAR",
   "MavenReference": {
      "GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }
}]
```

For the Apache Kafka connector:

```
"CustomArtifactsConfiguration": [{
   "ArtifactType": "DEPENDENCY_JAR",
   "MavenReference": {
      "GroupId": "org.apache.flink",
      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"
   }
}]
```

To add these connectors to an existing notebook, use the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API operation and specify them as a `MavenReference` in the `CustomArtifactsConfigurationUpdate` data type.
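
If you assemble these API requests programmatically, the following Python sketch builds the `CustomArtifactsConfiguration` entries shown above. The helper name is our own; the field names match the API data types, and the versions are the supported versions from the preceding examples.

```python
# Sketch: build CustomArtifactsConfiguration entries for the
# CreateApplication or UpdateApplication API calls.
def maven_artifact(group_id, artifact_id, version):
    """Return one DEPENDENCY_JAR entry with a MavenReference."""
    return {
        "ArtifactType": "DEPENDENCY_JAR",
        "MavenReference": {
            "GroupId": group_id,
            "ArtifactId": artifact_id,
            "Version": version,
        },
    }

custom_artifacts = [
    maven_artifact("org.apache.flink", "flink-sql-connector-kinesis", "1.15.4"),
    maven_artifact("software.amazon.msk", "aws-msk-iam-auth", "1.1.6"),
]
print(custom_artifacts[0]["MavenReference"]["ArtifactId"])  # flink-sql-connector-kinesis
```

You would then pass `custom_artifacts` as the `CustomArtifactsConfiguration` (or `CustomArtifactsConfigurationUpdate`) value in your API request.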

**Note**  
You can set `failOnError` to true for the `flink-sql-connector-kinesis` connector in the table API.

## Add dependencies and custom connectors
<a name="zeppelin-custom-connectors"></a>

To use the AWS Management Console to add a dependency or a custom connector to your Studio notebook, follow these steps:

1. Upload your custom connector's file to Amazon S3.

1. In the AWS Management Console, choose the **Custom create** option for creating your Studio notebook.

1. Follow the Studio notebook creation workflow until you get to the **Configurations** step.

1. In the **Custom connectors** section, choose **Add custom connector**.

1. Specify the Amazon S3 location of the dependency or the custom connector.

1. Choose **Save changes**.

To add a dependency JAR or a custom connector when you create a new Studio notebook using the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API, specify the Amazon S3 location of the dependency JAR or the custom connector in the `CustomArtifactsConfiguration` data type. To add a dependency or a custom connector to an existing Studio notebook, invoke the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API operation and specify the Amazon S3 location of the dependency JAR or the custom connector in the `CustomArtifactsConfigurationUpdate` data type.

**Note**  
When you include a dependency or a custom connector, you must also include all its transitive dependencies that aren't bundled within it.

# Implement user-defined functions
<a name="how-zeppelin-udf"></a>

User-defined functions (UDFs) are extension points that allow you to call frequently-used logic or custom logic that can't be expressed otherwise in queries. You can use Python or a JVM language like Java or Scala to implement your UDFs in paragraphs inside your Studio notebook. You can also add to your Studio notebook external JAR files that contain UDFs implemented in a JVM language. 

When implementing JARs that register abstract classes that subclass `UserDefinedFunction` (or your own abstract classes), use `provided` scope in Apache Maven, `compileOnly` dependency declarations in Gradle, `provided` scope in SBT, or an equivalent directive in your UDF project build configuration. This allows the UDF source code to compile against the Flink APIs, but the Flink API classes are not themselves included in the build artifacts. Refer to this [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) from the UDF JAR example, which adheres to this prerequisite for a Maven project. 
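
As a sketch of this setup in Maven, a dependency on a Flink API artifact carries `provided` scope so it is available at compile time but excluded from the packaged JAR. The artifact ID and version below are illustrative; match them to your Runtime's Flink version.

```
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.15.4</version>
    <!-- provided: compile against Flink, but don't bundle it in the JAR -->
    <scope>provided</scope>
</dependency>
```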

**Note**  
For an example setup, see [Translate, redact and analyze streaming data using SQL functions with Amazon Managed Service for Apache Flink, Amazon Translate, and Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/) on the *AWS Machine Learning Blog*.

To use the console to add UDF JAR files to your Studio notebook, follow these steps:

1. Upload your UDF JAR file to Amazon S3.

1. In the AWS Management Console, choose the **Custom create** option for creating your Studio notebook.

1. Follow the Studio notebook creation workflow until you get to the **Configurations** step.

1. In the **User-defined functions** section, choose **Add user-defined function**.

1. Specify the Amazon S3 location of the JAR file or the ZIP file that has the implementation of your UDF.

1. Choose **Save changes**.

To add a UDF JAR when you create a new Studio notebook using the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API, specify the JAR location in the `CustomArtifactConfiguration` data type. To add a UDF JAR to an existing Studio notebook, invoke the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API operation and specify the JAR location in the `CustomArtifactsConfigurationUpdate` data type. Alternatively, you can use the AWS Management Console to add UDF JAR files to your Studio notebook.

## Considerations with user-defined functions
<a name="how-zeppelin-udf-considerations"></a>
+ Managed Service for Apache Flink Studio uses the [Apache Zeppelin terminology](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html), wherein a notebook is a Zeppelin instance that can contain multiple notes. Each note can then contain multiple paragraphs. With Managed Service for Apache Flink Studio, the interpreter process is shared across all the notes in the notebook. So if you perform an explicit function registration using [createTemporarySystemFunction](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-) in one note, the function can be referenced as-is in another note of the same notebook. 

  The *deploy as application* operation, however, works on an *individual* note, not on all notes in the notebook. When you perform deploy as application, only the active note's contents are used to generate the application. Any explicit function registration performed in other notes is not part of the generated application's dependencies. Additionally, during the deploy as application operation, an implicit function registration occurs by converting the main class name of the JAR to a lowercase string.

  For example, if `TextAnalyticsUDF` is the main class of the UDF JAR, then the implicit registration results in the function name `textanalyticsudf`. So if an explicit function registration in note 1 of Studio occurs like the following, then all other notes in that notebook (say, note 2) can refer to the function by the name `myNewFuncNameForClass` because of the shared interpreter:

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

  However, during the deploy as application operation on note 2, this explicit registration *will not be included* in the dependencies, and hence the deployed application will not perform as expected. Because of the implicit registration, by default all references to this function are expected to use `textanalyticsudf`, not `myNewFuncNameForClass`.

  If you need a custom function name registration, then note 2 itself is expected to contain another paragraph that performs another explicit registration, as follows: 

  ```
  %flink(parallelism=1)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  // re-register the JAR for UDF with a custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink.ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ If your UDF JAR includes Flink SDKs, configure your Java project so that the UDF source code can compile against the Flink SDKs, but the Flink SDK classes are not themselves included in the build artifact, for example the JAR. 

  You can use `provided` scope in Apache Maven, `compileOnly` dependency declarations in Gradle, `provided` scope in SBT, or an equivalent directive in your UDF project build configuration. You can refer to this [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) from the UDF JAR example, which adheres to such a prerequisite for a Maven project. For a complete step-by-step tutorial, see [Translate, redact and analyze streaming data using SQL functions with Amazon Managed Service for Apache Flink, Amazon Translate, and Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/).

# Enable checkpointing
<a name="how-zeppelin-checkpoint"></a>

You enable checkpointing by using environment settings. For information about checkpointing, see [Fault Tolerance](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html) in the [Managed Service for Apache Flink Developer Guide](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Set the checkpointing interval
<a name="how-zeppelin-checkpoint-interval"></a>

The following Scala code example sets your application's checkpoint interval to one minute:

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

The following Python code example sets your application's checkpoint interval to one minute:

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## Set the checkpointing type
<a name="how-zeppelin-checkpoint-type"></a>

The following Scala code example sets your application's checkpoint mode to `EXACTLY_ONCE` (the default):

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

The following Python code example sets your application's checkpoint mode to `EXACTLY_ONCE` (the default):

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# Upgrade Studio Runtime
<a name="upgrading-studio-runtime"></a>

This section contains information about how to upgrade your Studio notebook Runtime. We recommend that you always upgrade to the latest supported Studio Runtime.

## Upgrade your notebook to a new Studio Runtime
<a name="upgrading-notebook"></a>

Depending on how you use Studio, the steps to upgrade your Runtime differ. Select the option that fits your use case.

### SQL queries or Python code with no external dependencies
<a name="notebook-no-dependencies"></a>

If you are using SQL or Python without any external dependencies, use the following Runtime upgrade process. We recommend that you upgrade to the latest Runtime version. The upgrade process is the same, regardless of the Runtime version you are upgrading from. 

1. Create a new Studio notebook using the latest Runtime.

1. Copy and paste the code of every note from the old notebook to the new notebook.

1. In the new notebook, adjust the code to make it compatible with any Apache Flink feature that has changed from the previous version.
   + Run the new notebook. Open the notebook and run it note by note, in sequence, and test if it works.
   + Make any required changes to the code.
   + Stop the new notebook.

1. If you had deployed the old notebook as an application:
   + Deploy the new notebook as a separate, new application.
   + Stop the old application.
   + Run the new application without a snapshot.

1. Stop the old notebook if it's running. Start the new notebook, as required, for interactive use.

**Process flow for upgrading without external dependencies**

![\[The following diagram represents the recommended workflow to upgrade your notebook without external dependencies.\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### SQL queries or Python code with external dependencies
<a name="notebook-dependencies"></a>

Follow this process if you are using SQL or Python and using external dependencies such as connectors or custom artifacts, like user-defined functions implemented in Python or Java. We recommend that you upgrade to the latest Runtime. The process is the same, regardless of the Runtime version that you are upgrading from.

1. Create a new Studio notebook using the latest Runtime.

1. Copy and paste the code of every note from the old notebook to the new notebook.

1. Update the external dependencies and custom artifacts.
   + Look for new connectors compatible with the Apache Flink version of the new Runtime. Refer to [Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/) in the Apache Flink documentation to find the correct connectors for the Flink version.
   + Update the code of user-defined functions to match changes in the Apache Flink API, and any Python or JAR dependencies used by the user-defined functions. Re-package your updated custom artifact.
   + Add these new connectors and artifacts to the new notebook.

1. In the new notebook, adjust the code to make it compatible with any Apache Flink feature that has changed from the previous version.
   + Run the new notebook. Open the notebook and run it note by note, in sequence, and test if it works.
   + Make any required changes to the code.
   + Stop the new notebook.

1. If you had deployed the old notebook as an application:
   + Deploy the new notebook as a separate, new application.
   + Stop the old application.
   + Run the new application without a snapshot.

1. Stop the old notebook if it's running. Start the new notebook, as required, for interactive use.

**Process flow for upgrading with external dependencies**

![\[The following diagram represents the recommended workflow to upgrade your notebook with external dependencies.\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# Work with AWS Glue
<a name="how-zeppelin-glue"></a>

Your Studio notebook stores and gets information about its data sources and sinks from AWS Glue. When you create your Studio notebook, you specify the AWS Glue database that contains your connection information. When you access your data sources and sinks, you specify AWS Glue tables contained in the database. Your AWS Glue tables provide access to the AWS Glue connections that define the locations, schemas, and parameters of your data sources and destinations.

Studio notebooks use table properties to store application-specific data. For more information, see [Table properties](how-zeppelin-glue-properties.md).

For an example of how to set up an AWS Glue connection, database, and table for use with Studio notebooks, see [Create an AWS Glue database](example-notebook.md#example-notebook-glue) in [Tutorial: Create a Studio notebook in Managed Service for Apache Flink](example-notebook.md).

# Table properties
<a name="how-zeppelin-glue-properties"></a>

In addition to data fields, your AWS Glue tables provide other information to your Studio notebook using table properties. Managed Service for Apache Flink uses the following AWS Glue table properties:
+ [Define Apache Flink time values](#how-zeppelin-glue-timestamp): These properties define how Managed Service for Apache Flink emits Apache Flink internal data processing time values.
+ [Use Flink connector and format properties](#how-zeppelin-glue-connector): These properties provide information about your data streams.

To add a property to an AWS Glue table, do the following:

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. From the list of tables, choose the table that your application uses to store its data connection information. Choose **Action**, **Edit table details**.

1. Under **Table Properties**, enter **managed-flink.proctime** for **Key** and **user_action_time** for **Value**.

## Define Apache Flink time values
<a name="how-zeppelin-glue-timestamp"></a>

Apache Flink provides time values that describe when stream processing events occurred, such as [Processing Time](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) and [Event Time](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time). To include these values in your application output, you define properties on your AWS Glue table that tell the Managed Service for Apache Flink runtime to emit these values into the specified fields. 

The keys and values you use in your table properties are as follows:


| Timestamp Type | Key | Value | 
| --- |--- |--- |
| [ Processing Time](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | managed-flink.proctime | The column name that AWS Glue will use to expose the value. This column name does not correspond to an existing table column. | 
| [ Event Time](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | managed-flink.rowtime | The column name that AWS Glue will use to expose the value. This column name corresponds to an existing table column. | 
|  | managed-flink.watermark.*column_name*.milliseconds | The watermark interval in milliseconds. | 
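
In Flink SQL terms, these properties correspond to time attribute declarations on the table. The following DDL is a sketch (the column names are illustrative) of what a table behaves like when `managed-flink.proctime` is set to `user_action_time`, `managed-flink.rowtime` points at an `event_time` column, and a watermark interval of 5000 milliseconds is configured:

```
CREATE TABLE stock (
    ticker VARCHAR(6),
    price DOUBLE,
    -- Event time: managed-flink.rowtime points at this existing column
    event_time TIMESTAMP(3),
    -- Processing time: managed-flink.proctime adds this virtual column
    user_action_time AS PROCTIME(),
    -- managed-flink.watermark.event_time.milliseconds = 5000
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);
```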

## Use Flink connector and format properties
<a name="how-zeppelin-glue-connector"></a>

You provide information about your data sources to your application's Flink connectors using AWS Glue table properties. Some examples of the properties that Managed Service for Apache Flink uses for connectors are as follows:


| Connector Type | Key | Value | 
| --- | --- | --- |
| [Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | format | The format used to deserialize and serialize Kafka messages, e.g. json or csv. | 
|  | scan.startup.mode | The startup mode for the Kafka consumer, e.g. earliest-offset or timestamp. | 
| [Kinesis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | format | The format used to deserialize and serialize Kinesis data stream records, e.g. json or csv. | 
|  | aws.region | The AWS Region where the stream is defined. | 
| [S3 (Filesystem)](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | format | The format used to deserialize and serialize files, e.g. json or csv. | 
|  | path | The Amazon S3 path, e.g. s3://mybucket/. | 
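
For example, a Glue table configured with the Kinesis connector properties above is equivalent to a Flink SQL table defined with the following options. This is a sketch; the stream name and Region are illustrative:

```
CREATE TABLE stock (
    ticker VARCHAR(6),
    price DOUBLE
) WITH (
    'connector' = 'kinesis',          -- Flink Kinesis table connector
    'stream' = 'ExampleInputStream',  -- the source data stream
    'aws.region' = 'us-east-1',       -- Region where the stream is defined
    'format' = 'json'                 -- deserialization format
);
```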

For more information about other connectors besides Kinesis and Apache Kafka, see your connector's documentation.

# Examples and tutorials for Studio notebooks in Managed Service for Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [Tutorial: Create a Studio notebook in Managed Service for Apache Flink](example-notebook.md)
+ [Tutorial: Deploy a Studio notebook as a Managed Service for Apache Flink application with durable state](example-notebook-deploy.md)
+ [View example queries to analyze data in a Studio notebook](how-zeppelin-sql-examples.md)

# Tutorial: Create a Studio notebook in Managed Service for Apache Flink
<a name="example-notebook"></a>

The following tutorial demonstrates how to create a Studio notebook that reads data from a Kinesis data stream or an Amazon MSK cluster.

**Topics**
+ [Complete the prerequisites](#example-notebook-setup)
+ [Create an AWS Glue database](#example-notebook-glue)
+ [Next steps: Create a Studio notebook with Kinesis Data Streams or Amazon MSK](#examples-notebook-nextsteps)
+ [Create a Studio notebook with Kinesis Data Streams](example-notebook-streams.md)
+ [Create a Studio notebook with Amazon MSK](example-notebook-msk.md)
+ [Clean up your application and dependent resources](example-notebook-cleanup.md)

## Complete the prerequisites
<a name="example-notebook-setup"></a>

Make sure that your AWS CLI is version 2 or later. To install the latest AWS CLI, see [ Installing, updating, and uninstalling the AWS CLI version 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html).

## Create an AWS Glue database
<a name="example-notebook-glue"></a>

Your Studio notebook uses an [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) database for metadata about your Amazon MSK data source.

**Create an AWS Glue Database**

1. Open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Choose **Add database**. In the **Add database** window, enter **default** for **Database name**. Choose **Create**. 

## Next steps: Create a Studio notebook with Kinesis Data Streams or Amazon MSK
<a name="examples-notebook-nextsteps"></a>

With this tutorial, you can create a Studio notebook that uses either Kinesis Data Streams or Amazon MSK:
+ [Create a Studio notebook with Kinesis Data Streams](example-notebook-streams.md): With Kinesis Data Streams, you quickly create an application that uses a Kinesis data stream as a source. You only need to create a Kinesis data stream as a dependent resource.
+ [Create a Studio notebook with Amazon MSK](example-notebook-msk.md): With Amazon MSK, you create an application that uses an Amazon MSK cluster as a source. You need to create an Amazon VPC, an Amazon EC2 client instance, and an Amazon MSK cluster as dependent resources.

# Create a Studio notebook with Kinesis Data Streams
<a name="example-notebook-streams"></a>

This tutorial describes how to create a Studio notebook that uses a Kinesis data stream as a source.

**Topics**
+ [Complete the prerequisites](#example-notebook-streams-setup)
+ [Create an AWS Glue table](#example-notebook-streams-glue)
+ [Create a Studio notebook with Kinesis Data Streams](#example-notebook-streams-create)
+ [Send data to your Kinesis data stream](#example-notebook-streams-send)
+ [Test your Studio notebook](#example-notebook-streams-test)

## Complete the prerequisites
<a name="example-notebook-streams-setup"></a>

Before you create a Studio notebook, create a Kinesis data stream (`ExampleInputStream`). Your application uses this stream for the application source.

You can create this stream using either the Amazon Kinesis console or the following AWS CLI command. For console instructions, see [Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) in the *Amazon Kinesis Data Streams Developer Guide*. Name the stream **ExampleInputStream** and set the **Number of open shards** to **1**.

To create the stream (`ExampleInputStream`) using the AWS CLI, use the following Amazon Kinesis `create-stream` AWS CLI command.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Create an AWS Glue table
<a name="example-notebook-streams-glue"></a>

Your Studio notebook uses an [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) database for metadata about your Kinesis Data Streams data source.

**Note**  
You can either manually create the database first or you can let Managed Service for Apache Flink create it for you when you create the notebook. Similarly, you can either manually create the table as described in this section, or you can use the create table connector code for Managed Service for Apache Flink in your notebook within Apache Zeppelin to create your table via a DDL statement. You can then check in AWS Glue to make sure the table was correctly created.

**Create a Table**

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. If you don't already have an AWS Glue database, choose **Databases** from the left navigation bar. Choose **Add Database**. In the **Add database** window, enter **default** for **Database name**. Choose **Create**.

1. In the left navigation bar, choose **Tables**. In the **Tables** page, choose **Add tables**, **Add table manually**.

1. In the **Set up your table's properties** page, enter **stock** for the **Table name**. Make sure you select the database you created previously. Choose **Next**.

1. In the **Add a data store** page, choose **Kinesis**. For the **Stream name**, enter **ExampleInputStream**. For **Kinesis source URL**, enter **https://kinesis.us-east-1.amazonaws.com**. If you copy and paste the **Kinesis source URL**, be sure to delete any leading or trailing spaces. Choose **Next**.

1. In the **Classification** page, choose **JSON**. Choose **Next**.

1. In the **Define a Schema** page, choose **Add Column** to add a column. Add columns with the following properties:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/example-notebook-streams.html)

   Choose **Next**.

1. On the next page, verify your settings, and choose **Finish**.

1. Choose your newly created table from the list of tables.

1. Choose **Edit table** and add a property with the key `managed-flink.proctime` and the value `proctime`.

1. Choose **Apply**.

## Create a Studio notebook with Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Now that you have created the resources your application uses, you create your Studio notebook. 

**Topics**
+ [Create a Studio notebook using the AWS Management Console](#example-notebook-create-streams-console)
+ [Create a Studio notebook using the AWS CLI](#example-notebook-msk-create-api)

### Create a Studio notebook using the AWS Management Console
<a name="example-notebook-create-streams-console"></a>

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard). 

1. In the **Managed Service for Apache Flink applications** page, choose the **Studio** tab. Choose **Create Studio notebook**.
**Note**  
You can also create a Studio notebook from the Amazon MSK or Kinesis Data Streams consoles by selecting your input Amazon MSK cluster or Kinesis data stream, and choosing **Process data in real time**.

1. In the **Create Studio notebook** page, provide the following information:
   + Enter **MyNotebook** for the name of the notebook.
   + Choose **default** for **AWS Glue database**.

   Choose **Create Studio notebook**.

1. In the **MyNotebook** page, choose **Run**. Wait for the **Status** to show **Running**. Charges apply when the notebook is running.

### Create a Studio notebook using the AWS CLI
<a name="example-notebook-msk-create-api"></a>

To create your Studio notebook using the AWS CLI, do the following:

1. Verify your account ID. You need this value to create your application.

1. Create the role `arn:aws:iam::AccountID:role/ZeppelinRole`, and add the following permissions to the role that the console creates automatically:

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`
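
   In IAM policy form, these permissions look like the following statement. This is a sketch; the `Resource` ARN is illustrative, and you should scope it to your own stream:

   ```
   {
       "Effect": "Allow",
       "Action": [
           "kinesis:GetShardIterator",
           "kinesis:GetRecords",
           "kinesis:ListShards"
       ],
       "Resource": "arn:aws:kinesis:us-east-1:AccountID:stream/ExampleInputStream"
   }
   ```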

1. Create a file called `create.json` with the following contents. Replace the placeholder values with your information.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Run the following command to create your application:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. When the command completes, you see output that shows the details for your new Studio notebook. The following is an example of the output.

   ```
   {
       "ApplicationDetail": {
        "ApplicationARN": "arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Run the following command to start your application. Replace the sample value with your account ID.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook
   ```

## Send data to your Kinesis data stream
<a name="example-notebook-streams-send"></a>

To send test data to your Kinesis data stream, do the following:

1. Open the [ Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Choose **Create a Cognito User with CloudFormation**.

1. The CloudFormation console opens with the Kinesis Data Generator template. Choose **Next**.

1. In the **Specify stack details** page, enter a username and password for your Cognito user. Choose **Next**.

1. In the **Configure stack options** page, choose **Next**.

1. In the **Review Kinesis-Data-Generator-Cognito-User** page, choose the **I acknowledge that AWS CloudFormation might create IAM resources.** checkbox. Choose **Create Stack**.

1. Wait for AWS CloudFormation to finish creating the stack. After stack creation is complete, open the **Kinesis-Data-Generator-Cognito-User** stack in the CloudFormation console, and choose the **Outputs** tab. Open the URL listed for the **KinesisDataGeneratorUrl** output value.

1. In the **Amazon Kinesis Data Generator** page, log in with the credentials you created in step 4.

1. On the next page, provide the following values:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/example-notebook-streams.html)

   For **Record Template**, paste the following code:

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Choose **Send data**.

1. The generator will send data to your Kinesis data stream. 

   Leave the generator running while you complete the next section.
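
If you want to inspect what the record template produces without running the generator, the following standalone sketch (no AWS dependencies; the ticker list and price range mirror the template above) builds equivalent records locally:

```
import json
import random

def make_record():
    # Mirror the Kinesis Data Generator template: a random ticker
    # from a fixed list and an integer price between 10 and 150.
    return {
        "ticker": random.choice(["AMZN", "MSFT", "GOOG"]),
        "price": random.randint(10, 150),
    }

if __name__ == "__main__":
    print(json.dumps(make_record()))
```

You could send records like these to the stream yourself, for example with the `aws kinesis put-record` AWS CLI command, instead of using the generator.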

## Test your Studio notebook
<a name="example-notebook-streams-test"></a>

In this section, you use your Studio notebook to query data from your Kinesis data stream.

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. On the **Managed Service for Apache Flink applications** page, choose the **Studio notebook** tab. Choose **MyNotebook**.

1. In the **MyNotebook** page, choose **Open in Apache Zeppelin**.

   The Apache Zeppelin interface opens in a new tab.

1. In the **Welcome to Zeppelin!** page, choose **Zeppelin Note**.

1. In the **Zeppelin Note** page, enter the following query into a new note:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Choose the run icon.

   After a short time, the note displays data from the Kinesis data stream.

To open the Apache Flink Dashboard for your application to view operational aspects, choose **FLINK JOB**. For more information about the Flink Dashboard, see [Apache Flink Dashboard](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) in the [Managed Service for Apache Flink Developer Guide](https://docs.aws.amazon.com/).

For more examples of Flink Streaming SQL queries, see [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).
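
Beyond a simple scan, you can also aggregate over time windows in the same notebook. The following query is a sketch that assumes the `proctime` column defined through the Glue table property earlier; it computes the average price per ticker over 10-second tumbling windows:

```
%flink.ssql(type=update)
SELECT
    ticker,
    TUMBLE_END(proctime, INTERVAL '10' SECOND) AS window_end,
    AVG(price) AS avg_price
FROM stock
GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND), ticker
```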

# Create a Studio notebook with Amazon MSK
<a name="example-notebook-msk"></a>

This tutorial describes how to create a Studio notebook that uses an Amazon MSK cluster as a source.

**Topics**
+ [Set up an Amazon MSK cluster](#example-notebook-msk-setup)
+ [Add a NAT gateway to your VPC](#example-notebook-msk-nat)
+ [Create an AWS Glue connection and table](#example-notebook-msk-glue)
+ [Create a Studio notebook with Amazon MSK](#example-notebook-msk-create)
+ [Send data to your Amazon MSK cluster](#example-notebook-msk-send)
+ [Test your Studio notebook](#example-notebook-msk-test)

## Set up an Amazon MSK cluster
<a name="example-notebook-msk-setup"></a>

For this tutorial, you need an Amazon MSK cluster that allows plaintext access. If you don't have an Amazon MSK cluster set up already, follow the [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) tutorial to create an Amazon VPC, an Amazon MSK cluster, a topic, and an Amazon EC2 client instance.

When following the tutorial, do the following:
+ In [Step 3: Create an Amazon MSK Cluster](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), in step 4, change the `ClientBroker` value from `TLS` to `PLAINTEXT`.

## Add a NAT gateway to your VPC
<a name="example-notebook-msk-nat"></a>

If you created an Amazon MSK cluster by following the [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) tutorial, or if your existing Amazon VPC does not already have a NAT gateway for its private subnets, you must add a NAT Gateway to your Amazon VPC. The following diagram shows the architecture. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/managed-flink/latest/java/images/vpc_05.png)


To create a NAT gateway for your Amazon VPC, do the following:

1. Open the Amazon VPC console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Choose **NAT Gateways** from the left navigation bar.

1. On the **NAT Gateways** page, choose **Create NAT Gateway**.

1. On the **Create NAT Gateway** page, provide the following values:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/example-notebook-msk.html)

   Choose **Create NAT Gateway**.

1. On the left navigation bar, choose **Route Tables**.

1. Choose **Create Route Table**.

1. On the **Create route table** page, provide the following information:
   + **Name tag**: **ZeppelinRouteTable**
   + **VPC**: Choose your VPC (e.g. **AWSKafkaTutorialVPC**).

   Choose **Create**.

1. In the list of route tables, choose **ZeppelinRouteTable**. Choose the **Routes** tab, and choose **Edit routes**.

1. In the **Edit Routes** page, choose **Add route**.

1. For **Destination**, enter **0.0.0.0/0**. For **Target**, choose **NAT Gateway**, **ZeppelinGateway**. Choose **Save Routes**, and then choose **Close**.

1. On the Route Tables page, with **ZeppelinRouteTable** selected, choose the **Subnet associations** tab. Choose **Edit subnet associations**.

1. In the **Edit subnet associations** page, choose **AWSKafkaTutorialSubnet2** and **AWSKafkaTutorialSubnet3**. Choose **Save**.

## Create an AWS Glue connection and table
<a name="example-notebook-msk-glue"></a>

Your Studio notebook uses an [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) database for metadata about your Amazon MSK data source. In this section, you create an AWS Glue connection that describes how to access your Amazon MSK cluster, and an AWS Glue table that describes how to present the data in your data source to clients such as your Studio notebook. 

**Create a Connection**

1. Sign in to the AWS Management Console and open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. If you don't already have an AWS Glue database, choose **Databases** from the left navigation bar. Choose **Add Database**. In the **Add database** window, enter **default** for **Database name**. Choose **Create**.

1. Choose **Connections** from the left navigation bar. Choose **Add Connection**.

1. In the **Add Connection** window, provide the following values:
   + For **Connection name**, enter **ZeppelinConnection**.
   + For **Connection type**, choose **Kafka**.
   + For **Kafka bootstrap server URLs**, provide the bootstrap broker string for your cluster. You can get the bootstrap brokers from either the MSK console, or by entering the following CLI command:

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Uncheck the **Require SSL connection** checkbox.

   Choose **Next**.

1. In the **VPC** page, provide the following values:
   + For **VPC**, choose the name of your VPC (e.g. **AWSKafkaTutorialVPC**).
   + For **Subnet**, choose **AWSKafkaTutorialSubnet2**.
   + For **Security groups**, choose all available groups.

   Choose **Next**.

1. In the **Connection properties** / **Connection access** page, choose **Finish**.

**Create a Table**
**Note**  
You can either manually create the table as described in the following steps, or you can use the create table connector code for Managed Service for Apache Flink in your notebook within Apache Zeppelin to create your table via a DDL statement. You can then check in AWS Glue to make sure the table was correctly created.

1. In the left navigation bar, choose **Tables**. In the **Tables** page, choose **Add tables**, **Add table manually**.

1. In the **Set up your table's properties** page, enter **stock** for the **Table name**. Make sure you select the database you created previously. Choose **Next**.

1. In the **Add a data store** page, choose **Kafka**. For the **Topic name**, enter your topic name (e.g. **AWSKafkaTutorialTopic**). For **Connection**, choose **ZeppelinConnection**.

1. In the **Classification** page, choose **JSON**. Choose **Next**.

1. In the **Define a Schema** page, choose **Add Column** to add a column. Add columns with the following properties:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/managed-flink/latest/java/example-notebook-msk.html)

   Choose **Next**.

1. On the next page, verify your settings, and choose **Finish**.

1. Choose your newly created table from the list of tables.

1. Choose **Edit table** and add the following properties:
   + key: `managed-flink.proctime`, value: `proctime`
   + key: `flink.properties.group.id`, value: `test-consumer-group`
   + key: `flink.properties.auto.offset.reset`, value: `latest`
   + key: `classification`, value: `json`

   Without these key/value pairs, the Flink notebook runs into an error. 

1. Choose **Apply**.
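
As an alternative to configuring the table in AWS Glue, you can create it from a Zeppelin note with a DDL statement, as mentioned in the note above. The following is a sketch (the column names, topic, and bootstrap broker placeholder are illustrative) that sets the equivalent Kafka connector properties:

```
%flink.ssql
CREATE TABLE stock (
    ticker VARCHAR(6),
    price DOUBLE,
    event_time TIMESTAMP(3),
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'AWSKafkaTutorialTopic',
    'properties.bootstrap.servers' = '<Bootstrap Broker List>',
    'properties.group.id' = 'test-consumer-group',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
```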

## Create a Studio notebook with Amazon MSK
<a name="example-notebook-msk-create"></a>

Now that you have created the resources your application uses, you create your Studio notebook. 

**Topics**
+ [Create a Studio notebook using the AWS Management Console](#example-notebook-create-msk-console)
+ [Create a Studio notebook using the AWS CLI](#example-notebook-msk-create-api)

**Note**  
You can also create a Studio notebook from the Amazon MSK console by choosing an existing cluster, then choosing **Process data in real time**.

### Create a Studio notebook using the AWS Management Console
<a name="example-notebook-create-msk-console"></a>

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. In the **Managed Service for Apache Flink applications** page, choose the **Studio** tab. Choose **Create Studio notebook**.
**Note**  
To create a Studio notebook from the Amazon MSK or Kinesis Data Streams consoles, select your input Amazon MSK cluster or Kinesis data stream, then choose **Process data in real time**.

1. In the **Create Studio notebook** page, provide the following information:
   + Enter **MyNotebook** for **Studio notebook Name**.
   + Choose **default** for **AWS Glue database**.

   Choose **Create Studio notebook**.

1. In the **MyNotebook** page, choose the **Configuration** tab. In the **Networking** section, choose **Edit**.

1. In the **Edit networking for MyNotebook** page, choose **VPC configuration based on Amazon MSK cluster**. Choose your Amazon MSK cluster for **Amazon MSK Cluster**. Choose **Save changes**.

1. In the **MyNotebook** page, choose **Run**. Wait for the **Status** to show **Running**.

### Create a Studio notebook using the AWS CLI
<a name="example-notebook-msk-create-api"></a>

To create your Studio notebook by using the AWS CLI, do the following:

1. Verify that you have the following information. You need these values to create your application.
   + Your account ID.
   + The subnet IDs and security group ID for the Amazon VPC that contains your Amazon MSK cluster.

1. Create a file called `create.json` with the following contents. Replace the placeholder values with your information.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Run the following command to create your application:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. When the command completes, you should see output similar to the following, showing the details for your new Studio notebook:

   ```
   {
       "ApplicationDetail": {
        "ApplicationARN": "arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Run the following command to start your application. Replace the sample value with your account ID.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalytics:us-east-1:012345678901:application/MyNotebook
   ```

## Send data to your Amazon MSK cluster
<a name="example-notebook-msk-send"></a>

In this section, you run a Python script in your Amazon EC2 client to send data to your Amazon MSK data source.

1. Connect to your Amazon EC2 client.

1. Run the following commands to install Python 3, pip, and the kafka-python package, confirming each action when prompted:

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Configure the AWS CLI on your client machine by entering the following command:

   ```
   aws configure
   ```

   Provide your account credentials, and **us-east-1** for the `region`.

1. Create a file called `stock.py` with the following contents. Replace the sample value with your Amazon MSK cluster's Bootstrap Brokers string, and update the topic name if your topic is not **AWSKafkaTutorialTopic**:

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data = getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e)
   ```

1. Run the script with the following command:

   ```
   $ python3 stock.py
   ```

1. Leave the script running while you complete the following section.
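Before pointing `stock.py` at your cluster, you can sanity-check the record format locally without a Kafka connection. This sketch reuses the same field logic and the JSON `value_serializer` from the script above; the helper name `make_stock_record` is ours, for illustration:

```python
import json
import random
from datetime import datetime

def make_stock_record():
    # Same fields as getStock() in stock.py
    now = datetime.now()
    return {
        'event_time': now.strftime("%Y-%m-%d %H:%M:%S"),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2),
    }

# Mirror the producer's value_serializer to inspect the bytes Kafka would receive
record = make_stock_record()
payload = json.dumps(record).encode('utf-8')
decoded = json.loads(payload.decode('utf-8'))
print(decoded)
```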

## Test your Studio notebook
<a name="example-notebook-msk-test"></a>

In this section, you use your Studio notebook to query data from your Amazon MSK cluster.

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. On the **Managed Service for Apache Flink applications** page, choose the **Studio notebook** tab. Choose **MyNotebook**.

1. In the **MyNotebook** page, choose **Open in Apache Zeppelin**.

   The Apache Zeppelin interface opens in a new tab.

1. In the **Welcome to Zeppelin!** page, choose **Zeppelin new note**.

1. In the **Zeppelin Note** page, enter the following query into a new note:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Choose the run icon.

   The application displays data from the Amazon MSK cluster.

To open the Apache Flink Dashboard for your application to view operational aspects, choose **FLINK JOB**. For more information about the Flink Dashboard, see [Apache Flink Dashboard](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) in the [Managed Service for Apache Flink Developer Guide](https://docs.aws.amazon.com/).

For more examples of Flink Streaming SQL queries, see [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Clean up your application and dependent resources
<a name="example-notebook-cleanup"></a>

## Delete your Studio notebook
<a name="example-notebook-cleanup-app"></a>

1. Open the Managed Service for Apache Flink console.

1. Choose **MyNotebook**.

1. Choose **Actions**, then **Delete**.

## Delete your AWS Glue database and connection
<a name="example-notebook-cleanup-glue"></a>

1. Open the AWS Glue console at [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Choose **Databases** from the left navigation bar. Check the checkbox next to **Default** to select it. Choose **Action**, **Delete Database**. Confirm your selection.

1. Choose **Connections** from the left navigation bar. Check the checkbox next to **ZeppelinConnection** to select it. Choose **Action**, **Delete Connection**. Confirm your selection.

## Delete your IAM role and policy
<a name="example-notebook-msk-cleanup-iam"></a>

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Choose **Roles** from the left navigation bar.

1. Use the search bar to search for the **ZeppelinRole** role.

1. Choose the **ZeppelinRole** role. Choose **Delete Role**. Confirm the deletion.

## Delete your CloudWatch log group
<a name="example-notebook-cleanup-cw"></a>

The console creates a CloudWatch Logs group and log stream for you when you create your application using the console. You do not have a log group and stream if you created your application using the AWS CLI.

1. Open the CloudWatch console at [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Choose **Log groups** from the left navigation bar.

1. Choose the **/AWS/KinesisAnalytics/MyNotebook** log group.

1. Choose **Actions**, **Delete log group(s)**. Confirm the deletion.

## Clean up Kinesis Data Streams resources
<a name="example-notebook-cleanup-streams"></a>

To delete your Kinesis stream, open the Kinesis Data Streams console, select your Kinesis stream, and choose **Actions**, **Delete**.

## Clean up MSK resources
<a name="example-notebook-cleanup-msk"></a>

Follow the steps in this section if you created an Amazon MSK cluster for this tutorial. This section has directions for cleaning up your Amazon EC2 client instance, Amazon VPC, and Amazon MSK cluster.

### Delete your Amazon MSK cluster
<a name="example-notebook-msk-cleanup-msk"></a>

Follow these steps if you created an Amazon MSK cluster for this tutorial.

1. Open the Amazon MSK console at [https://console.aws.amazon.com/msk/home?region=us-east-1#/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Choose **AWSKafkaTutorialCluster**. Choose **Delete**. Enter **delete** in the window that appears, and confirm your selection.

### Terminate your client instance
<a name="example-notebook-msk-cleanup-client"></a>

Follow these steps if you created an Amazon EC2 client instance for this tutorial.

1. Open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Choose **Instances** from the left navigation bar.

1. Choose the checkbox next to **ZeppelinClient** to select it.

1. Choose **Instance State**, **Terminate Instance**.

### Delete your Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

Follow these steps if you created an Amazon VPC for this tutorial.

1. Open the Amazon EC2 console at [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Choose **Network Interfaces** from the left navigation bar.

1. Enter your VPC ID in the search bar and press enter to search.

1. Select the checkbox in the table header to select all the displayed network interfaces.

1. Choose **Actions**, **Detach**. In the window that appears, choose **Enable** under **Force detachment**. Choose **Detach**, and wait for all of the network interfaces to reach the **Available** status.

1. Select the checkbox in the table header to select all the displayed network interfaces again.

1. Choose **Actions**, **Delete**. Confirm the action.

1. Open the Amazon VPC console at [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Select **AWSKafkaTutorialVPC**. Choose **Actions**, **Delete VPC**. Enter **delete** and confirm the deletion.

# Tutorial: Deploy a Studio notebook as a Managed Service for Apache Flink application with durable state
<a name="example-notebook-deploy"></a>

The following tutorial demonstrates how to deploy a Studio notebook as a Managed Service for Apache Flink application with durable state.

**Topics**
+ [Complete prerequisites](#example-notebook-durable-setup)
+ [Deploy an application with durable state using the AWS Management Console](#example-notebook-deploy-console)
+ [Deploy an application with durable state using the AWS CLI](#example-notebook-deploy-cli)

## Complete prerequisites
<a name="example-notebook-durable-setup"></a>

Create a new Studio notebook by following the [Tutorial: Create a Studio notebook in Managed Service for Apache Flink](example-notebook.md), using either Kinesis Data Streams or Amazon MSK. Name the Studio notebook `ExampleTestDeploy`.

## Deploy an application with durable state using the AWS Management Console
<a name="example-notebook-deploy-console"></a>

1. Under **Application code location - *optional*** in the console, add the Amazon S3 bucket location where you want the packaged code to be stored. This enables the steps to deploy and run your application directly from the notebook.

1. Add the required permissions to the application role so that it can read and write to an Amazon S3 bucket and launch a Managed Service for Apache Flink application:
   + AmazonS3FullAccess
   + AmazonKinesisAnalyticsFullAccess
   + Access to your sources, destinations, and VPCs as applicable. For more information, see [Review IAM permissions for Studio notebooks](how-zeppelin-iam.md).

1. Use the following sample code:

   ```
   %flink.ssql(type=update)
   CREATE TABLE exampleoutput (
     `ticker` VARCHAR,
     `price` DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );

   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. A dropdown with the name of your notebook appears in the top-right corner of each note in your notebook. You can do the following:
   + View the Studio notebook settings in the AWS Management Console.
   + Build your Zeppelin Note and export it to Amazon S3. At this point, provide a name for your application and choose **Build and Export**. You will get a notification when the export completes.
   + If you need to, you can view and run any additional tests on the executable in Amazon S3.
   + Once the build is complete, you will be able to deploy your code as a Kinesis streaming application with durable state and autoscaling.
   + Use the dropdown and choose **Deploy Zeppelin Note as Kinesis streaming application**. Review the application name and choose **Deploy via AWS Console**.
   + This will lead you to the AWS Management Console page for creating a Managed Service for Apache Flink application. Note that application name, parallelism, code location, default Glue DB, VPC (if applicable) and IAM roles have been pre-populated. Validate that the IAM roles have the required permissions to your sources and destinations. Snapshots are enabled by default for durable application state management.
   + Choose **create application**.
   + You can choose **configure** and modify any settings, and choose **Run** to start your streaming application.

## Deploy an application with durable state using the AWS CLI
<a name="example-notebook-deploy-cli"></a>

To deploy an application using the AWS CLI, you must update your AWS CLI to use the latest service model. For information about how to use the updated service model, see [Complete prerequisites](example-notebook.md#example-notebook-setup).

The following example code creates a new Studio notebook:

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role> \
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

The following code example starts a Studio notebook:

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

The following code returns the URL for an application's Apache Zeppelin notebook page:

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \
    --region us-east-1 \
    --no-verify-ssl
```

# View example queries to analyze data in a Studio notebook
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Create tables with Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Create tables with Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Query a tumbling window](#how-zeppelin-examples-tumbling)
+ [Query a sliding window](#how-zeppelin-examples-sliding)
+ [Use interactive SQL](#how-zeppelin-examples-interactive-sql)
+ [Use the BlackHole SQL connector](#how-zeppelin-examples-blackhole-connector-sql)
+ [Use Scala to generate sample data](#notebook-example-data-generator)
+ [Use interactive Scala](#notebook-example-interactive-scala)
+ [Use interactive Python](#notebook-example-interactive-python)
+ [Use a combination of interactive Python, SQL, and Scala](#notebook-example-interactive-pythonsqlscala)
+ [Use a cross-account Kinesis data stream](#notebook-example-crossaccount-kds)

For information about Apache Flink SQL query settings, see [ Flink on Zeppelin Notebooks for Interactive Data Analysis](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

To view your application in the Apache Flink dashboard, choose **FLINK JOB** in your application's **Zeppelin Note** page.

For more information about window queries, see [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

For more examples of Apache Flink Streaming SQL queries, see [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in the [Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Create tables with Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

You can use the Amazon MSK Flink connector with Managed Service for Apache Flink Studio to authenticate your connection with Plaintext, SSL, or IAM authentication. Create your tables using the specific properties per your requirements.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

You can combine these properties with others listed in the [Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/) documentation.
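If you maintain several variants of these tables (plaintext, SSL, and IAM), it can help to generate the `WITH` clause from a dictionary of connector options rather than hand-editing each DDL statement. The following is a small sketch; the helper name `render_with_clause` is ours, for illustration:

```python
def render_with_clause(options):
    # Render a Flink SQL WITH clause from a dict of connector options
    body = ",\n".join(f"  '{k}' = '{v}'" for k, v in options.items())
    return "WITH (\n" + body + "\n)"

# Base options shared by all variants; merge in security properties as needed
plaintext = {
    'connector': 'kafka',
    'topic': 'your_topic',
    'properties.bootstrap.servers': '<bootstrap servers>',
    'scan.startup.mode': 'earliest-offset',
    'format': 'json',
}
clause = render_with_clause(plaintext)
print(clause)
```

For an SSL or IAM table, you would merge the extra `properties.*` entries into the dict before rendering, keeping the shared options in one place.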

## Create tables with Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

In the following example, you create a table using Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

For more information on other properties you can use, see [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Query a tumbling window
<a name="how-zeppelin-examples-tumbling"></a>

The following Flink Streaming SQL query selects the highest price in each five-second tumbling window from the `ZeppelinTopic` table:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```
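As a mental model for the query above: `TUMBLE` assigns each event to exactly one fixed, non-overlapping window, determined by integer division of its timestamp by the window size. The following plain-Python sketch of the bucketing uses epoch seconds and is our illustration, not the Flink API:

```python
def tumble_window(ts, size):
    # Start and end of the tumbling window containing timestamp `ts` (epoch seconds)
    start = (ts // size) * size
    return (start, start + size)

# Events at t=3 and t=4 share the [0, 5) window; t=7 falls into [5, 10)
print(tumble_window(3, 5), tumble_window(4, 5), tumble_window(7, 5))
```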

## Query a sliding window
<a name="how-zeppelin-examples-sliding"></a>

The following Apache Flink Streaming SQL query selects the highest price in each five-second sliding window from the `ZeppelinTopic` table:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic -- or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```
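Unlike a tumbling window, a `HOP` window assigns each event to several overlapping windows; with a 5-second size and a 3-second slide, every event lands in two windows. The following plain-Python sketch of the assignment uses epoch seconds with window starts aligned to multiples of the slide, and is our illustration, not the Flink API:

```python
def hop_windows(ts, size, slide):
    # All hop windows [start, start + size) that contain timestamp `ts`,
    # with window starts aligned to multiples of `slide`
    first = ((ts - size) // slide + 1) * slide  # smallest aligned start with start + size > ts
    return [(s, s + size) for s in range(first, ts + 1, slide)]

# An event at t=7 belongs to the [3, 8) and [6, 11) windows
print(hop_windows(7, 5, 3))
```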

## Use interactive SQL
<a name="how-zeppelin-examples-interactive-sql"></a>

This example prints the maximum event time and processing time and the sum of values from the `key-values` table. Ensure that the sample data generation script from [Use Scala to generate sample data](#notebook-example-data-generator) is running. To try other SQL queries, such as filters and joins, in your Studio notebook, see [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in the Apache Flink documentation.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-values` table we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Use the BlackHole SQL connector
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

The BlackHole SQL connector doesn't require that you create a Kinesis data stream or an Amazon MSK cluster to test your queries. For information about the BlackHole SQL connector, see [BlackHole SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) in the Apache Flink documentation. In this example, the default catalog is an in-memory catalog.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Use Scala to generate sample data
<a name="notebook-example-data-generator"></a>

This example uses Scala to generate sample data. You can use this sample data to test various queries. Use the create table statement to create the key-values table.

```
%flink
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Use interactive Scala
<a name="notebook-example-interactive-scala"></a>

This is the Scala translation of the [Use interactive SQL](#how-zeppelin-examples-interactive-sql) example. For more Scala examples, see [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) in the Apache Flink documentation.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes how many records from the `key-values` table we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// A tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Use interactive Python
<a name="notebook-example-interactive-python"></a>

This is the Python translation of the [Use interactive SQL](#how-zeppelin-examples-interactive-sql) example. For more Python examples, see [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) in the Apache Flink documentation. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes how many records from the `key-values` table we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A tumbling window view that displays the number of records observed per (event time) second
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Use a combination of interactive Python, SQL, and Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

You can use any combination of SQL, Python, and Scala in your notebook for interactive analysis. In a Studio notebook that you plan to deploy as an application with durable state, you can use a combination of SQL and Scala. This example shows you the sections that are ignored and those that get deployed in the application with durable state.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Use a cross-account Kinesis data stream
<a name="notebook-example-crossaccount-kds"></a>

To use a Kinesis data stream that's in an account other than the account that has your Studio notebook, create a service execution role in the account where your Studio notebook is running and a role trust policy in the account that has the data stream. Use `aws.credentials.provider`, `aws.credentials.role.arn`, and `aws.credentials.role.sessionName` in the Kinesis connector in your create table DDL statement to create a table against the data stream.

Add the following policy statement to the service execution role in the Studio notebook account.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole",
 "Resource": "*"
}
```

Use the `AmazonKinesisFullAccess` policy and the following role trust policy for the data stream account.

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Use the following paragraph for the create table statement.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Troubleshoot Studio notebooks for Managed Service for Apache Flink
<a name="how-zeppelin-troubleshooting"></a>

This section contains troubleshooting information for Studio notebooks.

## Stop a stuck application
<a name="how-zeppelin-troubleshooting-stopping"></a>

To stop an application that is stuck in a transient state, call the [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) action with the `Force` parameter set to `true`. For more information, see [Running Applications](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html) in the [Managed Service for Apache Flink Developer Guide](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Deploy as an application with durable state in a VPC with no internet access
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

The Managed Service for Apache Flink Studio deploy-as-application feature does not support VPC applications without internet access. We recommend that you build your application in Studio, and then use Managed Service for Apache Flink to manually create a Flink application using the zip file that you built in your notebook.

The following steps outline this approach: 

1. Build and export your Studio application to Amazon S3. This should be a zip file. 

1. Create a Managed Service for Apache Flink application manually, with the code path referencing the zip file location in Amazon S3. In addition, you must configure the application with the following environment variables (two `groupID` values and three variables in total): 

1. kinesis.analytics.flink.run.options

   1. python: source/note.py

   1. jarfile: lib/PythonApplicationDependencies.jar

1. managed.deploy_as_app.options

   1. DatabaseARN: *<glue database ARN (Amazon Resource Name)>*

1. You may need to give permissions to the Managed Service for Apache Flink Studio and Managed Service for Apache Flink IAM roles for the services your application uses. You can use the same IAM role for both apps.

## Deploy-as-app size and build time reduction
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

Studio deploy-as-app for Python applications packages everything available in the Python environment, because we cannot determine which libraries you need. This may result in a larger-than-necessary deploy-as-app size. The following procedure demonstrates how to reduce the size of your deploy-as-app Python application by uninstalling unneeded dependencies.

If you’re building a Python application with the deploy-as-app feature from Studio, consider removing pre-installed Python packages that your applications don’t depend on. This not only reduces the final artifact size, helping you stay within the service limit for application size, but also improves the build time of applications that use the deploy-as-app feature.

You can execute the following command to list all installed Python packages with their respective installed sizes, and selectively remove packages of significant size.

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**Note**  
`apache-beam` is required by Flink Python to operate. Never remove this package or its dependencies.

The following is the list of pre-installed Python packages in Studio V2 that can be considered for removal:

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**To remove a Python package from the Zeppelin notebook:**

1. Check if your application depends on the package, or on any of its consuming packages, before removing it. You can identify the dependents of a package using [pipdeptree](https://pypi.org/project/pipdeptree/).

1. Execute the following command to remove a package:

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. If you need to restore a package that you removed by mistake, execute the following command:

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example: Remove `scipy` package before deploying your Python application with deploy-as-app feature.**  

1. Use `pipdeptree` to discover all `scipy` consumers and verify if you can safely remove `scipy`.
   + Install the tool through notebook:

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + Get reversed dependency tree of `scipy` by running:

     ```
     %flink.pyflink
     !pipdeptree -r -p scipy
     ```

     You should see output similar to the following (condensed for brevity):

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. Carefully inspect the usage of `seaborn`, `statsmodels`, and `plotnine` in your applications. If your applications do not depend on any of `scipy`, `seaborn`, `statsmodels`, or `plotnine`, you can remove all of these packages, or only the ones that your applications don’t need.

1. Remove the package by running:

   ```
   %flink.pyflink
   !pip uninstall -y scipy plotnine seaborn statsmodels
   ```
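After uninstalling, a small sketch like the following can confirm that a package is really gone from the environment. It uses only the standard library, so it works in any Python 3.8+ interpreter:

```python
from importlib import metadata

def is_installed(name: str) -> bool:
    """Check whether a distribution is still present in the environment."""
    try:
        metadata.version(name)
        return True
    except metadata.PackageNotFoundError:
        return False

# e.g. is_installed("scipy") returns False after the uninstall above
print(is_installed("scipy"))
```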

## Cancel jobs
<a name="how-notbook-canceling-jobs"></a>

This section shows you how to cancel Apache Flink jobs that you can't access from Apache Zeppelin. If you want to cancel such a job, go to the Apache Flink dashboard, copy the job ID, and then use it in one of the following examples.

To cancel a single job:

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

To cancel all running jobs:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

To cancel all jobs:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## Restart the Apache Flink interpreter
<a name="how-notbook-restarting-interpreter"></a>

**To restart the Apache Flink interpreter within your Studio notebook:**

1. Choose **Configuration** near the top right corner of the screen.

1. Choose **Interpreter**.

1. Choose **restart** and then **OK**.

# Create custom IAM policies for Managed Service for Apache Flink Studio notebooks
<a name="how-zeppelin-appendix-iam"></a>

You normally use managed IAM policies to allow your application to access dependent resources. If you need finer control over your application's permissions, you can use a custom IAM policy. This section contains examples of custom IAM policies.

**Note**  
In the following policy examples, replace the placeholder text with your application's values.

**Topics**
+ [AWS Glue](#how-zeppelin-iam-glue)
+ [CloudWatch Logs](#how-zeppelin-iam-cw)
+ [Kinesis streams](#how-zeppelin-iam-streams)
+ [Amazon MSK clusters](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

The following example policy grants permissions to access an AWS Glue database.

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```


## CloudWatch Logs
<a name="how-zeppelin-iam-cw"></a>

The following policy grants permissions to access CloudWatch Logs:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ListCloudwatchLogGroups",
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogGroups"
            ],
            "Resource": [
                "arn:aws:logs:<region>:<accountId>:log-group:*"
            ]
        },
        {
            "Sid": "ListCloudwatchLogStreams",
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogStreams"
            ],
            "Resource": [
                "<logGroupArn>:log-stream:*"
            ]
        },
        {
            "Sid": "PutCloudwatchLogs",
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": [
                "<logStreamArn>"
            ]
        }
    ]
}
```

**Note**  
If you create your application using the console, the console adds the necessary policies to access CloudWatch Logs to your application role.

## Kinesis streams
<a name="how-zeppelin-iam-streams"></a>

Your application can use a Kinesis stream as a source or a destination. Your application needs read permissions to read from a source stream, and write permissions to write to a destination stream.

The following policy grants permissions to read from a Kinesis Stream used as a source:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```


The following policy grants permissions to write to a Kinesis Stream used as a destination:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```


If your application accesses an encrypted Kinesis stream, you must grant additional permissions to access the stream and the stream's encryption key.

The following policy grants permissions to access an encrypted source stream and the stream's encryption key:

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
```

The following policy grants permissions to access an encrypted destination stream and the stream's encryption key:

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Amazon MSK clusters
<a name="how-zeppelin-iam-msk"></a>

To grant access to an Amazon MSK cluster, you grant access to the cluster's VPC. For policy examples for accessing an Amazon VPC, see [VPC Application Permissions](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html).