

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink와 함께 Studio 노트북 사용
<a name="how-notebook"></a>

Managed Service for Apache Flink용 Studio 노트북을 사용하면 실시간으로 데이터 스트림을 대화식으로 쿼리하고 표준 SQL, Python 및 Scala를 사용하여 스트림 처리 애플리케이션을 쉽게 빌드하고 실행할 수 있습니다. AWS 관리 콘솔에서 몇 번의 클릭만으로 서버리스 노트북을 시작하여 데이터 스트림을 쿼리하고 몇 초 만에 결과를 얻을 수 있습니다.

노트북은 웹 기반 개발 환경입니다. 노트북을 사용하면 Apache Flink에서 제공하는 고급 기능과 결합된 간단한 대화형 개발 환경을 얻을 수 있습니다. Studio 노트북은 [Apache Zeppelin](https://zeppelin.apache.org/) 기반 노트북을 사용하며 [Apache Flink](https://flink.apache.org/)를 스트림 처리 엔진으로 사용합니다. Studio 노트북은 이러한 기술을 완벽하게 결합하여 모든 기술을 갖춘 개발자가 데이터 스트림에 대한 고급 분석을 이용할 수 있도록 합니다.

Apache Zeppelin은 Studio 노트북에 다음을 포함한 완벽한 분석 도구 제품군을 제공합니다.
+ 데이터 시각화
+ 데이터를 파일로 내보내기
+ 보다 쉬운 분석을 위한 출력 형식 제어

Apache Flink 및 Apache Zeppelin용 관리 서비스 사용을 시작하려면 [자습서: Managed Service for Apache Flink에서 Studio 노트북 생성](example-notebook.md) 섹션을 참조하세요. Apache Zeppelin 에 대한 자세한 내용은 [Apache Zeppelin 문서](http://zeppelin.apache.org)를 참조하세요.

 노트북을 사용하면 SQL, Python 또는 Scala의 Apache Flink [Table API 및 SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/) 또는 Scala의 [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/)를 사용하여 쿼리를 모델링할 수 있습니다. 그런 다음 몇 번의 클릭만으로 Studio 노트북을 프로덕션 워크로드에 맞게 지속적으로 실행되는 비대화형 Managed Service for Apache Flink 스트림 처리 애플리케이션으로 승격할 수 있습니다.

**Topics**
+ [올바른 Studio 노트북 런타임 버전 사용](studio-notebook-versions.md)
+ [Studio 노트북 생성](how-zeppelin-creating.md)
+ [스트리밍 데이터의 대화형 분석 수행](how-zeppelin-interactive.md)
+ [지속 가능한 상태의 애플리케이션으로 배포](how-notebook-durable.md)
+ [IAM 권한](how-zeppelin-iam.md)
+ [커넥터 및 종속성 사용](how-zeppelin-connectors.md)
+ [사용자 정의 함수](how-zeppelin-udf.md)
+ [체크포인트 지정 활성화](how-zeppelin-checkpoint.md)
+ [Studio 런타임 업그레이드](upgrading-studio-runtime.md)
+ [작업 AWS Glue](how-zeppelin-glue.md)
+ [Managed Service for Apache Flink의 Studio 노트북 예제 및 자습서](how-zeppelin-examples.md)
+ [Managed Service for Apache Flink의 Studio 노트북 문제 해결](how-zeppelin-troubleshooting.md)
+ [Managed Service for Apache Flink Studio 노트북용 사용자 지정 IAM 정책 생성](how-zeppelin-appendix-iam.md)

# 올바른 Studio 노트북 런타임 버전 사용
<a name="studio-notebook-versions"></a>

Amazon Managed Service for Apache Flink Studio를 사용하면 대화형 노트북 환경에서 표준 SQL, Python, Scala를 사용해 실시간으로 데이터 스트림을 조회하고 스트림 처리 애플리케이션을 빌드 및 실행할 수 있습니다. Studio 노트북은 [Apache Zeppelin](https://zeppelin.apache.org/)을 기반으로 하며 [Apache Flink](https://flink.apache.org/)를 스트림 처리 엔진으로 사용합니다.

**참고**  
**Apache Flink 버전 1.11**을 사용하는 Studio 런타임은 2024년 11월 5일에 지원이 중단될 예정입니다. 이 날짜 이후에는 이 버전을 사용하여 새 노트북을 실행하거나 새 애플리케이션을 생성할 수 없습니다. 해당 시점 전에 최신 런타임(Apache Flink 1.15 및 Apache Zeppelin 0.10)으로 업그레이드할 것을 권장합니다. 노트북 업그레이드 방법에 대한 지침은 [Studio 런타임 업그레이드](upgrading-studio-runtime.md) 섹션을 참조하세요.


**Studio 런타임**  

| Apache Flink 버전 | Apache Zeppelin 버전 | Python 버전 |  | 
| --- | --- | --- | --- | 
| 1.15 | 0.1 | 3.8 | 권장 | 
| 1.13 | 0.9 | 3.8 | 2024년 10월 16일까지 지원 | 
| 1.11 | 0.9 | 3.7 | 2025년 2월 24일 사용 중단 | 

# Studio 노트북 생성
<a name="how-zeppelin-creating"></a>

Studio 노트북에는 스트리밍 데이터에서 실행되고 분석 결과를 반환하는 SQL, Python 또는 Scala로 작성된 쿼리 또는 프로그램이 포함되어 있습니다. 콘솔이나 CLI를 사용하여 애플리케이션을 만들고 데이터 소스의 데이터를 분석하기 위한 쿼리를 제공합니다.

애플리케이션은 다음과 같은 구성 요소로 이루어집니다.
+ Amazon MSK 클러스터, Kinesis 데이터 스트림 또는 Amazon S3 버킷과 같은 데이터 소스.
+  AWS Glue 데이터베이스. 이 데이터베이스에는 데이터 소스, 대상 스키마 및 엔드포인트를 저장하는 테이블이 포함되어 있습니다. 자세한 내용을 알아보려면 [작업 AWS Glue](how-zeppelin-glue.md) 섹션을 참조하세요.
+ 애플리케이션 코드. 코드는 분석 쿼리 또는 프로그램을 구현합니다.
+ 애플리케이션 설정 및 런타임 속성. 애플리케이션 설정과 런타임 속성에 대한 자세한 내용은 [Apache Flink 애플리케이션 개발자 안내서](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html)에서 다음 항목을 참조하세요.
  + **애플리케이션 병렬성 및 크기 조정:** 애플리케이션의 병렬성 설정을 사용하여 애플리케이션이 동시에 실행할 수 있는 쿼리 수를 제어할 수 있습니다. 또한 다음과 같은 상황에서 쿼리에 실행 경로가 여러 개 있는 경우 향상된 병렬 처리 기능을 활용할 수 있습니다.
    + Kinesis 데이터 스트림의 여러 샤드를 처리하는 경우
    + `KeyBy` 연산자를 사용하여 데이터를 분할하는 경우.
    + 여러 윈도우 연산자를 사용하는 경우

    애플리케이션 스케일링에 대한 자세한 내용은 [Apache Flink용 Managed Service for Apache Flink의 애플리케이션 스케일링](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html)을 참조하세요.
  + **로깅 및 모니터링:** 애플리케이션 로깅 및 모니터링에 대한 자세한 내용은 [Apache Flink용 Amazon Managed Service for Apache Flink의 로깅 및 모니터링을](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html) 참조하세요.
  + 애플리케이션은 내결함성을 위해 체크포인트와 세이브포인트를 사용합니다. Studio 노트북에서는 체크포인트와 세이브포인트가 기본적으로 활성화되지 않습니다.

 AWS Management Console 또는를 사용하여 Studio 노트북을 생성할 수 있습니다 AWS CLI.

콘솔에서 애플리케이션을 생성할 때 사용할 수 있는 옵션은 다음과 같습니다.
+ Amazon MSK 콘솔에서 클러스터를 선택한 다음 **실시간 데이터 처리**를 선택합니다.
+ Kinesis Data Streams 콘솔에서 데이터 스트림을 선택한 다음 **애플리케이션** 탭에서 **실시간 데이터 처리**를 선택합니다.
+ Apache Flink용 관리형 서비스 콘솔에서 **Studio** 탭을 선택한 다음 **Studio 노트북 생성**을 선택합니다.

# 스트리밍 데이터의 대화형 분석 수행
<a name="how-zeppelin-interactive"></a>

Apache Zeppelin으로 구동되는 서버리스 노트북을 사용하여 스트리밍 데이터와 상호 작용합니다. 노트북에는 여러 개의 노트가 있을 수 있으며, 각 노트에는 코드를 작성할 수 있는 하나 이상의 단락이 있을 수 있습니다.

다음 예제 SQL 쿼리는 데이터 소스에서 데이터를 검색하는 방법을 보여줍니다.

```
%flink.ssql(type=update)
select * from stock;
```

Flink Streaming SQL 쿼리의 더 많은 예는 다음 [Managed Service for Apache Flink의 Studio 노트북 예제 및 자습서](how-zeppelin-examples.md) 및 Apache Flink 설명서의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/)를 참조하세요.

Studio 노트북에서 Flink SQL 쿼리를 사용하여 스트리밍 데이터를 쿼리할 수 있습니다. Python (Table API) 및 Scala (Table and Datastream APIs) 를 사용하여 스트리밍 데이터를 대화형 방식으로 쿼리하는 프로그램을 작성할 수도 있습니다. 쿼리 또는 프로그램의 결과를 보고, 몇 초 만에 업데이트한 다음, 다시 실행하여 업데이트된 결과를 볼 수 있습니다.

## 플링크 인터프리터
<a name="how-zeppelin-interactive-interpreters"></a>

*인터프리터*를 사용하여 Managed Service for Apache Flink에서 애플리케이션을 실행하는 데 사용할 언어를 지정합니다. Managed Service for Apache Flink와 함께 다음 인터프리터를 사용할 수 있습니다.


| 이름 | Class | 설명 | 
| --- |--- |--- |
| %flink | FlinkInterpreter | Creates ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment and provides a Scala environment | 
| %flink.pyflink | PyFlinkInterpreter | Provides a python environment | 
| %flink.ipyflink | IPyFlinkInterpreter | Provides an ipython environment | 
| %flink.ssql | FlinkStreamSqlInterpreter | Provides a stream sql environment | 
| %flink.bsql | FlinkBatchSqlInterpreter | Provides a batch sql environment | 

Flink 인터프리터에 대한 자세한 내용은 [Apache Zeppelin용 Flink 인터프리터](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html)를 참조하세요.

`%flink.pyflink` 또는 `%flink.ipyflink`를 인터프리터로 사용하는 경우 노트북에서 결과를 시각화하려면 `ZeppelinContext`를 사용해야 합니다.

PyFlink 관련 예제에 대한 자세한 내용은 [Apache Flink Studio 및 Python용 관리형 서비스를 사용하여 대화형 방식으로 데이터 스트림을 쿼리하는 내용](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/)을 참조하세요.

## Apache Flink 테이블 환경 변수
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin은 환경 변수를 사용하여 테이블 환경 리소스에 액세스할 수 있습니다.

다음 변수를 사용하여 Scala 테이블 환경 리소스에 액세스할 수 있습니다.


| 변수 | Resource | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | 블링크 플래너를 위한 StreamTableEnvironment | 

다음 변수를 사용하여 Python 테이블 환경 리소스에 액세스합니다.


| 변수 | Resource | 
| --- |--- |
| s\$1env | StreamExecutionEnvironment | 
| st\$1env | 블링크 플래너를 위한 StreamTableEnvironment | 

테이블 환경 사용에 관한 자세한 내용은 Apache Flink 설명서의 [개념 및 공통 API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/)를 참조하세요.

# 지속 가능한 상태의 애플리케이션으로 배포
<a name="how-notebook-durable"></a>

코드를 빌드하고 Amazon S3로 내보낼 수 있습니다. 노트에 작성한 코드를 지속적으로 실행되는 스트림 처리 애플리케이션으로 승격할 수 있습니다. Managed Service for Apache Flink에서 Apache Flink 애플리케이션을 실행하는 두 가지 모드가 있습니다. Studio 노트북을 사용하면 코드를 대화형 방식으로 개발하고, 코드 결과를 실시간으로 보고, 노트 내에서 시각화할 수 있습니다. 스트리밍 모드에서 실행할 노트를 배포하면 Managed Service for Apache Flink가 지속적으로 실행되는 애플리케이션을 생성하고, 소스에서 데이터를 읽고, 대상에 쓰고, 장기 실행 애플리케이션 상태를 유지하고, 소스 스트림의 처리량에 따라 자동으로 스케일링하는 애플리케이션을 생성합니다.

**참고**  
애플리케이션 코드를 내보내는 S3 버킷은 Studio 노트북과 동일한 리전에 있어야 합니다.

다음 기준을 충족하는 경우에만 Studio 노트북의 노트를 배포할 수 있습니다.
+ 단락은 순차적으로 정렬해야 합니다. 애플리케이션을 배포하면 노트 내의 모든 단락이 노트에 나타나는 대로, 그리고 순차적으로 (왼쪽에서 오른쪽, 위에서 아래로) 실행됩니다. 노트에서 **모든 단락 실행**을 선택하여 이 순서를 확인할 수 있습니다.
+ 코드는 Python과 SQL 또는 Scala와 SQL의 조합입니다. 현재 애플리케이션으로 배포할 경우 Python과 Scala를 함께 지원하지 않습니다.
+ 노트에는`%flink`, `%flink.ssql`, `%flink.pyflink`, `%flink.ipyflink`, `%md`와(과) 같은 인터프리터만 있어야 합니다.
+ [Zeppelin 컨텍스트](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html) 객체 `z`의 사용은 지원되지 않습니다. 아무것도 반환하지 않는 메서드는 경고를 기록하는 것 외에는 아무것도 수행하지 않습니다. 다른 메서드는 Python 예외를 발생시키거나 Scala에서 컴파일하지 못합니다.
+ 노트 하나로 Apache Flink 작업이 한 번 발생해야 합니다.
+ [동적 양식](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html)이 포함된 노트는 애플리케이션으로 배포할 수 없습니다.
+ %md ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) 단락은 애플리케이션의 일부로 실행하기에 부적합한 사람이 읽을 수 있는 문서를 포함할 것으로 예상되어 애플리케이션으로 배포할 때는 생략됩니다.
+ Zeppelin 내에서 실행되지 않는 문단은 애플리케이션으로 배포할 때 생략됩니다. 비활성화된 단락이 호환되지 않는 인터프리터를 사용하는 경우, 예를 들어 `%flink` `and %flink.ssql` 인터프리터가 있는 노트 `%flink.ipyflink`을(를) 사용하더라도 노트를 애플리케이션으로 배포하는 동안에는 해당 단락을 건너뛰고 오류가 발생하지 않습니다.
+ 애플리케이션 배포가 성공하려면 실행할 수 있는 소스 코드 (Flink SQL, PyFlink 또는 Flink Scala) 가 포함된 단락이 하나 이상 있어야 합니다.
+ 단락 내에서 인터프리터 지시문에 병렬성을 설정하는 경우 (예: `%flink.ssql(parallelism=32)`) 노트에서 배포된 애플리케이션에서는 무시됩니다. 대신 , AWS Management Console AWS Command Line Interface 또는 AWS API를 통해 배포된 애플리케이션을 업데이트하여 애플리케이션에 필요한 병렬 처리 수준에 따라 병렬 처리 및/또는 ParallelismPerKPU 설정을 변경하거나 배포된 애플리케이션에 대해 자동 크기 조정을 활성화할 수 있습니다.
+ 지속 가능한 상태의 애플리케이션으로 배포하는 경우 VPC는 인터넷에 액세스할 수 있어야 합니다. VPC가 인터넷에 액세스할 수 없는 경우 [인터넷에 접속할 수 없는 VPC에서 지속 가능한 상태의 애플리케이션으로 배포](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet) 섹션을 참조하세요.

## Scala/Python 기준
<a name="how-notebook-durable-scala"></a>
+ Scala 또는 Python 코드에서는 이전의 “Flink” 플래너(Scala용 `senv`, `stenv`; Python용 `s_env`, `st_env`)가 아닌 [Blink 플래너](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure) (Scala의 경우 `stenv_2`, Python의 경우 `st_env_2`)를 사용하세요. Apache Flink 프로젝트에서는 프로덕션 사용 사례에 Blink 플래너를 사용할 것을 권장하는데, 이 플래너는 Zeppelin과 Flink의 기본 플래너입니다.
+ Python 단락은 애플리케이션으로 배포하기 위한 노트와 같이 `!` 또는 `%timeit`와(과) `%conda`같은 [IPython 매직 명령](https://ipython.readthedocs.io/en/stable/interactive/magics.html)을 사용하여 [셸 호출/할당](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment)을 사용해서는 안 됩니다.
+ `map` 및 `filter` 같은 고차 데이터 흐름 연산자에 전달되는 함수의 파라미터로는 Scala 케이스 클래스를 사용할 수 없습니다. Scala 케이스 클래스에 대한 자세한 내용은 Scala 설명서의 [케이스 클래스](https://docs.scala-lang.org/overviews/scala-book/case-classes.html)를 참조하세요

## SQL 기준
<a name="how-notebook-durable-sql"></a>
+ 데이터를 전달할 수 있는 단락의 출력 섹션과 같은 곳이 없기 때문에 단순 SELECT 문은 허용되지 않습니다.
+ 어떤 단락에서든 DDL 문 (`USE`, `CREATE`, `ALTER`, `DROP`,`SET`, `RESET`) 은 DML(`INSERT`) 문 앞에 와야 합니다. 이는 한 단락의 DML 명령문을 단일 Flink 작업으로 함께 제출해야 하기 때문입니다.
+ DML 명령문이 포함된 단락은 최대 하나여야 합니다. 애플리케이션으로 배포 기능의 경우 Flink에 단일 작업을 제출하는 것만을 지원하기 때문입니다.

자세한 내용과 예제는 [Amazon Managed Service for Apache Flink, Amazon Translate 및 Amazon Comprehend에서 SQL 함수를 사용하여 스트리밍 데이터를 번역, 편집 및 분석하기](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesisanalytics-MyApplicatioamazon-translate-and-amazon-comprehend/)를 참조하세요.

# Studio 노트북의 IAM 권한 검토
<a name="how-zeppelin-iam"></a>

Apache Flink용 관리형 서비스는 AWS Management Console을(를) 통해 Studio 노트북을 생성할 때 IAM 역할을 생성합니다. 또한 다음 액세스를 허용하는 정책을 해당 역할과 연결합니다.


****  

| 서비스 | 액세스  | 
| --- | --- | 
| CloudWatch Logs | 나열 | 
| Amazon EC2 | List | 
| AWS Glue | 읽기, 쓰기 | 
| Managed Service for Apache Flink | 읽기 | 
| Managed Service for Apache Flink V2 | 읽기 | 
| Amazon S3 | 읽기, 쓰기 | 

# 커넥터 및 종속성 사용
<a name="how-zeppelin-connectors"></a>

커넥터를 사용하면 다양한 기술에서 데이터를 읽고 쓸 수 있습니다. Managed Service for Apache Flink는 Studio 노트북과 함께 세 개의 기본 커넥터를 번들로 제공합니다. 사용자 지정 커넥터를 사용할 수도 있습니다. 커넥터에 대한 자세한 내용은 Apache Flink 설명서의 [테이블 및 SQL 커넥터](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/)를 참조하세요.

## 기본 커넥터
<a name="zeppelin-default-connectors"></a>

 AWS Management Console 를 사용하여 Studio 노트북을 생성하는 경우 Managed Service for Apache Flink에는 기본적으로 및 사용자 지정 커넥터가 포함됩니다`flink-sql-connector-kinesis``flink-connector-kafka_2.12``aws-msk-iam-auth`. 이러한 사용자 지정 커넥터 없이 콘솔을 통해 Studio 노트북을 만들려면 **사용자 지정 설정으로 만들기** 옵션을 선택합니다. 그런 다음 **구성** 페이지로 이동하면 두 커넥터 옆에 있는 확인란의 선택을 취소하세요.

[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API를 사용하여 Studio 노트북을 만드는 경우, `flink-sql-connector-flink` 및 `flink-connector-kafka` 커넥터는 기본적으로 포함되지 않습니다. 추가하려면 다음 예와 같이 `CustomArtifactsConfiguration` 데이터 유형에 `MavenReference`(으)로 지정하세요.

`aws-msk-iam-auth` 커넥터는 Amazon MSK에서 사용할 커넥터로, IAM으로 자동 인증하는 기능이 포함되어 있습니다.

**참고**  
다음 예제에 표시된 커넥터 버전은 지원되는 유일한 버전입니다.

```
For the Kinesis connector:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"

   }      
}]

For authenticating with AWS MSK through AWS IAM:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }      
}]
            
For the Apache Kafka connector:  

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"

   }      
}]
```

기존 노트북에 이러한 커넥터를 추가하려면 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API 작업을 사용하고 `CustomArtifactsConfigurationUpdate` 데이터 유형에 `MavenReference`(으)로 지정하세요.

**참고**  
테이블 API의 `flink-sql-connector-kinesis` 커넥터에 대해 `failOnError`을(를) true로 설정할 수 있습니다.

## 종속성 및 사용자 정의 커넥터 추가
<a name="zeppelin-custom-connectors"></a>

 AWS Management Console 를 사용하여 Studio 노트북에 종속성 또는 사용자 지정 커넥터를 추가하려면 다음 단계를 따릅니다.

1. 사용자 정의 커넥터 파일을 Amazon S3에 업로드합니다.

1. 에서 Studio 노트북**을 생성하기 위한 사용자 지정** 생성 옵션을 AWS Management Console선택합니다.

1. **구성** 단계에 도달할 때까지 Studio 노트북 제작 워크플로를 따르세요.

1. **사용자 지정 커넥터** 섹션에서 사용자 **지정 커넥터 추가**를 선택합니다.

1. 종속성 또는 사용자 지정 커넥터의 Amazon S3 위치를 지정합니다.

1. **변경 사항 저장**을 선택합니다.

[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API를 사용하여 새 Studio 노트북을 생성할 때 종속성 JAR 또는 사용자 지정 커넥터를 추가하려면 종속성 JAR의 Amazon S3 위치 또는 `CustomArtifactsConfiguration` 데이터 유형에서 사용자 지정 커넥터를 지정하세요. 기존 Studio 노트북에 종속성 또는 사용자 지정 커넥터를 추가하려면 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API 작업을 호출하고 종속성 JAR의 Amazon S3 위치 또는 `CustomArtifactsConfigurationUpdate` 데이터 유형의 사용자 지정 커넥터를 지정하세요.

**참고**  
종속성이나 사용자 지정 커넥터를 포함할 때는 종속성 또는 사용자 지정 커넥터에 번들로 제공되지 않은 모든 전이적 종속성도 포함해야 합니다.

# 사용자 정의 함수 구현
<a name="how-zeppelin-udf"></a>

사용자 정의 함수 (UDF) 는 자주 사용되는 로직이나 쿼리에서 달리 표현할 수 없는 사용자 지정 로직을 호출할 수 있는 확장점입니다. Python 또는 Java 또는 Scala와 같은 JVM 언어를 사용하여 Studio 노트북 내에서 단락으로 UDF를 구현할 수 있습니다. JVM 언어로 구현된 UDF가 포함된 외부 JAR 파일을 Studio 노트북에 추가할 수도 있습니다.

서브클래스 `UserDefinedFunction` (또는 자체 추상 클래스)가 있는 추상 클래스를 등록하는 JAR을 구현할 때는 Apache Maven에서 제공된 범위, Gradle에서 `compileOnly` 종속성 선언, SBT에서 제공된 범위 또는 UDF 프로젝트 빌드 구성에서 동등한 디렉티브를 사용하세요. 이렇게 하면 UDF 소스 코드를 Flink API에 대해 컴파일할 수 있지만 Flink API 클래스 자체는 빌드 아티팩트에 포함되지 않습니다. Maven 프로젝트의 이러한 전제 조건을 준수하는 UDF jar 예제의 다음 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)을 참조하세요.

**참고**  
예제 설정은 기계 학습 블로그 에서 *AWS 기계 학습 블로그*에서 [Amazon Managed Service for Apache Flink, Amazon Translate, 그리고 Amazon Comprehend를 사용하여 SQL 기능으로 스트리밍 데이터를 번역, 수정 및 분석](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)을 참조하세요.

콘솔을 사용하여 UDF JAR 파일을 Studio 노트북에 추가하려면 다음 단계를 따르세요.

1. Amazon S3에 UDF JAR 파일을 업로드합니다.

1. 에서 Studio 노트북**을 생성하기 위한 사용자 지정** 생성 옵션을 AWS Management Console선택합니다.

1. **구성** 단계에 도달할 때까지 Studio 노트북 제작 워크플로를 따르세요.

1. **사용자 정의 함수** 섹션에서 **사용자 정의 함수 추가**를 선택합니다.

1. UDF를 구현한 JAR 파일 또는 ZIP 파일의 Amazon S3 위치를 지정하세요.

1. **변경 사항 저장**을 선택합니다.

[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) API를 사용하여 새 Studio 노트북을 만들 때 UDF JAR을 추가하려면 `CustomArtifactConfiguration` 데이터 유형에 JAR 위치를 지정하세요. 기존 Studio 노트북에 UDF JAR을 추가하려면 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) API 작업을 호출하고 `CustomArtifactsConfigurationUpdate` 데이터 유형에 JAR 위치를 지정하세요. 또는를 사용하여 Studio 노트북 AWS Management Console 에 UDF JAR 파일을 추가할 수 있습니다.

## 사용자 정의 함수 관련 고려 사항
<a name="how-zeppelin-udf-considerations"></a>
+ Managed Service for Apache Flink Studio는 [Apache Zeppelin 용어](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html)를 사용합니다. 여기서 노트북은 여러 노트를 포함할 수 있는 Zeppelin 인스턴스입니다. 그러면 각 노트에는 여러 단락이 포함될 수 있습니다. Managed Service for Apache Flink Studio를 사용하면 인터프리터 프로세스가 노트북의 모든 노트에서 공유됩니다. 따라서 한 노트에서 [CreateTemporarySystemFunction](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-)을 사용하여 명시적 함수 등록을 수행하면 같은 노트북의 다른 노트에서도 동일한 함수를 있는 그대로 참조할 수 있습니다.

  하지만 *애플리케이션으로 배포* 작업은 *개별* 노트에서만 작동하며 노트북의 모든 노트에 적용할 수는 없습니다. 애플리케이션으로 배포를 수행하는 경우 활성 노트의 내용만 애플리케이션을 생성하는 데 사용됩니다. 다른 노트북에서 수행된 명시적 함수 등록은 생성된 애플리케이션 종속성에 포함되지 않습니다. 또한 애플리케이션으로 배포 옵션을 사용하는 경우 JAR의 기본 클래스 이름을 소문자 문자열로 변환하여 암시적 함수를 등록할 수 있습니다.

   예를 들어 `TextAnalyticsUDF`이(가) UDF JAR의 기본 클래스인 경우 암시적 등록으로 인해 함수 이름은 `textanalyticsudf`(으)로 생성됩니다. 따라서 스튜디오의 노트 1에서 다음과 같은 명시적 함수 등록이 발생하면 공유 인터프리터로 인해 해당 노트북의 다른 모든 노트(예: 노트 2)가 함수를 이름을 `myNewFuncNameForClass`을(를) 참조할 수 있습니다.

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   하지만 노트 2에서 애플리케이션으로 배포하기 작업 중에는 이러한 명시적 등록이 종속성에 *포함되지 않으므로* 배포된 애플리케이션이 예상대로 작동하지 않습니다. 암시적 등록으로 인해 기본적으로 이 함수에 대한 모든 참조는 `myNewFuncNameForClass`이(가) 아닌 `textanalyticsudf`와(과) 함께 있어야 합니다.

   사용자 정의 함수 이름 등록이 필요한 경우 노트 2 자체에는 다음과 같이 명시적 등록을 다시 수행하는 다른 단락이 포함될 것으로 예상됩니다.

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ UDF JAR에 Flink SDK가 포함된 경우 UDF 소스 코드가 Flink SDK에 대해 컴파일될 수 있지만 Flink SDK 클래스 자체는 빌드 아티팩트에 포함되지 않도록 JAR과 같이 자바 프로젝트를 구성하세요.

  Apache Maven에서는 `provided` 범위를, Gradle에서는 `compileOnly` 종속성 선언을, SBT에서는 `provided` 범위를, UDF 프로젝트 빌드 구성에서는 이에 상응하는 디렉티브를 사용할 수 있습니다. 메이븐 프로젝트의 이러한 전제 조건을 준수하는 UDF jar 예제에서 이 [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47)을 참조할 수 있습니다. 완전한 단계별 자습서는 [Translate, redact and analyze streaming data using SQL functions with Amazon Managed Service for Apache Flink, Amazon Translate, and Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/)를 참조하세요.

# 체크포인트 지정 활성화
<a name="how-zeppelin-checkpoint"></a>

환경 설정을 사용하여 체크포인트를 활성화할 수 있습니다. 체크포인트에 대한 자세한 내용은 [Managed Service for Apache Flink Developer Guide](https://docs.aws.amazon.com/managed-flink/latest/java/)의 [Fault Tolerance](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html)를 참조하세요.

## 체크포인트 간격 설정
<a name="how-zeppelin-checkpoint-interval"></a>

다음 Scala 코드 예제는 애플리케이션의 체크포인트 간격을 1분으로 설정합니다.

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

다음 Python 코드 예제는 애플리케이션의 체크포인트 간격을 1분으로 설정합니다.

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## 체크포인트 유형 설정
<a name="how-zeppelin-checkpoint-type"></a>

다음 Scala 코드 예제는 애플리케이션의 체크포인트 모드를 `EXACTLY_ONCE`(기본값)(으)로 설정합니다.

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

다음 Python 코드 예제는 애플리케이션의 체크포인트 모드를 `EXACTLY_ONCE`(기본값)으(로) 설정합니다.

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# Studio 런타임 업그레이드
<a name="upgrading-studio-runtime"></a>

이 섹션에는 Studio 노트북 런타임을 업그레이드하는 방법에 대한 정보를 제공합니다. 항상 최신 지원 Studio 런타임으로 업그레이드할 것을 권장합니다.

## 노트북을 새 Studio 런타임으로 업그레이드
<a name="upgrading-notebook"></a>

Studio 사용 방식에 따라 런타임 업그레이드 단계가 달라집니다. 사용 사례에 맞는 옵션을 선택합니다.

### 외부 종속성이 없는 SQL 쿼리 또는 Python 코드
<a name="notebook-no-dependencies"></a>

SQL 또는 Python을 외부 종속성 없이 사용 중이면 다음 런타임 업그레이드 프로세스를 사용하세요. 최신 런타임 버전으로 업그레이드하는 것이 좋습니다. 업그레이드의 출발 런타임 버전과 관계없이 업그레이드 프로세스는 동일합니다.

1. 최신 런타임을 사용하여 새 Studio 노트북을 생성합니다.

1. 기존 노트북의 모든 노트 코드를 새 노트북으로 복사하여 붙여 넣습니다.

1. 새 노트북에서 이전 버전에서 변경된 Apache Flink 기능과 호환되도록 코드를 조정합니다.
   + 새 노트북을 실행합니다. 노트북을 열고 노트별로 순서대로 실행하여 동작을 테스트합니다.
   + 필요한 코드 변경 사항을 적용합니다.
   + 새 노트북 실행을 중지합니다.

1. 기존 노트북을 애플리케이션으로 배포한 경우:
   + 새 노트북을 별도의 신규 애플리케이션으로 배포합니다.
   + 기존 애플리케이션을 중지합니다.
   + 스냅샷 없이 새 애플리케이션을 실행합니다.

1. 기존 노트북이 실행 중이면 중지합니다. 필요한 경우 새 노트북을 실행하여 대화형으로 사용합니다.

**외부 종속성이 없는 업그레이드 프로세스 흐름**

![\[다음 다이어그램은 외부 종속성이 없는 노트북을 업그레이드하기 위한 권장 워크플로를 나타냅니다.\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### 외부 종속성이 있는 SQL 쿼리 또는 Python 코드
<a name="notebook-dependencies"></a>

커넥터나 Python 또는 Java로 구현된 사용자 정의 함수와 같은 사용자 지정 아티팩트 등 외부 종속성을 사용하는 SQL 또는 Python을 사용 중인 경우 이 프로세스를 따르세요. 최신 런타임으로 업그레이드할 것을 권장합니다. 업그레이드를 시작하는 런타임 버전과 관계없이 프로세스는 동일합니다.

1. 최신 런타임을 사용하여 새 Studio 노트북을 생성합니다.

1. 기존 노트북의 모든 노트 코드를 새 노트북으로 복사하여 붙여 넣습니다.

1. 외부 종속성과 사용자 지정 아티팩트를 업데이트합니다.
   + 새 런타임의 Apache Flink 버전과 호환되는 새 커넥터를 확인합니다. 해당 Flink 버전에 맞는 올바른 커넥터를 찾으려면 Apache Flink 설명서의 [Table 및 SQL 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/)를 참조하세요.
   + 사용자 정의 함수의 코드를 Apache Flink API 변경 사항에 맞게 업데이트하고 사용자 정의 함수에서 사용하는 Python 또는 JAR 종속성도 최신 버전에 맞게 변경합니다. 업데이트된 사용자 지정 아티팩트를 리패키징합니다.
   + 이러한 새로운 커넥터와 아티팩트를 새 노트북에 추가합니다.

1. 새 노트북에서 이전 버전에서 변경된 Apache Flink 기능과 호환되도록 코드를 조정합니다.
   + 새 노트북을 실행합니다. 노트북을 열고 노트별로 순서대로 실행하여 동작을 테스트합니다.
   + 필요한 코드 변경 사항을 적용합니다.
   + 새 노트북 실행을 중지합니다.

1. 기존 노트북을 애플리케이션으로 배포한 경우:
   + 새 노트북을 별도의 신규 애플리케이션으로 배포합니다.
   + 기존 애플리케이션을 중지합니다.
   + 스냅샷 없이 새 애플리케이션을 실행합니다.

1. 기존 노트북이 실행 중이면 중지합니다. 필요한 경우 새 노트북을 실행하여 대화형으로 사용합니다.

**외부 종속성이 있는 업그레이드 프로세스 흐름**

![\[다음 다이어그램은 외부 종속성이 있는 노트북을 업그레이드하기 위한 권장 워크플로를 나타냅니다.\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# 작업 AWS Glue
<a name="how-zeppelin-glue"></a>

Studio 노트북은 데이터 소스 및 싱크에 대한 정보를 저장하고 가져옵니다 AWS Glue. Studio 노트북을 생성할 때 연결 정보가 포함된 AWS Glue 데이터베이스를 지정합니다. 데이터 소스 및 싱크에 액세스할 때 데이터베이스에 포함된 AWS Glue 테이블을 지정합니다. AWS Glue 테이블은 데이터 소스 및 대상의 위치, 스키마 및 파라미터를 정의하는 AWS Glue 연결에 대한 액세스를 제공합니다.

Studio 노트북은 테이블 속성을 사용하여 애플리케이션별 데이터를 저장합니다. 자세한 내용은 [테이블 속성](how-zeppelin-glue-properties.md) 단원을 참조하십시오.

Studio 노트북에 사용할 AWS Glue 연결, 데이터베이스 및 테이블을 설정하는 방법에 대한 예는 [AWS Glue 데이터베이스 생성](example-notebook.md#example-notebook-glue) [자습서: Managed Service for Apache Flink에서 Studio 노트북 생성](example-notebook.md) 자습서의 섹션을 참조하세요.

# 테이블 속성
<a name="how-zeppelin-glue-properties"></a>

 AWS Glue 테이블은 데이터 필드 외에도 테이블 속성을 사용하여 Studio 노트북에 다른 정보를 제공합니다. Managed Service for Apache Flink는 다음 AWS Glue 테이블 속성을 사용합니다.
+ [Apache Flink 시간 값 정의](#how-zeppelin-glue-timestamp): 이러한 속성은 Managed Service for Apache Flink가 Apache Flink의 내부 데이터 처리 시간 값을 내보내는 방법을 정의합니다.
+ [Flink 커넥터 및 포맷 속성](#how-zeppelin-glue-connector): 이러한 속성은 데이터 스트림에 대한 정보를 제공합니다.

 AWS Glue 테이블에 속성을 추가하려면 다음을 수행합니다.

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. 테이블 목록에서 애플리케이션이 데이터 연결 정보를 저장하는 데 사용하는 테이블을 선택합니다. **작업**, **테이블 세부 정보 편집**을 선택합니다.

1. **테이블 속성**에서 **managed-flink.proctime** **키**와 **user\$1action\$1time** **값**을 입력합니다.

## Apache Flink 시간 값 정의
<a name="how-zeppelin-glue-timestamp"></a>

Apache Flink는 [처리 시간](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) 및 [이벤트 시간](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time)과 같은 스트림 처리 이벤트가 발생한 시간 값을 제공합니다. 애플리케이션 출력에 이러한 값을 포함하려면 Managed Service for Apache Flink 런타임에 이러한 값을 지정된 필드로 내보내도록 지시하는 속성을 AWS Glue 테이블에 정의합니다.

테이블 속성에서 사용하는 키와 값은 다음과 같습니다.


| 타임스탬프 유형 | Key(키) | 값 | 
| --- |--- |--- |
| [ 처리 시간](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | managed-flink.proctime | The column name that AWS Glue will use to expose the value. This column name does not correspond to an existing table column. | 
| [ 이벤트 시간](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | managed-flink.rowtime | The column name that AWS Glue will use to expose the value. This column name corresponds to an existing table column. | 
| managed-flink.watermark.*column\$1name*.milliseconds | The watermark interval in milliseconds | 

## Flink 커넥터 및 포맷 속성
<a name="how-zeppelin-glue-connector"></a>

 AWS Glue 테이블 속성을 사용하여 애플리케이션의 Flink 커넥터에 데이터 소스에 대한 정보를 제공합니다. Managed Service for Apache Flink에서 커넥터에 사용하는 속성의 몇 가지 예는 다음과 같습니다.


| 커넥터 유형 | Key(키) | 값 | 
| --- |--- |--- |
| [ Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | 형식 | The format used to deserialize and serialize Kafka messages, e.g. json or csv. | 
| scan.startup.mode | The startup mode for the Kafka consumer, e.g. earliest-offset or timestamp. | 
| [ Kinesis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | 형식 | The format used to deserialize and serialize Kinesis data stream records, e.g. json or csv. | 
| aws.region | The AWS region where the stream is defined.  | 
| [ S3 (Filesystem)](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | format | The format used to deserialize and serialize files, e.g. json or csv. | 
| 경로 | The Amazon S3 path, e.g. s3://mybucket/. | 

Kinesis 및 Apache Kafka 이외의 다른 커넥터에 대한 자세한 내용은 커넥터의 설명서를 참조하세요.

# Managed Service for Apache Flink의 Studio 노트북 예제 및 자습서
<a name="how-zeppelin-examples"></a>

**Topics**
+ [자습서: Managed Service for Apache Flink에서 Studio 노트북 생성](example-notebook.md)
+ [자습서: 지속 가능한 상태를 사용하는 Managed Service for Apache Flink 애플리케이션으로 Studio 노트북 배포](example-notebook-deploy.md)
+ [Studio 노트북에서 데이터를 분석하기 위한 예제 쿼리 보기](how-zeppelin-sql-examples.md)

# 자습서: Managed Service for Apache Flink에서 Studio 노트북 생성
<a name="example-notebook"></a>

다음 자습서는 Kinesis 데이터 스트림 또는 Amazon MSK 클러스터에서 데이터를 읽는 Studio 노트북을 생성하는 방법을 보여줍니다.

**Topics**
+ [사전 조건 완료](#example-notebook-setup)
+ [AWS Glue 데이터베이스 생성](#example-notebook-glue)
+ [다음 단계: Kinesis Data Streams 또는 Amazon MSK를 사용한 Studio 노트북 생성](#examples-notebook-nextsteps)
+ [Kinesis Data Streams로 Studio 노트북 생성](example-notebook-streams.md)
+ [Amazon MSK로 Studio 노트북 생성](example-notebook-msk.md)
+ [애플리케이션 및 종속 리소스 정리](example-notebook-cleanup.md)

## 사전 조건 완료
<a name="example-notebook-setup"></a>

 AWS CLI 가 버전 2 이상인지 확인합니다. 최신 버전을 설치하려면 버전 2 설치, 업데이트 및 제거를 AWS CLI참조하세요. [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) 

## AWS Glue 데이터베이스 생성
<a name="example-notebook-glue"></a>

Studio 노트북은 Amazon MSK 데이터 소스에 대한 메타데이터용 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 데이터베이스를 사용합니다.

**AWS Glue 데이터베이스 생성**

1. [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. **데이터베이스 추가(Add database)**를 선택합니다. **데이터베이스 추가** 창에서 **데이터베이스 이름**을 **default**(으)로 입력합니다. **생성(Create)**을 선택합니다.

## 다음 단계: Kinesis Data Streams 또는 Amazon MSK를 사용한 Studio 노트북 생성
<a name="examples-notebook-nextsteps"></a>

이 자습서에서는 Kinesis Data Streams 또는 Amazon MSK를 사용하는 Studio 노트북을 만들 수 있습니다.
+ [Kinesis Data Streams로 Studio 노트북 생성](example-notebook-streams.md): Kinesis Data Streams 를 사용하면 Kinesis 데이터 스트림을 소스로 사용하는 애플리케이션을 빠르게 생성할 수 있습니다. Kinesis 데이터 스트림을 종속 리소스로 생성하기만 하면 됩니다.
+ [Amazon MSK로 Studio 노트북 생성](example-notebook-msk.md): Amazon MSK를 사용하면 Amazon MSK 클러스터를 소스로 사용하는 애플리케이션을 생성할 수 있습니다. Amazon VPC, Amazon EC2 클라이언트 인스턴스, Amazon MSK 클러스터를 종속 리소스로 생성해야 합니다.

# Kinesis Data Streams로 Studio 노트북 생성
<a name="example-notebook-streams"></a>

이 자습서에서는 Kinesis 데이터 스트림을 소스로 사용하는 Studio 노트북을 생성하는 방법을 설명합니다.

**Topics**
+ [사전 조건 완료](#example-notebook-streams-setup)
+ [AWS Glue 테이블 생성](#example-notebook-streams-glue)
+ [Kinesis Data Streams로 Studio 노트북 생성](#example-notebook-streams-create)
+ [Kinesis 데이터 스트림으로 데이터 전송](#example-notebook-streams-send)
+ [Studio 노트북을 테스트](#example-notebook-streams-test)

## 사전 조건 완료
<a name="example-notebook-streams-setup"></a>

Studio 노트북을 생성하기 전에 Kinesis 데이터 스트림(`ExampleInputStream`)을 생성하세요. 애플리케이션은 이 스트림을 애플리케이션 소스로 사용합니다.

Amazon Kinesis 콘솔 또는 다음 AWS CLI 명령을 사용하여 이러한 스트림을 만들 수 있습니다. 콘솔 지침은 *Amazon Kinesis Data Streams 개발자 가이드*의 [데이터 스트림 생성 및 업데이트](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)를 참조하세요. 스트림의 이름을 **ExampleInputStream**(으)로 지정하고 **열린 샤드 수**를 **1**(으)로 설정합니다.

를 사용하여 스트림(`ExampleInputStream`)을 생성하려면 다음 Amazon Kinesis `create-stream` AWS CLI 명령을 AWS CLI사용합니다.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## AWS Glue 테이블 생성
<a name="example-notebook-streams-glue"></a>

Studio 노트북은 Kinesis Data Streams 데이터 소스에 대한 메타데이터용 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 데이터베이스를 사용합니다.

**참고**  
먼저 데이터베이스를 수동으로 생성하거나 노트북을 만들 때 Managed Service for Apache Flink가 자동으로 데이터베이스를 생성하도록 할 수 있습니다. 마찬가지로 이 섹션에 설명된 대로 테이블을 수동으로 생성하거나 Apache Zeppelin 내에서 노트북의 Managed Service for Apache Flink용 테이블 커넥터 코드 생성을 사용하여 DDL 문을 통해 테이블을 생성할 수 있습니다. 그런 다음 테이블 AWS Glue 이 올바르게 생성되었는지 확인할 수 있습니다.

**표 생성**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. 아직 AWS Glue 데이터베이스가 없는 경우 왼쪽 탐색 모음에서 **데이터베이스를** 선택합니다. **데이터베이스 추가**를 선택합니다. **데이터베이스 추가** 창에서 **데이터베이스 이름**을 **default**(으)로 입력합니다. **생성(Create)**을 선택합니다.

1. 왼쪽 탐색 모음에서 **표**를 선택합니다. **표** 페이지에서 **표 추가**, **표 수동 추가**를 선택합니다.

1. **표 속성 설정** 페이지에서 **표 명칭**에 **stock**을 입력합니다. 이전에 만든 데이터베이스를 선택했는지 확인하세요. **다음**을 선택합니다.

1. **데이터 스토어 추가** 페이지에서 **Kinesis**를 선택합니다. **스트림 이름**에는 **ExampleInputStream**을(를) 입력합니다. **Kinesis 소스 URL**의 경우 **https://kinesis.us-east-1.amazonaws.com** 입력을 선택합니다. **Kinesis 소스 URL**을 복사하여 붙여넣는 경우 선행 또는 후행 공백을 모두 삭제해야 합니다. **Next**(다음)를 선택합니다.

1. **분류** 페이지에서 **JSON**을 선택합니다. **Next**(다음)를 선택합니다.

1. **스키마 정의** 페이지에서 열 추가를 선택하여 열을 추가합니다. 다음 속성을 가진 열을 추가합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-streams.html)

   **Next**(다음)를 선택합니다.

1. 다음 페이지에서 설정을 확인하고 **마침**을 선택합니다.

1. 테이블 목록에서 새로 생성한 테이블을 선택합니다.

1. **테이블 편집**을 선택하고 `managed-flink.proctime` 키와 `proctime` 값이 있는 속성을 추가합니다.

1. **적용**을 선택합니다.

## Kinesis Data Streams로 Studio 노트북 생성
<a name="example-notebook-streams-create"></a>

애플리케이션에서 사용하는 리소스를 생성했으니 이제 Studio 노트북을 생성합니다.

**Topics**
+ [를 사용하여 Studio 노트북 생성 AWS Management Console](#example-notebook-create-streams-console)
+ [를 사용하여 Studio 노트북 생성 AWS CLI](#example-notebook-msk-create-api)

### 를 사용하여 Studio 노트북 생성 AWS Management Console
<a name="example-notebook-create-streams-console"></a>

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio** 탭을 선택합니다. **Studio 노트북 생성**을 선택합니다.
**참고**  
입력한 Amazon MSK 클러스터 또는 Kinesis 데이터 스트림을 선택하고 **실시간 데이터 처리**를 선택하여 Amazon MSK 또는 Kinesis Data Streams 콘솔에서 Studio 노트북을 생성할 수도 있습니다.

1. **Studio 노트북 생성** 페이지에서 다음 정보를 입력합니다.
   + 노트북 이름을 **MyNotebook**(으)로 입력합니다.
   + **AWS Glue 데이터베이스**의 **기본값**을 선택합니다.

   **Studio 노트북 생성**을 선택합니다.

1. **MyNotebook** 페이지에서 **실행**을 선택합니다. **상태**가 **실행 중**으로 표시될 때까지 기다리세요. 노트북이 실행 중일 때는 요금이 부과됩니다.

### 를 사용하여 Studio 노트북 생성 AWS CLI
<a name="example-notebook-msk-create-api"></a>

를 사용하여 Studio 노트북을 생성하려면 다음을 AWS CLI수행합니다.

1. 계정 ID를 확인합니다. 애플리케이션을 생성하려면 이 값을 사용합니다.

1. `arn:aws:iam::AccountID:role/ZeppelinRole` 역할을 생성하고 콘솔에서 자동 생성된 역할에 다음 권한을 추가합니다.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 다음 콘텐츠를 가진 `create.json`이라는 파일을 생성합니다: 자리 표시자 값을 정보로 바꿉니다.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 애플리케이션을 생성하려면 다음 명령을 실행합니다.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 명령이 완료되면 새 Studio 노트북의 세부 정보를 보여주는 출력이 표시됩니다. 다음은 출력의 예제입니다.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 애플리케이션을 시작하려면 다음 명령을 실행합니다. 샘플 값을 계정 ID로 바꿉니다.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Kinesis 데이터 스트림으로 데이터 전송
<a name="example-notebook-streams-send"></a>

Kinesis 데이터 스트림으로 테스트 데이터를 보내려면 다음을 수행하세요.

1. [Kinesis 데이터 제너레이터(KDG)](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)를 엽니다.

1. **CloudFormation으로 Cognito 사용자 만들기**를 선택합니다.

1.  CloudFormation 콘솔이 Kinesis Data Generator 템플릿과 함께 열립니다. **Next**(다음)를 선택합니다.

1. **스택 세부 정보 지정** 페이지에서 Cognito 사용자의 사용자 이름 및 암호를 입력합니다. **Next**(다음)를 선택합니다.

1. **스택 옵션 구성** 페이지에서 **다음**을 선택합니다.

1. **Kinesis-Data-Generator-Cognito-User 검토** 페이지에서 ** AWS CloudFormation이 IAM 리소스를 생성할 수 있음을 승인합니다.** 확인란을 선택합니다. **스택 생성**을 선택합니다.

1.  CloudFormation 스택 생성이 완료될 때까지 기다립니다. 스택이 완료되면 CloudFormation 콘솔에서 **Kinesis-Data-Generator-Cognito-User** 스택을 열고 **출력** 탭을 선택합니다. **KinesisDataGeneratorUrl** 출력 값에 대해 나열된 URL을 엽니다.

1. **Amazon Kinesis Data Generator** 페이지에서 4단계에서 생성한 보안 인증을 사용하여 로그인합니다.

1. 다음 페이지에서 다음 값을 입력합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-streams.html)

   **레코드 템플릿**의 경우 다음 코드를 붙여넣습니다.

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. **데이터 보내기**를 선택합니다.

1. 생성기가 데이터를 Kinesis 데이터 스트림으로 전송합니다.

   다음 섹션을 완료하는 동안 생성기를 계속 실행하세요.

## Studio 노트북을 테스트
<a name="example-notebook-streams-test"></a>

이 섹션에서는 Studio 노트북을 사용하여 Kinesis 데이터 스트림의 데이터를 쿼리합니다.

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio 노트북** 탭을 선택합니다. **MyNotebook**을 선택합니다.

1. **MyNotebook** 페이지에서 **Apache Zeppelin에서 열기**를 선택합니다.

   Apache Zeppelin 인터페이스가 새 탭에서 열립니다.

1. **제플린에 오신 것을 환영합니다\$1** 페이지에서 **Zeppelin Note**를 선택하세요.

1. **Zeppelin 노트** 페이지에서 새로운 노트에 다음 쿼리를 입력합니다.

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   실행 아이콘을 선택합니다.

   잠시 후 노트에 Kinesis 데이터 스트림의 데이터가 표시됩니다.

애플리케이션에서 작동 측면을 볼 수 있도록 Apache Flink 대시보드를 열려면 **FLINK JOB**을 선택합니다. Flink 대시보드에 대한 자세한 내용을 알아보려면 [Managed Service for Apache Flink 개발자 가이드](https://docs.aws.amazon.com/)의 [Apache Flink 대시보드](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)를 참조하세요.

Flink Streaming SQL 쿼리의 더 많은 예는 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

# Amazon MSK로 Studio 노트북 생성
<a name="example-notebook-msk"></a>

이 자습서에서는 소스로 Amazon MSK 클러스터를 사용하는 Studio 노트북을 생성하는 방법을 설명합니다.

**Topics**
+ [Amazon MSK 클러스터 설정](#example-notebook-msk-setup)
+ [VPC에 NAT 게이트웨이 추가](#example-notebook-msk-nat)
+ [AWS Glue 연결 및 테이블 생성](#example-notebook-msk-glue)
+ [Amazon MSK로 Studio 노트북 생성](#example-notebook-msk-create)
+ [Amazon MSK 클러스터로 데이터 전송](#example-notebook-msk-send)
+ [Studio 노트북을 테스트](#example-notebook-msk-test)

## Amazon MSK 클러스터 설정
<a name="example-notebook-msk-setup"></a>

이 자습서에서는 일반 텍스트 액세스를 허용하는 Amazon MSK 클러스터가 필요합니다. Amazon MSK 클러스터를 아직 설정하지 않은 경우, [Amazon MSK 사용 시작하기](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 자습서를 따라 Amazon VPC, Amazon MSK 클러스터, 주제 및 Amazon EC2 클라이언트 인스턴스를 생성하세요.

자습서에 따라 다음을 수행하십시오:
+ [3단계: Amazon MSK 클러스터 생성](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)의 4단계에서 `ClientBroker` 값을 `TLS`에서 **PLAINTEXT**로 변경합니다.

## VPC에 NAT 게이트웨이 추가
<a name="example-notebook-msk-nat"></a>

[Amazon MSK 사용 시작하기](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 자습서에 따라 Amazon MSK 클러스터를 생성했거나 기존 Amazon VPC에 프라이빗 서브넷용 NAT 게이트웨이가 아직 없는 경우, Amazon VPC에 NAT 게이트웨이를 추가해야 합니다. 다음 다이어그램은 아키텍처입니다.

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/images/vpc_05.png)


Amazon VPC용 NAT 게이트웨이를 생성하려면 다음을 수행합니다.

1. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)에서 Amazon VPC 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **NAT 게이트웨이**를 선택합니다.

1. **NAT 게이트웨이** 페이지에서 **NAT 게이트웨이 생성**을 선택합니다.

1. **NAT 게이트웨이 생성** 페이지에서 다음 값을 입력합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-msk.html)

   **NAT 게이트웨이 생성**을 선택합니다.

1. 왼쪽 탐색 모음에서 **경로 표**를 선택합니다.

1. [**Create Route Table**]을 선택합니다.

1. **경로 표 생성** 페이지에서 다음 정보를 제공합니다.
   + **Name tag:** **ZeppelinRouteTable**
   + **VPC**: VPC(예: **AWS KafkaTutorialVPC**)를 선택합니다.

   **생성(Create)**을 선택합니다.

1. 경로 표 목록에서 **ZeppelinRouteTable**을 선택합니다. **경로** 탭에서 **경로 편집**을 선택합니다.

1. **경로 편집** 페이지에서 **경로 추가**를 선택합니다.

1. ****에서 **대상 주소**에 **0.0.0.0/0**을 입력합니다. **타겟**에서 **NAT 게이트웨이**, **ZeppelinGateway**를 선택합니다. **경로 저장**을 선택합니다. **닫기**를 선택하세요.

1. 경로 표 페이지에서 **ZeppelinRouteTable**을 선택한 상태에서 **서브넷 연결** 탭을 선택합니다. **서브넷 연결 편집**을 선택합니다.

1. **서브넷 연결 편집** 페이지에서 **AWS KafkaTutorialSubnet2**와 **AWS KafkaTutorialSubnet3**을 선택합니다. **저장(Save)**을 선택합니다.

## AWS Glue 연결 및 테이블 생성
<a name="example-notebook-msk-glue"></a>

Studio 노트북은 Amazon MSK 데이터 소스에 대한 메타데이터용 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 데이터베이스를 사용합니다. 이 섹션에서는 Amazon MSK 클러스터에 액세스하는 방법을 설명하는 AWS Glue 연결과 Studio 노트북과 같은 클라이언트에 데이터 소스의 데이터를 제공하는 방법을 설명하는 AWS Glue 테이블을 생성합니다.

**연결 생성**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. 아직 AWS Glue 데이터베이스가 없는 경우 왼쪽 탐색 모음에서 **데이터베이스를** 선택합니다. **데이터베이스 추가**를 선택합니다. **데이터베이스 추가** 창에서 **데이터베이스 이름**을 **default**(으)로 입력합니다. **생성(Create)**을 선택합니다.

1. 왼쪽 탐색 모음에서 **연결**을 선택합니다. **연결 추가**를 선택합니다.

1. **연결 추가** 창에서 다음 값을 입력합니다.
   + **연결 명칭**에 **ZeppelinConnection**을 입력합니다.
   + **연결 유형**에서 **Kafka**를 선택합니다.
   + **Kafka 부트스트랩 서버 URL**에 클러스터의 부트스트랩 브로커 문자열을 제공하세요. MSK 콘솔에서 또는 다음 CLI 명령을 입력하여 부트스트랩 브로커를 가져올 수 있습니다.

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + **SSL 연결 필요** 확인란의 선택을 취소하세요.

   **Next**(다음)를 선택합니다.

1. **VPC** 페이지에서 다음 값을 입력합니다.
   + **VPC**에서 VPC의 이름(예: ** AWS KafkaTutorialVPC**)을 선택합니다.
   + **서브넷**에서 **AWS KafkaTutorialSubnet2**를 선택합니다.
   + **보안 그룹**의 경우 사용 가능한 모든 그룹을 선택합니다.

   **Next**(다음)를 선택합니다.

1. **연결 속성** 및 **연결 액세스** 페이지에서 **마침**을 선택합니다.

**표 생성**
**참고**  
다음 단계에 설명된 대로 표을 수동으로 생성하거나 Apache Zeppelin 내의 노트북에서 Managed Service for Apache Flink용 표 커넥터 코드 생성을 사용하여 DDL 문을 통해 표을 생성할 수 있습니다. 그런 다음 테이블 AWS Glue 이 올바르게 생성되었는지 확인할 수 있습니다.

1. 왼쪽 탐색 모음에서 **표**를 선택합니다. **표** 페이지에서 **표 추가**, **표 수동 추가**를 선택합니다.

1. **표 속성 설정** 페이지에서 **표 명칭**에 **stock**을 입력합니다. 이전에 만든 데이터베이스를 선택했는지 확인하세요. **Next**(다음)를 선택합니다.

1. **데이터 스토어 추가** 페이지에서 **Kafka**를 선택합니다. **주제 명칭**에는 주제 명칭(예: **AWS KafkaTutorialTopic**)을 입력합니다. **연결**에서 **Zeppel In Connection**을 선택합니다.

1. **분류** 페이지에서 **JSON**을 선택합니다. **Next**(다음)를 선택합니다.

1. **스키마 정의** 페이지에서 열 추가를 선택하여 열을 추가합니다. 다음 속성을 가진 열을 추가합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-msk.html)

   **Next**(다음)를 선택합니다.

1. 다음 페이지에서 설정을 확인하고 **마침**을 선택합니다.

1. 테이블 목록에서 새로 생성한 테이블을 선택합니다.

1. **테이블 편집**을 선택하고 다음 속성을 추가합니다.
   + 키: `managed-flink.proctime`, 값: `proctime`
   + 키: `flink.properties.group.id`, 값: `test-consumer-group`
   + 키: `flink.properties.auto.offset.reset`, 값: `latest`
   + 키: `classification`, 값: `json`

   이러한 키/값 페어가 없으면 Flink 노트북에 오류가 발생합니다.

1. **적용**을 선택합니다.

## Amazon MSK로 Studio 노트북 생성
<a name="example-notebook-msk-create"></a>

애플리케이션에서 사용하는 리소스를 생성했으니 이제 Studio 노트북을 생성합니다.

**Topics**
+ [를 사용하여 Studio 노트북 생성 AWS Management Console](#example-notebook-create-msk-console)
+ [를 사용하여 Studio 노트북 생성 AWS CLI](#example-notebook-msk-create-api)

**참고**  
Amazon MSK 콘솔에서 기존 클러스터를 선택한 다음 **실시간 데이터 처리**를 선택하여 Studio 노트북을 생성할 수도 있습니다.

### 를 사용하여 Studio 노트북 생성 AWS Management Console
<a name="example-notebook-create-msk-console"></a>

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio** 탭을 선택합니다. **Studio 노트북 생성**을 선택합니다.
**참고**  
Amazon MSK 또는 Kinesis Data Streams 콘솔에서 Studio 노트북을 생성하려면 입력 Amazon MSK 클러스터 또는 Kinesis 데이터 스트림을 선택한 다음 **실시간 데이터 처리**를 선택합니다.

1. **Studio 노트북 생성** 페이지에서 다음 정보를 입력합니다.
   + **Studio 노트북 명칭** **MyNotebook**을 입력합니다.
   + **AWS Glue 데이터베이스**의 **기본값**을 선택합니다.

   **Studio 노트북 생성**을 선택합니다.

1. **MyNotebook** 페이지에서 **구성** 탭을 선택합니다. **네트워킹** 섹션에서 **편집**을 선택합니다.

1. **MyNotebook의 네트워킹 편집** 페이지에서 **Amazon MSK 클러스터를 기반으로 하는 VPC 구성**을 선택합니다. **Amazon MSK 클러스터**용 Amazon MSK 클러스터를 선택하세요. **Save changes**(변경 사항 저장)를 선택합니다.

1. **MyNotebook** 페이지에서 **실행**을 선택합니다. **상태**가 **실행 중**으로 표시될 때까지 기다리세요.

### 를 사용하여 Studio 노트북 생성 AWS CLI
<a name="example-notebook-msk-create-api"></a>

를 사용하여 Studio 노트북을 생성하려면 다음을 AWS CLI수행합니다.

1. 다음 정보가 있는지 확인합니다. 애플리케이션을 생성하려면 이러한 값이 필요합니다.
   + 계정 ID
   + Amazon MSK 클러스터를 포함하는 Amazon VPC의 서브넷 ID 및 보안 그룹 ID

1. 다음 콘텐츠를 가진 `create.json`이라는 파일을 생성합니다: 자리 표시자 값을 정보로 바꿉니다.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 애플리케이션을 생성하려면 다음 명령을 실행합니다.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 명령이 완료되면 새 Studio 노트북의 세부 정보를 보여 주는 다음과 유사한 출력이 나타나야 합니다.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 애플리케이션을 시작하려면 다음 명령을 실행합니다. 샘플 값을 계정 ID로 바꿉니다.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Amazon MSK 클러스터로 데이터 전송
<a name="example-notebook-msk-send"></a>

이 섹션에서는 Amazon EC2 클라이언트에서 Python 스크립트를 실행하여 Amazon MSK 데이터 소스로 데이터를 전송합니다.

1. Amazon EC2 클라이언트에 연결합니다.

1. 다음 명령을 실행하여 Python 버전 3, Pip 및 Python용 Kafka 패키지를 설치하고 작업을 확인합니다.

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 다음 명령을 입력하여 클라이언트 시스템에서를 구성합니다 AWS CLI .

   ```
   aws configure
   ```

   `region`에 계정 자격 증명 및 **us-east-1**을 제공하세요.

1. 다음 콘텐츠를 가진 `stock.py`이라는 파일을 생성합니다: 샘플 값을 Amazon MSK 클러스터의 부트스트랩 브로커 문자열로 바꾸고, 주제가 **AWS KafkaTutorialTopic**이 아닌 경우 주제 명칭을 업데이트하세요.

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 다음 명령으로 스크립트를 실행합니다:

   ```
   $ python3 stock.py
   ```

1. 다음 섹션을 완료하는 동안 스크립트는 실행 상태로 두세요.

## Studio 노트북을 테스트
<a name="example-notebook-msk-test"></a>

이 섹션에서는 Studio 노트북을 사용하여 Amazon MSK 클러스터에서 데이터를 쿼리합니다.

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio 노트북** 탭을 선택합니다. **MyNotebook**을 선택합니다.

1. **MyNotebook** 페이지에서 **Apache Zeppelin에서 열기**를 선택합니다.

   Apache Zeppelin 인터페이스가 새 탭에서 열립니다.

1. **제플린에 오신 것을 환영합니다\$1** 페이지에서 **Zeppelin 새로운 노트**를 선택하세요.

1. **Zeppelin 노트** 페이지에서 새로운 노트에 다음 쿼리를 입력합니다.

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   실행 아이콘을 선택합니다.

   애플리케이션은 Amazon MSK 클러스터의 데이터를 표시합니다.

애플리케이션에서 작동 측면을 볼 수 있도록 Apache Flink 대시보드를 열려면 **FLINK JOB**을 선택합니다. Flink 대시보드에 대한 자세한 내용을 알아보려면 [Managed Service for Apache Flink 개발자 가이드](https://docs.aws.amazon.com/)의 [Apache Flink 대시보드](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)를 참조하세요.

Flink Streaming SQL 쿼리의 더 많은 예는 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

# 애플리케이션 및 종속 리소스 정리
<a name="example-notebook-cleanup"></a>

## Studio 노트북 삭제
<a name="example-notebook-cleanup-app"></a>

1. Managed Service for Apache Flink 콘솔을 엽니다.

1. **MyNotebook**을 선택합니다.

1. **작업**을 선택한 다음 **삭제**를 선택합니다.

## AWS Glue 데이터베이스 및 연결 삭제
<a name="example-notebook-cleanup-glue"></a>

1. [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **데이터베이스**를 선택합니다. **기본값** 옆의 체크박스를 선택하여 선택합니다. **작업**, **데이터베이스 삭제**를 선택합니다. 선택 항목을 확인합니다.

1. 왼쪽 탐색 모음에서 **연결**을 선택합니다. **ZeppelInConnection** 옆의 체크박스를 선택하여 선택하세요. **작업**, **연결 삭제**를 선택합니다. 선택 항목을 확인합니다.

## IAM 역할 및 정책 삭제
<a name="example-notebook-msk-cleanup-iam"></a>

1. IAM 콘솔([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/))을 엽니다.

1. 왼쪽 탐색 모음에서 **역할**을 선택합니다.

1. 검색창을 사용하여 **ZeppelInRole** 역할을 검색합니다.

1. **ZeppelInRole** 역할을 선택하세요. **역할 삭제**를 선택합니다. 삭제를 확인합니다.

## CloudWatch 로그 그룹 삭제
<a name="example-notebook-cleanup-cw"></a>

콘솔을 사용하여 애플리케이션을 생성하면 콘솔에서 CloudWatch Logs 그룹과 로그 스트림을 자동으로 생성합니다. AWS CLI를 사용하여 애플리케이션을 생성한 경우에는 로그 그룹과 스트림이 없습니다.

1. [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)에서 CloudWatch 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **로그 그룹**을 선택합니다.

1. **/AWS/KinesisAnalytics/MyNotebook** 로그 그룹을 선택합니다.

1. **작업**, **로그 그룹 삭제**를 선택합니다. 삭제를 확인합니다.

## Kinesis Data Streams 리소스 정리
<a name="example-notebook-cleanup-streams"></a>

Kinesis 스트림을 삭제하려면 Kinesis 데이터 스트림 콘솔을 열고 Kinesis 스트림을 선택한 다음 **작업**, **삭제**를 선택합니다.

## MSK 리소스 정리
<a name="example-notebook-cleanup-msk"></a>

이 자습서에서 Amazon MSK 클러스터를 생성한 경우 이번 섹션의 단계를 따르세요. 이 섹션에는 Amazon EC2 클라이언트 인스턴스, Amazon VPC 및 Amazon MSK 클러스터를 정리하는 방법이 나와 있습니다.

### Amazon MSK 클러스터 삭제
<a name="example-notebook-msk-cleanup-msk"></a>

이 자습서에서 Amazon MSK 클러스터를 생성한 경우 다음 단계를 따르세요.

1. [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)에서 Amazon MSK 콘솔을 엽니다.

1. **AWS KafkaTutorialCluster**를 선택합니다. **삭제**를 선택합니다. 나타나는 창에 **delete**을(를) 입력하고 선택을 확인합니다.

### 클라이언트 인스턴스 종료
<a name="example-notebook-msk-cleanup-client"></a>

이 자습서에서 Amazon EC2 클라이언트 인스턴스를 생성한 경우 다음 단계를 따르세요.

1. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)에서 Amazon EC2 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **인스턴스**를 선택합니다.

1. **ZeppelinClient** 옆의 체크박스를 선택하여 선택하세요.

1. **인스턴스 상태**, **인스턴스 종료**를 선택합니다.

### Amazon VPC 삭제
<a name="example-notebook-msk-cleanup-vpc"></a>

이 자습서에서 Amazon VPC를 만든 경우 다음 단계를 따르세요.

1. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)에서 Amazon EC2 콘솔을 엽니다.

1. 왼쪽 탐색 표시줄에서 **네트워크 인터페이스**를 선택합니다.

1. 검색창에 VPC ID를 입력하고 Enter 키를 눌러 검색합니다.

1. 테이블 헤더의 체크박스를 선택하여 표시된 모든 네트워크 인터페이스를 선택합니다.

1. **작업**, **분리**를 선택합니다. 표시되는 창에서 **강제 분리** 아래에서 **활성화**를 선택합니다. **분리**를 선택하고 모든 네트워크 인터페이스가 **사용 가능** 상태에 도달할 때까지 기다립니다.

1. 테이블 헤더의 체크박스를 선택하여 표시된 모든 네트워크 인터페이스를 다시 선택합니다.

1. **작업**, **삭제**를 선택합니다. 작업을 확인합니다.

1. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)에서 Amazon VPC 콘솔을 엽니다.

1. **AWS KafkaTutorialVPC**를 선택합니다. **작업**, **VPC 삭제**를 선택합니다. **delete**을(를) 입력하고 삭제를 확인합니다.

# 자습서: 지속 가능한 상태를 사용하는 Managed Service for Apache Flink 애플리케이션으로 Studio 노트북 배포
<a name="example-notebook-deploy"></a>

다음 자습서에서는 지속 가능한 상태의 Apache Flink 애플리케이션용 관리 서비스로 Studio 노트북을 배포하는 방법을 보여줍니다.

**Topics**
+ [사전 조건 완료](#example-notebook-durable-setup)
+ [를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS Management Console](#example-notebook-deploy-console)
+ [를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS CLI](#example-notebook-deploy-cli)

## 사전 조건 완료
<a name="example-notebook-durable-setup"></a>

Kinesis Data Streams 또는 Amazon MSK를 사용하여 [자습서: Managed Service for Apache Flink에서 Studio 노트북 생성](example-notebook.md)에 따라 새 Studio 노트북을 생성하세요. Studio 노트북의 이름을 `ExampleTestDeploy`(으)로 지정하세요.

## 를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS Management Console
<a name="example-notebook-deploy-console"></a>

1. 콘솔의 **애플리케이션 코드 위치(*선택 사항*)**에 패키지 코드를 저장할 S3 버킷 위치를 추가합니다. 이렇게 하면 단계를 통해 노트북에서 직접 애플리케이션을 배포하고 실행할 수 있습니다.

1. Amazon S3 버킷을 읽고 쓰는 데 사용하는 역할을 활성화하고 Managed Service for Apache Flink 애플리케이션을 시작하는 데 필요한 권한을 애플리케이션 역할에 추가합니다.
   + AmazonS3FullAccess
   + Amazonmanaged-flinkFullAccess
   + 해당하는 경우 소스, 대상 및 VPC에 액세스할 수 있습니다. 자세한 설명은 [Studio 노트북의 IAM 권한 검토](how-zeppelin-iam.md) 섹션을 참조하세요.

1. 다음 예제 코드를 사용하세요.

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. 이번 기능 출시와 함께 노트북의 각 노트 오른쪽 상단 모서리에 노트북 이름과 함께 새로운 드롭다운이 표시됩니다. 다음을 수행할 수 있습니다.
   +  AWS Management Console에서 Studio 노트북 설정을 볼 수 있습니다.
   + Zeppelin Note를 빌드하고 Amazon S3로 내보내세요. 이때 애플리케이션 이름을 입력하고 **빌드 및 내보내기**를 선택합니다. 내보내기가 완료되면 알림을 받게 됩니다.
   + 필요한 경우 Amazon S3의 실행 파일에서 추가 테스트를 보고 실행할 수 있습니다.
   + 빌드가 완료되면 지속 가능한 상태 및 자동 크기 조정 기능을 갖춘 Kinesis 스트리밍 애플리케이션으로 코드를 배포할 수 있습니다.
   + 드롭다운을 사용하여 **Zeppelin Note를 Kinesis 스트리밍 애플리케이션으로 배포**를 선택합니다. 애플리케이션 이름을 검토하고 ** AWS 콘솔을 통해 배포**를 선택합니다.
   + 그러면 Managed Service for Apache Flink 애플리케이션을 생성하는 AWS Management Console 페이지로 이동합니다. 참고로 애플리케이션 이름, 병렬 처리, 코드 위치, 기본 Glue DB, VPC (해당하는 경우) 및 IAM 역할이 미리 입력되어 있습니다. IAM 역할에 소스 및 대상에 필요한 권한이 있는지 확인하세요. 안정적인 애플리케이션 상태 관리를 위해 스냅샷은 기본적으로 활성화됩니다.
   + **애플리케이션 생성**을 선택합니다.
   + 설정 **구성** 및 수정을 선택한 다음 **Run**을 선택하여 스트리밍 애플리케이션을 시작할 수 있습니다.

## 를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS CLI
<a name="example-notebook-deploy-cli"></a>

를 사용하여 애플리케이션을 배포하려면 Beta 2 정보와 함께 제공된 서비스 모델을 AWS CLI 사용하도록를 업데이트 AWS CLI해야 합니다. 업데이트된 서비스 모델을 사용하는 방법에 대한 자세한 내용은 [사전 조건 완료사전 조건 완료](example-notebook.md#example-notebook-setup) 섹션을 참조하세요.

다음 예제 코드에서는 새 Studio 노트북을 생성합니다.

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

다음 코드 예제에서는 Studio 노트북을 시작합니다.

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

다음 코드는 애플리케이션의 Apache Zeppelin 노트북 페이지의 URL을 반환합니다.

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Studio 노트북에서 데이터를 분석하기 위한 예제 쿼리 보기
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Amazon MSK/Apache Kafka로 테이블 생성](#how-zeppelin-examples-creating-tables)
+ [Kinesis를 사용하여 테이블 생성](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [텀블링 윈도우 쿼리](#how-zeppelin-examples-tumbling)
+ [슬라이딩 창 쿼리](#how-zeppelin-examples-sliding)
+ [대화형 SQL 사용](#how-zeppelin-examples-interactive-sql)
+ [BlackHole SQL 커넥터 사용](#how-zeppelin-examples-blackhole-connector-sql)
+ [Scala를 사용하여 샘플 데이터 생성](#notebook-example-data-generator)
+ [대화형 Scala 사용](#notebook-example-interactive-scala)
+ [대화형 Python 사용](#notebook-example-interactive-python)
+ [대화형 Python, SQL, Scala를 조합하여 사용](#notebook-example-interactive-pythonsqlscala)
+ [계정 간 Kinesis 데이터 스트림 사용](#notebook-example-crossaccount-kds)

Apache Flink SQL 쿼리 설정에 대한 자세한 내용은 [대화형 데이터 분석을 위한 Zeppelin 노트북의 Flink](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)를 참조하세요.

Apache Flink 대시보드에서 애플리케이션을 보려면 애플리케이션의 **Zeppelin Note** 페이지에서 **FLINK JOB**을 선택하세요.

윈도우 쿼리에 대한 자세한 내용은 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [윈도우](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)를 참조하세요.

Apache Flink Streaming SQL 쿼리의 더 많은 예는 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

## Amazon MSK/Apache Kafka로 테이블 생성
<a name="how-zeppelin-examples-creating-tables"></a>

Managed Service for Apache Flink Studio와 Amazon MSK Flink 커넥터를 사용하여 일반 텍스트, SSL 또는 IAM 인증과의 연결을 인증할 수 있습니다. 요구 사항에 따라 특정 속성을 사용하여 테이블을 생성하세요.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

[Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/)에서 이러한 속성을 다른 속성과 결합할 수 있습니다.

## Kinesis를 사용하여 테이블 생성
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

다음 예제에서는 Kinesis를 사용하여 테이블을 생성합니다.

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

사용할 수 있는 다른 속성에 대한 자세한 내용은 [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)를 참조하세요.

## 텀블링 윈도우 쿼리
<a name="how-zeppelin-examples-tumbling"></a>

다음 Flink Streaming SQL 쿼리는 `ZeppelinTopic` 테이블에서 각 5초의 텀블링 윈도우 내에서 가장 높은 가격을 선택합니다.

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## 슬라이딩 창 쿼리
<a name="how-zeppelin-examples-sliding"></a>

다음 Apache Flink Streaming SQL 쿼리는 `ZeppelinTopic` 표에서 각 5초 슬라이딩 윈도우에서 가장 높은 가격을 선택합니다.

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## 대화형 SQL 사용
<a name="how-zeppelin-examples-interactive-sql"></a>

이 예제는 최대 이벤트 시간 및 처리 시간과 키 값 테이블의 값의 합계를 인쇄합니다. [Scala를 사용하여 샘플 데이터 생성](#notebook-example-data-generator) 샘플 데이터 생성 스크립트가 실행 중인지 확인하세요. Studio 노트북에서 필터링 및 조인과 같은 다른 SQL 쿼리를 시도하려면 Apache Flink 설명서: Apache Flink 설명서의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## BlackHole SQL 커넥터 사용
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

BlackHole SQL 커넥터를 사용하면 쿼리를 테스트하기 위해 Kinesis 데이터 스트림 또는 Amazon MSK 클러스터를 생성할 필요가 없습니다. BlackHole SQL 커넥터에 대한 자세한 내용은 Apache Flink 설명서의 [BlackHole SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) 참조하세요. 이 예제에서 기본 카탈로그는 인메모리 카탈로그입니다.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Scala를 사용하여 샘플 데이터 생성
<a name="notebook-example-data-generator"></a>

이 예제에서는 Scala를 사용하여 샘플 데이터를 생성합니다. 이 샘플 데이터를 사용하여 다양한 쿼리를 테스트할 수 있습니다. 테이블 생성 명령문을 사용하여 키 값 테이블을 생성합니다.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## 대화형 Scala 사용
<a name="notebook-example-interactive-scala"></a>

[대화형 SQL 사용](#how-zeppelin-examples-interactive-sql)의 스칼라 번역본입니다. 더 많은 스칼라 예제를 보려면 Apache Flink 설명서의 [테이블 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)를 참조하세요.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 대화형 Python 사용
<a name="notebook-example-interactive-python"></a>

[대화형 SQL 사용](#how-zeppelin-examples-interactive-sql)의 Python 번역은 다음과 같습니다 더 많은 Python 예제를 보려면 Apache Flink 문서의 [테이블 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)를 참조하세요.

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 대화형 Python, SQL, Scala를 조합하여 사용
<a name="notebook-example-interactive-pythonsqlscala"></a>

대화형 분석을 위해 노트북에서 SQL, Python 및 Scala의 모든 조합을 사용할 수 있습니다. 지속 가능한 상태의 애플리케이션으로 배포할 Studio 노트북에서는 SQL과 Scala를 함께 사용할 수 있습니다. 이 예제에서는 무시되는 섹션과 지속 가능한 상태의 애플리케이션에 배포되는 섹션을 보여줍니다.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## 계정 간 Kinesis 데이터 스트림 사용
<a name="notebook-example-crossaccount-kds"></a>

Studio 노트북이 있는 계정이 아닌 다른 계정에 있는 Kinesis 데이터 스트림을 사용하려면 Studio 노트북이 실행되는 계정에서 서비스 실행 역할을 생성하고 데이터 스트림이 있는 계정에서 역할 신뢰 정책을 생성하세요. 테이블 생성 DDL 명령문의 Kinesis 커넥터에서 `aws.credentials.provider`, `aws.credentials.role.arn`, 및 `aws.credentials.role.sessionName`을(를) 사용하여 데이터 스트림에 대한 테이블을 생성합니다.

Studio 노트북 계정에 다음 서비스 실행 역할을 사용하세요.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

데이터 스트림 계정에 `AmazonKinesisFullAccess` 정책과 다음 역할 신뢰 정책을 사용하세요.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

테이블 생성 명령문에는 다음 단락을 사용하세요.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Managed Service for Apache Flink의 Studio 노트북 문제 해결
<a name="how-zeppelin-troubleshooting"></a>

이 섹션에는 Studio 노트북의 문제 해결 정보가 포함되어 있습니다.

## 중단된 애플리케이션 중지
<a name="how-zeppelin-troubleshooting-stopping"></a>

일시적 상태에 있는 애플리케이션을 중지하려면 `Force` 파라미터를 `true` 로 설정한 상태에서 [stopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 작업을 호출하세요. 자세한 내용은 [Managed Service for Apache Flink 개발자 안내서](https://docs.aws.amazon.com/managed-flink/latest/java/)의 [애플리케이션 실행](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html)을 참조하세요.

## 인터넷에 접속할 수 없는 VPC에서 지속 가능한 상태의 애플리케이션으로 배포
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

Managed Service for Apache Flink Studio의 애플리케이션별 배포 기능은 인터넷 액세스가 없는 VPC 애플리케이션을 지원하지 않습니다. 스튜디오에서 애플리케이션을 빌드한 다음 Managed Service for Apache Flink를 사용하여 Flink 애플리케이션을 수동으로 만들고 노트북에 빌드한 zip 파일을 선택하는 것이 좋습니다.

다음 단계에서는 이 접근 방식을 요약합니다.

1. 스튜디오 애플리케이션을 빌드하고 Amazon S3로 내보냅니다. 이 파일은 zip 파일이어야 합니다.

1. Amazon S3의 zip 파일 위치를 참조하는 코드 경로를 사용하여 Managed Service for Apache Flink 애플리케이션을 수동으로 생성합니다. 또한 다음 `env` 변수(총 2 `groupID`, 3 `var`)를 사용하여 애플리케이션을 구성해야 합니다.

1. kinesis.analytics.flink.run.options

   1. python: source/note.py

   1. jarfile: lib/PythonApplicationDependencies.jar

1. managed.deploy\$1as\$1app.options

   1. DatabaseARN: *<glue database ARN (Amazon Resource Name)>*

1. 애플리케이션에서 사용하는 서비스에 대한 Managed Service for Apache Flink Studio 및 Managed Service for Apache Flink IAM 역할에 권한을 부여해야 할 수 있습니다. 두 앱에 동일한 IAM 역할을 사용할 수 있습니다.

## 앱으로 배포 크기 및 구축 시간 단축
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

필요한 라이브러리를 결정할 수 없기 때문에 Python 애플리케이션용 Studio 앱으로 배포는 Python 환경에서 사용 가능한 모든 것을 패키징합니다. 이로 인해 앱으로 배포 크기가 필요 이상으로 커질 수 있습니다. 다음 절차는 종속성을 제거하여 앱으로 배포 Python 애플리케이션 크기를 줄이는 방법을 보여줍니다.

Studio의 앱으로 배포 기능을 사용하여 Python 애플리케이션을 구축하는 경우 애플리케이션이 종속되지 않는다면 시스템에서 사전 설치된 Python 패키지를 제거하는 것을 고려할 수 있습니다. 이렇게 하면 최종 아티팩트 크기를 줄여 애플리케이션 크기에 대한 서비스 제한을 위반하지 않을 뿐만 아니라 앱으로 배포 기능을 사용하여 애플리케이션의 구축 시간을 개선할 수 있습니다.

다음 명령을 실행하여 설치된 모든 Python 패키지를 각각의 설치된 크기와 함께 나열하고 상당한 크기의 패키지를 선택적으로 제거할 수 있습니다.

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**참고**  
`apache-beam`은 Flink Python이 작동하는 데 필요합니다. 이 패키지와 해당 종속성을 제거하면 안 됩니다.

다음은 제거를 고려할 수 있는 Studio V2의 사전 설치 Python 패키지 목록입니다.

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**Zeppelin 노트북에서 Python 패키지 제거**

1. 애플리케이션을 제거하기 전에 애플리케이션이 패키지 또는 해당 패키지를 사용하는 패키지에 종속되어 있는지 확인합니다. [pipdeptree](https://pypi.org/project/pipdeptree/)를 사용하여 패키지의 종속 항목을 식별할 수 있습니다.

1. 다음 명령을 실행하여 패키지를 제거합니다.

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. 실수로 제거한 패키지를 검색해야 하는 경우 다음 명령을 실행합니다.

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example 예제: 앱으로 배포 기능을 사용하여 Python 애플리케이션을 배포하기 전에 `scipy` 패키지를 제거합니다.**  

1. `pipdeptree`를 사용하여 모든 `scipy` 소비자를 검색하고 `scipy`를 안전하게 제거할 수 있는지 확인합니다.
   + 노트북을 통해 도구를 설치합니다.

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + 다음을 실행하여 `scipy`의 역방향 종속성 트리를 가져옵니다.

     ```
     %flink.pyflink
     !pip -r -p scipy
     ```

     다음과 유사한 출력 화면이 표시되어야 합니다(간결하게 나타내기 위해 요약됨).

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. 애플리케이션에서 `seaborn`, `statsmodels` 및 `plotnine`의 사용법을 주의 깊게 살펴봅니다. 애플리케이션이 `scipy`, `seaborn`, `statemodels` 또는 `plotnine`에 종속되지 않은 경우 이러한 패키지를 모두 제거하거나 애플리케이션에 필요하지 않은 패키지만 제거할 수 있습니다.

1. 다음을 실행하여 패키지를 제거합니다.

   ```
   !pip uninstall -y scipy plotnine seaborn statemodels
   ```

## 작업 취소
<a name="how-notbook-canceling-jobs"></a>

이 섹션에서는 Apache Zeppelin에서 가져올 수 없는 Apache Flink 작업을 취소하는 방법을 보여줍니다. 이러한 작업을 취소하려면 Apache Flink 대시보드로 이동하여 작업 ID를 복사한 다음, 다음 예제 중 하나에서 이 작업을 사용하세요.

단일 작업을 취소하려면:

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

실행 중인 모든 작업을 취소하려면:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

모든 작업을 취소하려면:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## Apache Flink 인터프리터 재시작
<a name="how-notbook-restarting-interpreter"></a>

Studio 노트북에서 Apache Flink 인터프리터를 다시 시작하려면

1. 화면 오른쪽 상단에 있는 **구성**을 선택합니다.

1. **인터프리터**를 선택합니다.

1. **재시작**을 선택한 다음 **확인**을 선택합니다.

# Managed Service for Apache Flink Studio 노트북용 사용자 지정 IAM 정책 생성
<a name="how-zeppelin-appendix-iam"></a>

일반적으로 관리형 IAM 정책을 사용하여 애플리케이션이 종속 리소스에 액세스할 수 있도록 허용합니다. 애플리케이션의 권한을 더 세밀하게 제어해야 하는 경우 사용자 지정 IAM 정책을 사용할 수 있습니다. 이 섹션에는 사용자 지정 IAM 정책의 예가 포함되어 있습니다.

**참고**  
다음 정책 예시에서는 자리 표시자 텍스트를 애플리케이션 값으로 대체합니다.

**Topics**
+ [AWS Glue](#how-zeppelin-iam-glue)
+ [CloudWatch Logs](#how-zeppelin-iam-cw)
+ [Kinesis 스트림](#how-zeppelin-iam-streams)
+ [Amazon MSK 클러스터](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

다음 예제 정책은 AWS Glue 데이터베이스에 액세스할 수 있는 권한을 부여합니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```

------

## CloudWatch Logs
<a name="how-zeppelin-iam-cw"></a>

다음 정책에서는 CloudWatch 로그에 대한 액세스 권한을 부여합니다.

```
{
      "Sid": "ListCloudwatchLogGroups",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<accountId>:log-group:*"
      ]
    },
    {
      "Sid": "ListCloudwatchLogStreams",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "<logGroupArn>:log-stream:*"
      ]
    },
    {
      "Sid": "PutCloudwatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "<logStreamArn>"
      ]
    }
```

**참고**  
콘솔을 사용하여 애플리케이션을 생성하는 경우 콘솔은 CloudWatch Logs에 액세스하는 데 필요한 정책을 애플리케이션 역할에 추가합니다.

## Kinesis 스트림
<a name="how-zeppelin-iam-streams"></a>

애플리케이션은 소스 또는 대상에 Kinesis Stream을 사용할 수 있습니다. 애플리케이션에는 소스 스트림에서 읽을 수 있는 읽기 권한과 대상 스트림에 쓸 수 있는 쓰기 권한이 필요합니다.

다음 정책은 소스로 사용되는 Kinesis Stream에서 읽을 수 있는 권한을 부여합니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```

------

다음 정책은 대상으로 사용되는 Kinesis Stream에 쓰기 권한을 부여합니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```

------

애플리케이션이 암호화된 Kinesis 스트림에 액세스하는 경우 스트림과 스트림의 암호화 키에 액세스할 수 있는 추가 권한을 부여해야 합니다.

다음 정책은 암호화된 소스 스트림과 스트림의 암호화 키에 액세스할 수 있는 권한을 부여합니다.

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
    ,
```

다음 정책은 암호화된 대상 스트림과 스트림의 암호화 키에 액세스할 수 있는 권한을 부여합니다.

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Amazon MSK 클러스터
<a name="how-zeppelin-iam-msk"></a>

Amazon MSK 클러스터에 대한 액세스 권한을 부여하려면 클러스터의 VPC에 대한 액세스 권한을 부여해야 합니다. Amazon VPC에 액세스하기 위한 정책 예제는 [VPC 애플리케이션 권한](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html)을 참조하세요.