

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan notebook Studio dengan Managed Service untuk Apache Flink
<a name="how-notebook"></a>

Notebook Studio untuk Layanan Terkelola untuk Apache Flink memungkinkan Anda secara interaktif mengkueri aliran data secara real-time, dan dengan mudah membangun dan menjalankan aplikasi pemrosesan aliran menggunakan SQL standar, Python, dan Scala. Dengan beberapa klik di konsol AWS Manajemen, Anda dapat meluncurkan notebook tanpa server untuk menanyakan aliran data dan mendapatkan hasil dalam hitungan detik. 

Notebook adalah lingkungan pengembangan berbasis web. Dengan notebook, Anda mendapatkan pengalaman pengembangan interaktif sederhana yang dikombinasikan dengan kemampuan lanjutan yang disediakan oleh Apache Flink. Notebook studio menggunakan notebook yang didukung [Apache Zeppelin](https://zeppelin.apache.org/), dan menggunakan [Apache Flink](https://flink.apache.org/) sebagai mesin pemrosesan aliran. Notebook Studio menggabungkan teknologi ini dengan lancar untuk membuat analitik lanjutan pada aliran data yang dapat diakses oleh developer dari semua keahlian. 

Apache Zeppelin memberi notebook Studio Anda dengan rangkaian alat analitik lengkap, termasuk yang berikut:
+ Visualisasi Data
+ Mengekspor data ke file
+ Mengontrol format output untuk analisis yang lebih mudah

Untuk mulai menggunakan Managed Service untuk Apache Flink dan Apache Zeppelin, lihat. [Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink](example-notebook.md) Untuk informasi selengkapnya tentang Apache Zeppelin, lihat [Dokumentasi Apache Zeppelin](http://zeppelin.apache.org).

 [Dengan notebook, Anda memodelkan kueri menggunakan Apache Flink [Table API & SQL di SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/), Python, atau Scala, atau API di Scala. DataStream ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) Dengan beberapa klik, Anda kemudian dapat mempromosikan notebook Studio ke aplikasi pemrosesan aliran Apache Flink yang terus berjalan, non-interaktif, untuk beban kerja produksi Anda.

**Topics**
+ [Gunakan versi Runtime notebook Studio yang benar](studio-notebook-versions.md)
+ [Buat notebook Studio](how-zeppelin-creating.md)
+ [Lakukan analisis interaktif data streaming](how-zeppelin-interactive.md)
+ [Terapkan sebagai aplikasi dengan status tahan lama](how-notebook-durable.md)
+ [Izin IAM](how-zeppelin-iam.md)
+ [Gunakan konektor dan dependensi](how-zeppelin-connectors.md)
+ [Fungsi yang ditetapkan pengguna](how-zeppelin-udf.md)
+ [Aktifkan pos pemeriksaan](how-zeppelin-checkpoint.md)
+ [Tingkatkan Waktu Proses Studio](upgrading-studio-runtime.md)
+ [Bekerja dengan AWS Glue](how-zeppelin-glue.md)
+ [Contoh dan tutorial untuk notebook Studio di Managed Service untuk Apache Flink](how-zeppelin-examples.md)
+ [Memecahkan masalah notebook Studio untuk Layanan Terkelola untuk Apache Flink](how-zeppelin-troubleshooting.md)
+ [Buat kebijakan IAM khusus untuk Managed Service untuk notebook Apache Flink Studio](how-zeppelin-appendix-iam.md)

# Gunakan versi Runtime notebook Studio yang benar
<a name="studio-notebook-versions"></a>

Dengan Amazon Managed Service untuk Apache Flink Studio, Anda dapat melakukan kueri aliran data secara real time dan membangun serta menjalankan aplikasi pemrosesan aliran menggunakan SQL, Python, dan Scala standar dalam buku catatan interaktif. Notebook studio didukung oleh [Apache Zeppelin](https://zeppelin.apache.org/) dan menggunakan [Apache Flink](https://flink.apache.org/) sebagai mesin pemrosesan aliran. 

**catatan**  
Kami akan mencela Studio Runtime dengan **Apache Flink versi 1.11** pada 5 November 2024. Mulai dari tanggal ini, Anda tidak akan dapat menjalankan notebook baru atau membuat aplikasi baru menggunakan versi ini. Kami menyarankan Anda meningkatkan ke runtime terbaru (Apache Flink 1.15 dan Apache Zeppelin 0.10) sebelum waktu itu. Untuk panduan tentang cara meng-upgrade notebook Anda, lihat[Tingkatkan Waktu Proses Studio](upgrading-studio-runtime.md).


**Waktu Jalan Studio**  

| Versi Apache Flink | Versi Apache Zeppelin | Versi Python |  | 
| --- | --- | --- | --- | 
| 1.15 | 0.1 | 3.8 | Disarankan | 
| 1.13 | 0,9 | 3.8 | Didukung hingga 16 Oktober 2024 | 
| 1.11 | 0,9 | 3.7 | Menghentikan pada 24 Februari 2025 | 

# Buat notebook Studio
<a name="how-zeppelin-creating"></a>

Notebook Studio berisi kueri atau program yang ditulis dalam SQL, Python, atau Scala yang berjalan di data streaming dan mengembalikan hasil analitik. Anda membuat aplikasi menggunakan konsol atau CLI, dan menyediakan kueri untuk menganalisis data dari sumber data Anda.

Aplikasi Anda memiliki komponen berikut:
+ Sumber data, seperti klaster Amazon MSK, Kinesis data stream, atau bucket Amazon S3.
+  AWS Glue Database. Basis data ini berisi tabel, yang menyimpan sumber data dan tujuan serta skema titik akhir tujuan Anda. Untuk informasi selengkapnya, lihat [Bekerja dengan AWS Glue](how-zeppelin-glue.md).
+ Kode aplikasi Anda. Kode Anda menerapkan kueri atau program analitik Anda.
+ Pengaturan aplikasi dan properti runtime Anda. Untuk informasi tentang pengaturan aplikasi dan properti runtime, lihat topik berikut di [Panduan Developer untuk Aplikasi Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html):
  + **Paralelisme dan Penskalaan Aplikasi: ** Anda menggunakan pengaturan Paralelisme aplikasi untuk mengontrol jumlah kueri yang dapat dijalankan aplikasi Anda secara bersamaan. Kueri Anda juga dapat mengambil keuntungan dari peningkatan paralelisme jika kueri tersebut memiliki beberapa jalur eksekusi, seperti dalam keadaan berikut:
    + Saat memproses beberapa serpihan Kinesis data stream
    + Ketika membuat partisi data menggunakan operator `KeyBy`.
    + Saat menggunakan beberapa operator jendela

    Untuk informasi selengkapnya tentang penskalaan aplikasi, lihat [Penskalaan Aplikasi di Layanan Terkelola untuk Apache Flink untuk Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html).
  + **Logging dan Monitoring:** Untuk informasi tentang pencatatan dan pemantauan aplikasi, lihat [Logging dan Monitoring di Amazon Managed Service for Apache Flink for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html).
  + Aplikasi Anda menggunakan titik pemeriksaan dan titik simpan untuk toleransi kesalahan. Titik pemeriksaan dan titik simpan tidak diaktifkan secara default untuk notebook Studio.

Anda dapat membuat buku catatan Studio menggunakan buku catatan Konsol Manajemen AWS atau file AWS CLI. 

Saat membuat aplikasi dari konsol, Anda memiliki opsi berikut:
+ Di konsol Amazon MSK, pilih klaster Anda, lalu pilih **Process data in real time** (Proses data secara langsung).
+ Di konsol Kinesis Data Streams, pilih aliran data Anda, lalu di tab **Application** (Aplikasi), pilih **Process data in real time** (Proses data secara langsung).
+ Di konsol Managed Service for Apache Flink pilih tab **Studio**, lalu pilih **Buat notebook Studio**.

# Lakukan analisis interaktif data streaming
<a name="how-zeppelin-interactive"></a>

Anda menggunakan notebook nirserver yang didukung Apache Zeppelin untuk berinteraksi dengan data streaming Anda. Notebook Anda dapat memiliki beberapa catatan, dan setiap catatan dapat memiliki satu atau beberapa paragraf tempat Anda dapat menulis kode Anda.

Contoh kueri SQL berikut menunjukkan cara mengambil data dari sumber data:

```
%flink.ssql(type=update)
select * from stock;
```

Untuk lebih banyak contoh kueri Flink Streaming SQL, lihat [Contoh dan tutorial untuk notebook Studio di Managed Service untuk Apache Flink](how-zeppelin-examples.md) berikut, dan [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/) dalam dokumentasi Apache Flink.

Anda dapat menggunakan kueri SQL Flink di notebook Studio untuk mengkueri data streaming. Anda juga dapat menggunakan Python (API Tabel) dan Scala (Tabel dan Datastream APIs) untuk menulis program untuk menanyakan data streaming Anda secara interaktif. Anda dapat melihat hasil kueri atau program, memperbaruinya dalam hitungan detik, dan menjalankannya kembali untuk melihat hasil yang diperbarui.

## Interpreter Flink
<a name="how-zeppelin-interactive-interpreters"></a>

*Anda menentukan bahasa Managed Service untuk Apache Flink yang digunakan untuk menjalankan aplikasi Anda dengan menggunakan interpreter.* Anda dapat menggunakan interpreter berikut dengan Managed Service untuk Apache Flink:


| Nama | Kelas | Deskripsi | 
| --- |--- |--- |
| %flink | FlinkInterpreter | Menciptakan ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironmentdan menyediakan lingkungan Scala | 
| %flink.pyflink | PyFlinkInterpreter | Menyediakan lingkungan python | 
| %flink.ipyflink | IPyFlinkInterpreter | Menyediakan lingkungan ipython | 
| %flink.ssql | FlinkStreamSqlInterpreter | Menyediakan lingkungan stream sql | 
| %flink.bsql | FlinkBatchSqlInterpreter | Menyediakan lingkungan batch sql | 

Untuk informasi selengkapnya tentang interpreter Flink, lihat [ Interpreter Flink untuk Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html).

Jika Anda menggunakan `%flink.pyflink` atau `%flink.ipyflink` sebagai penerjemah Anda, Anda harus menggunakan `ZeppelinContext` untuk memvisualisasikan hasil dalam buku catatan.

Untuk contoh yang lebih PyFlink spesifik, lihat [Kueri aliran data Anda secara interaktif menggunakan Layanan Terkelola untuk Apache Flink Studio](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/) dan Python.

## Variabel lingkungan tabel Apache Flink
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin menyediakan akses ke sumber daya lingkungan tabel menggunakan variabel lingkungan. 

Anda mengakses sumber daya lingkungan tabel Scala dengan variabel berikut:


| Variabel | Sumber daya | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | StreamTableEnvironment for blink planner | 

Anda mengakses sumber daya lingkungan tabel Python dengan variabel berikut:


| Variabel | Sumber daya | 
| --- |--- |
| s\$1env | StreamExecutionEnvironment | 
| st\$1env | StreamTableEnvironment for blink planner | 

Untuk informasi selengkapnya tentang penggunaan lingkungan tabel, lihat [Konsep dan API Umum](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/) dalam dokumentasi Apache Flink. 

# Terapkan sebagai aplikasi dengan status tahan lama
<a name="how-notebook-durable"></a>

Anda dapat membangun kode Anda dan mengekspornya ke Amazon S3. Anda dapat mempromosikan kode yang Anda tulis dalam catatan Anda ke aplikasi pemrosesan streaming yang terus berjalan. Ada dua mode menjalankan aplikasi Apache Flink pada Managed Service untuk Apache Flink: Dengan notebook Studio, Anda memiliki kemampuan untuk mengembangkan kode Anda secara interaktif, melihat hasil kode Anda secara real time, dan memvisualisasikannya dalam catatan Anda. Setelah Anda menerapkan catatan untuk dijalankan dalam mode streaming, Managed Service for Apache Flink membuat aplikasi untuk Anda yang berjalan terus menerus, membaca data dari sumber Anda, menulis ke tujuan Anda, mempertahankan status aplikasi yang berjalan lama, dan skala otomatis secara otomatis berdasarkan throughput aliran sumber Anda. 

**catatan**  
Bucket S3 tempat Anda mengekspor kode aplikasi harus berada dalam Wilayah yang sama dengan notebook Studio Anda.

Anda hanya dapat men-deploy catatan dari notebook Studio jika memenuhi kriteria berikut:
+ Paragraf harus disusun secara berurutan. Saat Anda menerapkan aplikasi Anda, semua paragraf dalam catatan akan dieksekusi secara berurutan (left-to-right, top-to-bottom) seperti yang muncul di catatan Anda. Anda dapat memeriksa urutan ini dengan memilih **Run All Paragraphs** (Jalankan Semua Paragraf) di catatan Anda.
+ Kode Anda adalah kombinasi Python dan SQL atau Scala dan SQL. Kami tidak mendukung Python dan Scala bersama saat ini untuk. deploy-as-application
+ Catatan Anda sebaiknya hanya memiliki interpreter berikut: `%flink`, `%flink.ssql`, `%flink.pyflink`, `%flink.ipyflink`, `%md`.
+ Penggunaan objek [konteks Zeppelin](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html) `z` tidak didukung. Metode yang tidak mengembalikan apa pun tidak akan melakukan apa pun kecuali mencatat peringatan. Metode lain akan meningkatkan pengecualian Python atau gagal untuk mengompilasi di Scala.
+ Catatan harus menghasilkan satu tugas Apache Flink. 
+ Catatan dengan [formulir dinamis](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html) tidak didukung untuk men-deploy sebagai aplikasi.
+ %md ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) akan dilewati dalam deployment sebagai aplikasi, karena ini diprediksi berisi dokumentasi yang dapat dibaca manusia yang tidak cocok untuk dijalankan sebagai bagian dari aplikasi yang dihasilkan.
+ Paragraf yang dinonaktifkan untuk berjalan dalam Zeppelin akan dilewati dalam deployment sebagai aplikasi. Bahkan jika paragraf yang dinonaktifkan menggunakan interpreter yang tidak kompatibel, misalnya, `%flink.ipyflink` dalam catatan dengan interpreter `%flink` `and %flink.ssql`, paragraf akan dilewati saat men-deploy catatan sebagai aplikasi, dan tidak akan mengakibatkan kesalahan.
+ Harus ada setidaknya satu paragraf yang hadir dengan kode sumber (Flink SQL, PyFlink atau Flink Scala) yang diaktifkan untuk berjalan agar penerapan aplikasi berhasil.
+ Mengatur paralelisme di direktif interpreter dalam paragraf (misalnya `%flink.ssql(parallelism=32)`) akan diabaikan dalam aplikasi yang di-deploy dari catatan. Sebagai gantinya, Anda dapat memperbarui aplikasi yang digunakan melalui Konsol Manajemen AWS, AWS Command Line Interface atau AWS API untuk mengubah pengaturan and/or ParallelismPer KPU Parallelism sesuai dengan tingkat paralelisme yang dibutuhkan aplikasi Anda, atau Anda dapat mengaktifkan penskalaan otomatis untuk aplikasi yang Anda gunakan.
+ Jika Anda menerapkan sebagai aplikasi dengan status tahan lama VPC Anda harus memiliki akses internet. Jika VPC Anda tidak memiliki akses internet, lihat. [Terapkan sebagai aplikasi dengan status tahan lama di VPC tanpa akses internet](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet) 

## Kriteria Scala/Python
<a name="how-notebook-durable-scala"></a>
+ Dalam kode Scala atau Python Anda, gunakan [Perencana Blink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure) (`senv`, `stenv` untuk Scala; `s_env`, `st_env` untuk Python) dan bukan perencana "Flink" yang lebih lama (`stenv_2` untuk Scala, `st_env_2` untuk Python). Proyek Apache Flink merekomendasikan penggunaan perencana Blink untuk kasus penggunaan produksi, dan ini adalah perencana default di Zeppelin dan di Flink.
+ Paragraf Python Anda tidak boleh menggunakan [pemanggilan/tugas shell](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment) menggunakan atau [perintah IPython ajaib](https://ipython.readthedocs.io/en/stable/interactive/magics.html) seperti `!` `%timeit` atau `%conda` dalam catatan yang dimaksudkan untuk digunakan sebagai aplikasi.
+ Anda tidak dapat menggunakan kelas kasus Scala sebagai parameter fungsi yang diteruskan ke operator aliran data susunan yang lebih tinggi seperti `map` dan `filter`. Untuk informasi tentang kelas kasus Scala, lihat [KELAS KASUS](https://docs.scala-lang.org/overviews/scala-book/case-classes.html) dalam dokumentasi Scala.

## Kriteria SQL
<a name="how-notebook-durable-sql"></a>
+ Pernyataan SELECT sederhana tidak diizinkan, karena tidak ada yang setara dengan bagian output paragraf tempat data dapat dikirim.
+ Dalam setiap paragraf yang diberikan, pernyataan DDL (`USE`, `CREATE`, `ALTER`, `DROP`, `SET`, `RESET`) harus mendahului pernyataan DML (`INSERT`). Ini karena pernyataan DML dalam paragraf harus dikirimkan bersama-sama sebagai satu tugas Flink.
+ Harus ada maksimal satu paragraf yang memiliki pernyataan DML di dalamnya. Ini karena, untuk deploy-as-application fitur tersebut, kami hanya mendukung pengiriman satu pekerjaan ke Flink.

Untuk informasi selengkapnya dan contoh, lihat [Menerjemahkan, menyunting, dan menganalisis data streaming menggunakan fungsi SQL dengan Amazon Managed Service untuk Apache Flink, Amazon Translate, dan Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesisanalytics-MyApplicatioamazon-translate-and-amazon-comprehend/).

# Tinjau izin IAM untuk notebook Studio
<a name="how-zeppelin-iam"></a>

Layanan Terkelola untuk Apache Flink membuat peran IAM untuk Anda saat Anda membuat buku catatan Studio melalui. Konsol Manajemen AWS Ini juga berhubungan dengan peran kebijakan yang memungkinkan akses berikut:


****  

| Layanan | Akses  | 
| --- | --- | 
| CloudWatch Log | Daftar | 
| Amazon EC2 | Daftar | 
| AWS Glue | Baca, Tulis | 
| Layanan Terkelola untuk Apache Flink | Baca | 
| Layanan Terkelola untuk Apache Flink V2 | Baca | 
| Amazon S3 | Baca, Tulis | 

# Gunakan konektor dan dependensi
<a name="how-zeppelin-connectors"></a>

Konektor memungkinkan Anda membaca dan menulis data di berbagai teknologi. Layanan Terkelola untuk Apache Flink menggabungkan tiga konektor default dengan notebook Studio Anda. Anda juga dapat menggunakan konektor kustom. Untuk informasi selengkapnya tentang konektor, lihat [Konektor Tabel & SQL](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/) di dokumentasi Apache Flink.

## Konektor default
<a name="zeppelin-default-connectors"></a>

Jika Anda menggunakan Konsol Manajemen AWS untuk membuat buku catatan Studio, Managed Service for Apache Flink menyertakan konektor kustom berikut secara default:`flink-sql-connector-kinesis`, `flink-connector-kafka_2.12` dan. `aws-msk-iam-auth` Untuk membuat notebook Studio melalui konsol tanpa konektor khusus ini, pilih opsi **Buat dengan pengaturan khusus**. Selanjutnya, ketika Anda sampai di halaman **Konfigurasi**, hapus kotak centang di sebelah dua konektor.

Jika Anda menggunakan [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API untuk membuat notebook Studio, `flink-connector-kafka` konektor `flink-sql-connector-flink` dan konektor tidak disertakan secara default. Untuk menambahkannya, tentukan konektor sebagai `MavenReference` di tipe data `CustomArtifactsConfiguration` seperti yang ditunjukkan dalam contoh berikut.

`aws-msk-iam-auth`Konektor adalah konektor yang akan digunakan dengan Amazon MSK yang menyertakan fitur untuk mengautentikasi secara otomatis dengan IAM. 

**catatan**  
Versi konektor yang ditunjukkan dalam contoh berikut adalah satu-satunya versi yang kami dukung.

```
For the Kinesis connector:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"

   }      
}]

For authenticating with AWS MSK through AWS IAM:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }      
}]
            
For the Apache Kafka connector:  

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"

   }      
}]
```

Untuk menambahkan konektor ini ke notebook yang ada, gunakan operasi [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API dan tentukan sebagai `MavenReference` tipe `CustomArtifactsConfigurationUpdate` data.

**catatan**  
Anda dapat mengatur `failOnError` ke true untuk `flink-sql-connector-kinesis` konektor di API tabel.

## Tambahkan dependensi dan konektor khusus
<a name="zeppelin-custom-connectors"></a>

Untuk menggunakan Konsol Manajemen AWS cara menambahkan dependensi atau konektor kustom ke notebook Studio Anda, ikuti langkah-langkah berikut:

1. Unggah file konektor kustom Anda ke Amazon S3.

1. Di bagian Konsol Manajemen AWS, pilih opsi **Custom create** untuk membuat notebook Studio Anda.

1. Ikuti alur kerja pembuatan notebook Studio hingga Anda sampai di langkah **Konfigurasi**.

1. Di bagian **Custom connectors** (Konektor kustom), pilih **Add custom connector** (Tambahkan konektor kustom).

1. Tentukan lokasi Amazon S3 dari dependensi atau konektor kustom.

1. Pilih **Save changes** (Simpan perubahan).

Untuk menambahkan JAR dependensi atau konektor kustom saat Anda membuat notebook Studio baru menggunakan [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API, tentukan lokasi Amazon S3 dari JAR dependensi atau konektor kustom dalam `CustomArtifactsConfiguration` tipe data. Untuk menambahkan dependensi atau konektor kustom ke notebook Studio yang ada, jalankan operasi [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API dan tentukan lokasi Amazon S3 dari JAR dependensi atau konektor khusus dalam tipe data. `CustomArtifactsConfigurationUpdate`

**catatan**  
Ketika Anda menyertakan dependensi atau konektor kustom, Anda juga harus menyertakan semua dependensi transitif yang tidak digabungkan di dalamnya.

# Menerapkan fungsi yang ditentukan pengguna
<a name="how-zeppelin-udf"></a>

Fungsi yang ditentukan pengguna (UDFs) adalah titik ekstensi yang memungkinkan Anda memanggil logika yang sering digunakan atau logika khusus yang tidak dapat dinyatakan sebaliknya dalam kueri. Anda dapat menggunakan Python atau bahasa JVM seperti Java atau Scala untuk mengimplementasikan paragraf UDFs dalam buku catatan Studio Anda. Anda juga dapat menambahkan file JAR eksternal notebook Studio yang berisi UDFs diimplementasikan dalam bahasa JVM. 

Saat menerapkan kelas abstrak register JARs yang subclass `UserDefinedFunction` (atau kelas abstrak Anda sendiri), gunakan cakupan yang disediakan di Apache Maven, deklarasi `compileOnly` dependensi di Gradle, cakupan yang disediakan di SBT, atau direktif yang setara dalam konfigurasi build proyek UDF Anda. Ini memungkinkan kode sumber UDF untuk dikompilasi terhadap Flink APIs, tetapi kelas Flink API tidak termasuk dalam artefak build. Lihat [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) ini dari contoh toples UDF yang mematuhi prasyarat tersebut pada proyek Maven. 

**catatan**  
*Untuk contoh penyiapan, lihat [Menerjemahkan, menyunting, dan menganalisis data streaming menggunakan fungsi SQL dengan Amazon Managed Service untuk Apache Flink, Amazon Translate, dan Amazon Comprehend di Blog Machine Learning](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/).AWS *

Untuk menggunakan konsol untuk menambahkan file JAR UDF ke notebook Studio Anda, ikuti langkah-langkah berikut:

1. Upload file JAR UDF Anda ke Amazon S3.

1. Di bagian Konsol Manajemen AWS, pilih opsi **Custom create** untuk membuat notebook Studio Anda.

1. Ikuti alur kerja pembuatan notebook Studio hingga Anda sampai di langkah **Konfigurasi**.

1. Di bagian **User-defined functions** (Fungsi yang ditetapkan pengguna), pilih **Add user-defined function** (Tambahkan fungsi yang ditetapkan pengguna).

1. Tentukan lokasi Amazon S3 dari file JAR atau file ZIP yang memiliki implementasi UDF Anda.

1. Pilih **Simpan perubahan**.

Untuk menambahkan UDF JAR saat membuat notebook Studio baru menggunakan [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API, tentukan lokasi JAR dalam tipe `CustomArtifactConfiguration` data. Untuk menambahkan UDF JAR ke notebook Studio yang ada, jalankan operasi [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API dan tentukan lokasi JAR dalam tipe `CustomArtifactsConfigurationUpdate` data. Atau, Anda dapat menggunakan Konsol Manajemen AWS untuk menambahkan file UDF JAR ke notebook Studio Anda.

## Pertimbangan dengan fungsi yang ditentukan pengguna
<a name="how-zeppelin-udf-considerations"></a>
+ Managed Service untuk Apache Flink Studio menggunakan [terminologi Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html) dimana notebook adalah contoh Zeppelin yang dapat berisi beberapa catatan. Setiap catatan kemudian dapat berisi beberapa paragraf. Dengan Managed Service for Apache Flink Studio, proses interpreter dibagikan di semua catatan di buku catatan. Jadi jika Anda melakukan registrasi fungsi eksplisit menggunakan [createTemporarySystemFungsi](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-) dalam satu catatan, hal yang sama dapat direferensikan apa adanya di catatan lain dari buku catatan yang sama. 

  Operasi *Deploy sebagai aplikasi* bekerja pada catatan *individual* dan tidak semua catatan di notebook. Saat Anda melakukan penerapan sebagai aplikasi, hanya konten catatan aktif yang digunakan untuk menghasilkan aplikasi. Registrasi fungsi eksplisit apa pun yang dilakukan di notebook lain bukan merupakan bagian dari dependensi aplikasi yang dihasilkan. Selain itu, selama Deploy sebagai opsi aplikasi pendaftaran fungsi implisit terjadi dengan mengubah nama kelas utama JAR ke string huruf kecil.

   Misalnya, jika `TextAnalyticsUDF` adalah kelas utama untuk UDF JAR, maka registrasi implisit akan menghasilkan nama fungsi. `textanalyticsudf` Jadi jika pendaftaran fungsi eksplisit di catatan 1 Studio terjadi seperti berikut ini, maka semua catatan lain di buku catatan itu (katakanlah catatan 2) dapat merujuk fungsi dengan nama `myNewFuncNameForClass` karena penerjemah bersama:

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   Namun selama penerapan sebagai operasi aplikasi pada catatan 2, pendaftaran eksplisit ini *tidak akan disertakan* dalam dependensi dan karenanya aplikasi yang diterapkan tidak akan berfungsi seperti yang diharapkan. Karena pendaftaran implisit, secara default semua referensi ke fungsi ini diharapkan bersama `textanalyticsudf` dan tidak`myNewFuncNameForClass`.

   Jika ada kebutuhan untuk pendaftaran nama fungsi kustom maka catatan 2 itu sendiri diharapkan berisi paragraf lain untuk melakukan pendaftaran eksplisit lainnya sebagai berikut: 

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ Jika UDF JAR Anda menyertakan Flink SDKs, maka konfigurasikan proyek Java Anda sehingga kode sumber UDF dapat dikompilasi terhadap Flink SDKs, tetapi kelas Flink SDK tidak termasuk dalam artefak build, misalnya JAR. 

  Anda dapat menggunakan `provided` cakupan di Apache Maven, deklarasi `compileOnly` dependensi di Gradle, `provided` cakupan di SBT, atau direktif yang setara dalam konfigurasi build proyek UDF mereka. Anda dapat merujuk ke [pom](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) ini dari contoh toples UDF, yang mematuhi prasyarat seperti itu pada proyek maven. Untuk step-by-step tutorial selengkapnya, lihat [Terjemahkan, edit, dan analisis data streaming menggunakan fungsi SQL dengan Amazon Managed Service untuk Apache Flink, Amazon Translate, dan Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/).

# Aktifkan pos pemeriksaan
<a name="how-zeppelin-checkpoint"></a>

Anda mengaktifkan checkpointing menggunakan pengaturan lingkungan. Untuk informasi tentang checkpointing, lihat [Toleransi Kesalahan](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html) di [Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/) Developer Guide.

## Mengatur interval checkpointing
<a name="how-zeppelin-checkpoint-interval"></a>

Contoh kode Scala berikut mengatur interval titik pemeriksaan aplikasi Anda ke satu menit:

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

Contoh kode Phyton berikut mengatur interval titik pemeriksaan aplikasi Anda ke satu menit:

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## Mengatur jenis checkpointing
<a name="how-zeppelin-checkpoint-type"></a>

Contoh kode Scala berikut mengatur mode titik pemeriksaan aplikasi Anda ke `EXACTLY_ONCE` (default):

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

Contoh kode Phyton berikut mengatur mode titik pemeriksaan aplikasi Anda ke `EXACTLY_ONCE` (default):

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# Tingkatkan Waktu Proses Studio
<a name="upgrading-studio-runtime"></a>

Bagian ini berisi informasi tentang cara memutakhirkan Runtime notebook Studio Anda. Kami menyarankan Anda untuk selalu meningkatkan ke Studio Runtime terbaru yang didukung.

## Upgrade notebook Anda ke Studio Runtime baru
<a name="upgrading-notebook"></a>

Bergantung pada cara Anda menggunakan Studio, langkah-langkah untuk meningkatkan Runtime Anda berbeda. Pilih opsi yang sesuai dengan kasus penggunaan Anda.

### Kueri SQL atau kode Python tanpa dependensi eksternal
<a name="notebook-no-dependencies"></a>

Jika Anda menggunakan SQL atau Python tanpa dependensi eksternal, gunakan proses upgrade Runtime berikut. Kami menyarankan Anda meningkatkan ke versi Runtime terbaru. Proses pemutakhiran sama, tanpa belakang dari versi Runtime yang Anda tingkatkan. 

1. Buat notebook Studio baru menggunakan Runtime terbaru.

1. Salin dan tempel kode setiap catatan dari buku catatan lama ke buku catatan baru.

1. Di notebook baru, sesuaikan kode agar kompatibel dengan fitur Apache Flink apa pun yang telah berubah dari versi sebelumnya.
   + Jalankan notebook baru. Buka notebook dan jalankan catatan demi catatan, secara berurutan, dan uji apakah berhasil.
   + Buat perubahan yang diperlukan pada kode.
   + Hentikan notebook baru.

1. Jika Anda telah menggunakan notebook lama sebagai aplikasi:
   + Terapkan notebook baru sebagai aplikasi baru yang terpisah.
   + Hentikan aplikasi lama.
   + Jalankan aplikasi baru tanpa snapshot.

1. Hentikan notebook lama jika sedang berjalan. Mulai notebook baru, sesuai kebutuhan, untuk penggunaan interaktif.

**Alur proses untuk upgrade tanpa dependensi eksternal**

![\[Diagram berikut menunjukkan alur kerja yang disarankan untuk meng-upgrade notebook Anda tanpa dependensi eksternal.\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### Kueri SQL atau kode Python dengan dependensi eksternal
<a name="notebook-dependencies"></a>

Ikuti proses ini jika Anda menggunakan SQL atau Python dan menggunakan dependensi eksternal seperti konektor atau artefak khusus, seperti fungsi yang ditentukan pengguna yang diimplementasikan dalam Python atau Java. Kami menyarankan Anda meningkatkan ke Runtime terbaru. Prosesnya sama, terlepas dari versi Runtime yang Anda tingkatkan.

1. Buat notebook Studio baru menggunakan Runtime terbaru.

1. Salin dan tempel kode setiap catatan dari buku catatan lama ke buku catatan baru.

1. Perbarui dependensi eksternal dan artefak khusus.
   + Cari konektor baru yang kompatibel dengan versi Apache Flink dari Runtime baru. Lihat [Konektor Tabel & SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/) dalam dokumentasi Apache Flink untuk menemukan konektor yang benar untuk versi Flink.
   + Perbarui kode fungsi yang ditentukan pengguna agar sesuai dengan perubahan di Apache Flink API, dan dependensi Python atau JAR apa pun yang digunakan oleh fungsi yang ditentukan pengguna. Kemas ulang artefak kustom Anda yang diperbarui.
   + Tambahkan konektor dan artefak baru ini ke notebook baru.

1. Di notebook baru, sesuaikan kode agar kompatibel dengan fitur Apache Flink apa pun yang telah berubah dari versi sebelumnya.
   + Jalankan notebook baru. Buka notebook dan jalankan catatan demi catatan, secara berurutan, dan uji apakah berhasil.
   + Buat perubahan yang diperlukan pada kode.
   + Hentikan notebook baru.

1. Jika Anda telah menggunakan notebook lama sebagai aplikasi:
   + Terapkan notebook baru sebagai aplikasi baru yang terpisah.
   + Hentikan aplikasi lama.
   + Jalankan aplikasi baru tanpa snapshot.

1. Hentikan notebook lama jika sedang berjalan. Mulai notebook baru, sesuai kebutuhan, untuk penggunaan interaktif.

**Alur proses untuk meningkatkan dengan dependensi eksternal**

![\[Diagram berikut mewakili alur kerja yang disarankan untuk meng-upgrade notebook Anda dengan dependensi eksternal..\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# Bekerja dengan AWS Glue
<a name="how-zeppelin-glue"></a>

Notebook Studio Anda menyimpan dan mendapatkan informasi tentang sumber data dan sink dari AWS Glue. Saat membuat buku catatan Studio, tentukan AWS Glue database yang berisi informasi koneksi. Saat Anda mengakses sumber data dan sink, Anda menentukan AWS Glue tabel yang terdapat dalam database. AWS Glue Tabel Anda menyediakan akses ke AWS Glue koneksi yang menentukan lokasi, skema, dan parameter sumber data dan tujuan Anda.

Notebook Studio menggunakan properti tabel untuk menyimpan data khusus aplikasi. Untuk informasi selengkapnya, lihat [Properti tabel](how-zeppelin-glue-properties.md).

Untuk contoh cara mengatur AWS Glue koneksi, database, dan tabel untuk digunakan dengan notebook Studio, lihat [Buat AWS Glue database](example-notebook.md#example-notebook-glue) di [Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink](example-notebook.md) tutorial.

# Properti tabel
<a name="how-zeppelin-glue-properties"></a>

Selain bidang data, AWS Glue tabel Anda memberikan informasi lain ke buku catatan Studio Anda menggunakan properti tabel. Managed Service untuk Apache Flink menggunakan properti AWS Glue tabel berikut:
+ [Tentukan nilai waktu Apache Flink](#how-zeppelin-glue-timestamp): Properti ini menentukan bagaimana Managed Service untuk Apache Flink memancarkan nilai waktu pemrosesan data internal Apache Flink.
+ [Gunakan konektor Flink dan properti format](#how-zeppelin-glue-connector): Properti ini memberikan informasi tentang aliran data Anda.

Untuk menambahkan properti ke AWS Glue tabel, lakukan hal berikut:

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Dari daftar tabel, pilih tabel yang digunakan aplikasi Anda untuk menyimpan informasi koneksi datanya. Pilih **Action** (Tindakan), **Edit table details** (Edit detail tabel).

1. Di bawah **Table Properties** (Properti Tabel), masukkan **managed-flink.proctime** untuk **key** (kunci) dan **user\$1action\$1time** untuk **Value** (Nilai).

## Tentukan nilai waktu Apache Flink
<a name="how-zeppelin-glue-timestamp"></a>

Apache Flink memberikan nilai waktu yang menjelaskan kapan peristiwa pemrosesan aliran terjadi, seperti [ Processing Time](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) (Waktu Pemrosesan) dan [ Event Time](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) (Waktu Peristiwa). Untuk menyertakan nilai-nilai ini dalam keluaran aplikasi Anda, Anda menentukan properti pada AWS Glue tabel yang memberi tahu runtime Managed Service for Apache Flink untuk memancarkan nilai-nilai ini ke dalam bidang yang ditentukan. 

Kunci dan nilai yang Anda gunakan dalam properti tabel Anda adalah sebagai berikut:


| Tipe Stempel Waktu | Key | Nilai | 
| --- |--- |--- |
| [Waktu Pemrosesan](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | dikelola flink.proctime | Nama kolom yang AWS Glue akan digunakan untuk mengekspos nilai. Nama kolom ini tidak sesuai dengan kolom tabel yang ada. | 
| [Waktu Acara](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | dikelola flink.rowtime | Nama kolom yang AWS Glue akan digunakan untuk mengekspos nilai. Nama kolom ini sesuai dengan kolom tabel yang ada. | 
| dikelola flink.watermark. *column\$1name*.milidetik | Interval tanda air dalam milidetik | 

## Gunakan konektor Flink dan properti format
<a name="how-zeppelin-glue-connector"></a>

Anda memberikan informasi tentang sumber data Anda ke konektor Flink aplikasi Anda menggunakan properti tabel AWS Glue . Beberapa contoh properti yang Managed Service untuk Apache Flink gunakan untuk konektor adalah sebagai berikut:


| Tipe Konektor | Key | Nilai | 
| --- |--- |--- |
| [Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | format | Format yang digunakan untuk deserialisasi dan serialisasi pesan Kafka, misalnya atau. json csv | 
| scan.startup.mode | Mode startup untuk konsumen Kafka, misalnya earliest-offset atautimestamp. | 
| [Kinesis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | format | Format yang digunakan untuk deserialisasi dan serialisasi catatan aliran data Kinesis, misalnya atau. json csv | 
| aws.region |  AWS Wilayah di mana aliran didefinisikan.  | 
| [S3 (Sistem File)](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | format | Format yang digunakan untuk deserialisasi dan serialisasi file, misalnya atau. json csv | 
| path | Jalur Amazon S3, mis. s3://mybucket/ | 

Untuk informasi selengkapnya tentang konektor lainnya selain Kinesis dan Apache Kafka, lihat dokumentasi konektor Anda.

# Contoh dan tutorial untuk notebook Studio di Managed Service untuk Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink](example-notebook.md)
+ [Tutorial: Menyebarkan notebook Studio sebagai Layanan Terkelola untuk aplikasi Apache Flink dengan status tahan lama](example-notebook-deploy.md)
+ [Lihat contoh kueri untuk menganalisis data di buku catatan Studio](how-zeppelin-sql-examples.md)

# Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink
<a name="example-notebook"></a>

Tutorial berikut menunjukkan cara membuat notebook Studio yang membaca data dari aliran data Kinesis atau cluster MSK Amazon.

**Topics**
+ [Lengkapi prasyarat](#example-notebook-setup)
+ [Buat AWS Glue database](#example-notebook-glue)
+ [Langkah selanjutnya: Buat notebook Studio dengan Kinesis Data Streams atau Amazon MSK](#examples-notebook-nextsteps)
+ [Buat notebook Studio dengan Kinesis Data Streams](example-notebook-streams.md)
+ [Buat notebook Studio dengan Amazon MSK](example-notebook-msk.md)
+ [Bersihkan aplikasi Anda dan sumber daya yang bergantung](example-notebook-cleanup.md)

## Lengkapi prasyarat
<a name="example-notebook-setup"></a>

Pastikan versi Anda AWS CLI adalah versi 2 atau yang lebih baru. Untuk menginstal yang terbaru AWS CLI, lihat [Menginstal, memperbarui, dan menghapus instalasi AWS CLI versi 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html).

## Buat AWS Glue database
<a name="example-notebook-glue"></a>

Notebook Studio Anda menggunakan basis data [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) untuk metadata tentang sumber data Amazon MSK Anda.

**Buat AWS Glue Database**

1. Buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Pilih **Add database** (Tambahkan basis data). Di jendela **Add database** (Tambahkan basis data), masukkan **default** untuk **Database name** (Nama basis data). Pilih **Create** (Buat). 

## Langkah selanjutnya: Buat notebook Studio dengan Kinesis Data Streams atau Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Dengan tutorial ini, Anda dapat membuat notebook Studio yang menggunakan Kinesis Data Streams atau Amazon MSK:
+ [Buat notebook Studio dengan Kinesis Data Streams](example-notebook-streams.md): Dengan Kinesis Data Streams, Anda dengan cepat membuat aplikasi yang menggunakan aliran data Kinesis sebagai sumber. Anda hanya perlu membuat Kinesis data stream sebagai sumber daya dependen.
+ [Buat notebook Studio dengan Amazon MSK](example-notebook-msk.md): Dengan Amazon MSK, Anda membuat aplikasi yang menggunakan klaster Amazon MSK sebagai sumber. Anda perlu membuat Amazon VPC, instans klien Amazon EC2, dan klaster Amazon MSK sebagai sumber daya dependen.

# Buat notebook Studio dengan Kinesis Data Streams
<a name="example-notebook-streams"></a>

Tutorial ini menjelaskan cara membuat notebook Studio yang menggunakan Kinesis data stream sebagai sumber.

**Topics**
+ [Lengkapi prasyarat](#example-notebook-streams-setup)
+ [Buat AWS Glue tabel](#example-notebook-streams-glue)
+ [Buat notebook Studio dengan Kinesis Data Streams](#example-notebook-streams-create)
+ [Kirim data ke Kinesis data stream Anda](#example-notebook-streams-send)
+ [Uji notebook Studio Anda](#example-notebook-streams-test)

## Lengkapi prasyarat
<a name="example-notebook-streams-setup"></a>

Sebelum Anda membuat notebook Studio, buat Kinesis data stream (`ExampleInputStream`). Aplikasi Anda menggunakan aliran ini untuk sumber aplikasi.

Anda dapat membuat aliran ini menggunakan konsol Amazon Kinesis atau perintah AWS CLI . Untuk instruksi konsol, lihat [Membuat dan Memperbarui Aliran Data](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) di *Panduan Developer Amazon Kinesis Data Streams*. Beri nama aliran **ExampleInputStream** dan atur **Number of open shards** (Jumlah serpihan terbuka) ke **1**.

Untuk membuat stream (`ExampleInputStream`) menggunakan AWS CLI, gunakan perintah Amazon Kinesis `create-stream` AWS CLI berikut.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Buat AWS Glue tabel
<a name="example-notebook-streams-glue"></a>

Notebook Studio Anda menggunakan basis data [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) untuk metadata tentang sumber data Kinesis Data Streams Anda.

**catatan**  
Anda dapat membuat database secara manual terlebih dahulu atau Anda dapat membiarkan Managed Service for Apache Flink membuatnya untuk Anda saat Anda membuat buku catatan. Demikian pula, Anda dapat membuat tabel secara manual seperti yang dijelaskan di bagian ini, atau Anda dapat menggunakan kode konektor buat tabel untuk Layanan Terkelola untuk Apache Flink di buku catatan Anda dalam Apache Zeppelin untuk membuat tabel Anda melalui pernyataan DDL. Anda kemudian dapat check-in AWS Glue untuk memastikan tabel dibuat dengan benar.

**Buat Tabel**

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Jika Anda belum memiliki AWS Glue database, pilih **Database** dari bilah navigasi kiri. Pilih **Add database** (Tambahkan basis data). Di jendela **Add database** (Tambahkan basis data), masukkan **default** untuk **Database name** (Nama basis data). Pilih **Create** (Buat).

1. Di bilah navigasi sebelah kiri, pilih **Tables** (Tabel). Di halaman **Tabel**, pilih **Add tables** (Tambahkan tabel), **Add table manually** (Tambahkan tabel secara manual).

1. Di halaman **Set up your table's properties** (Siapkan properti tabel Anda), masukkan **stock** untuk **Table name** (Nama tabel). Pastikan Anda memilih basis data yang Anda buat sebelumnya. Pilih **Berikutnya**.

1. Di halaman **Tambahkan penyimpanan data**, pilih **Kinesis**. Untuk **Stream name** (Nama aliran), masukkan **ExampleInputStream**. untuk **Kinesis source URL** (URL sumber Kinesis), pilih masukkan **https://kinesis.us-east-1.amazonaws.com**. Jika Anda menyalin dan menempel **URL sumber Kinesis**, pastikan untuk menghapus spasi awal atau akhir. Pilih **Berikutnya**.

1. Di halaman **Klasifikasi**, pilih **JSON**. Pilih **Berikutnya**.

1. Di halaman **Tentukan skema**, pilih Add Column (Tambahkan kolom) untuk menambahkan kolom. Tambahkan kolom dengan properti berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-streams.html)

   Pilih **Berikutnya**.

1. Di halaman berikutnya, verifikasi pengaturan Anda, dan pilih **Finish** (Selesai).

1. Pilih tabel yang baru dibuat dari daftar tabel.

1. Pilih **Edit table** (Edit tabel) dan tambahkan properti dengan kunci `managed-flink.proctime` dan nilai `proctime`.

1. Pilih **Apply** (Terapkan).

## Buat notebook Studio dengan Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Sekarang Anda sudah membuat sumber daya yang digunakan aplikasi Anda, Anda membuat notebook Studio Anda. 

**Topics**
+ [Buat notebook Studio menggunakan Konsol Manajemen AWS](#example-notebook-create-streams-console)
+ [Buat notebook Studio menggunakan AWS CLI](#example-notebook-msk-create-api)

### Buat notebook Studio menggunakan Konsol Manajemen AWS
<a name="example-notebook-create-streams-console"></a>

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard). 

1. Di halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **Studio**. Pilih **Create Studio notebook** (Buat notebook Studio).
**catatan**  
Anda juga dapat membuat notebook Studio dari konsol Amazon MSK atau Kinesis Data Streams dengan memilih klaster Amazon MSK input atau Kinesis data stream, dan memilih **Process data in real time** (Proses data secara langsung).

1. Di halaman **Buat notebook Studio**, berikan informasi berikut:
   + Masukkan **MyNotebook** untuk nama notebook.
   + Pilih **default** untuk **Basis data AWS Glue**.

   Pilih **Create Studio notebook** (Buat notebook Studio).

1. Di **MyNotebook**halaman, pilih **Jalankan**. Tunggu **Status** hingga menampilkan **Running** (Berjalan). Biaya berlaku saat notebook berjalan.

### Buat notebook Studio menggunakan AWS CLI
<a name="example-notebook-msk-create-api"></a>

Untuk membuat notebook Studio menggunakan AWS CLI, lakukan hal berikut:

1. Verifikasi ID akun Anda. Anda memerlukan nilai ini untuk membuat aplikasi Anda.

1. Buat peran `arn:aws:iam::AccountID:role/ZeppelinRole` dan tambahkan izin berikut ke peran yang dibuat secara otomatis oleh konsol.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Buat file bernama `create.json` dengan konten berikut. Ganti nilai placeholder dengan informasi Anda.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Jalankan perintah berikut untuk membuat aplikasi Anda.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Setelah perintah selesai, Anda melihat output yang menampilkan detail untuk notebook Studio baru Anda. Berikut adalah contoh output.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Jalankan perintah berikut untuk memulai aplikasi Anda. Ganti nilai sampel dengan ID akun Anda.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Kirim data ke Kinesis data stream Anda
<a name="example-notebook-streams-send"></a>

Untuk mengirim data uji ke Kinesis data stream, lakukan hal berikut:

1. Buka [ Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Pilih **Buat Pengguna Cognito** dengan. CloudFormation

1.  CloudFormation Konsol terbuka dengan template Kinesis Data Generator. Pilih **Berikutnya**.

1. Di halaman **Tentukan detail tumpukan**, masukkan nama pengguna dan kata sandi pengguna Cognito Anda. Pilih **Berikutnya**.

1. Di halaman **Konfigurasikan opsi tumpukan**, pilih **Next** (Berikutnya).

1. Di halaman **Review Kinesis-Data-Generator-Cognito -User**, pilih yang **saya akui yang AWS CloudFormation mungkin membuat sumber daya IAM**. kotak centang. Pilih **Buat tumpukan**.

1. Tunggu CloudFormation tumpukan selesai dibuat. **Setelah tumpukan selesai, buka tumpukan **Kinesis-Data-Generator-Cognito-User** di konsol, dan pilih tab Output. CloudFormation ** Buka URL yang terdaftar untuk nilai **KinesisDataGeneratorUrl**output.

1. Di halaman **Amazon Kinesis Data Generator**, masuk dengan kredensial yang Anda buat di langkah 4.

1. Di halaman berikutnya, berikan nilai berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-streams.html)

   Untuk **Record Template** (Templat Catatan), tempel kode berikut:

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Pilih **Send data** (Kirim data).

1. Generator akan mengirimkan data ke Kinesis data stream Anda. 

   Biarkan generator berjalan sewaktu Anda menyelesaikan bagian berikutnya.

## Uji notebook Studio Anda
<a name="example-notebook-streams-test"></a>

Di bagian ini, Anda menggunakan notebook Studio untuk mengkueri data dari Kinesis data stream Anda.

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Pada halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **notebook Studio**. Pilih **MyNotebook**.

1. Di **MyNotebook**halaman, pilih **Buka di Apache Zeppelin**.

   Antarmuka Apache Zeppelin terbuka di tab baru.

1. Di halaman **Selamat Datang di Zeppelin\$1**, pilih **Zeppelin Note** (Catatan Zeppelin).

1. Di halaman **Zeppelin Note** (Catatan Zeppelin), masukkan kueri berikut ke dalam catatan baru:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Pilih ikon jalankan.

   Setelah beberapa saat, catatan menampilkan data dari Kinesis data stream.

Untuk membuka Dasbor Apache Flink untuk aplikasi Anda agar dapat melihat aspek operasional, pilih **FLINK JOB** (TUGAS FLINK). Untuk informasi selengkapnya tentang Dasbor Flink, lihat Dasbor [Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) di [Managed Service for Apache](https://docs.aws.amazon.com/) Flink Developer Guide.

Untuk contoh kueri SQL Flink Streaming selengkapnya, lihat [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Buat notebook Studio dengan Amazon MSK
<a name="example-notebook-msk"></a>

Tutorial ini menjelaskan cara membuat notebook Studio yang menggunakan klaster Amazon MSK sebagai sumber.

**Topics**
+ [Siapkan kluster MSK Amazon](#example-notebook-msk-setup)
+ [Tambahkan gateway NAT ke VPC Anda](#example-notebook-msk-nat)
+ [Buat AWS Glue koneksi dan tabel](#example-notebook-msk-glue)
+ [Buat notebook Studio dengan Amazon MSK](#example-notebook-msk-create)
+ [Kirim data ke klaster Amazon MSK Anda](#example-notebook-msk-send)
+ [Uji notebook Studio Anda](#example-notebook-msk-test)

## Siapkan kluster MSK Amazon
<a name="example-notebook-msk-setup"></a>

Untuk tutorial ini, Anda memerlukan klaster Amazon MSK yang memungkinkan akses plaintext. Jika Anda belum menyiapkan kluster MSK Amazon, ikuti tutorial [Memulai Menggunakan Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) untuk membuat Amazon VPC, kluster MSK Amazon, topik, dan instance klien Amazon. EC2 

Saat mengikuti tutorial, lakukan hal berikut:
+ Di [Langkah 3: Buat Klaster Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), di langkah 4, ubah nilai `ClientBroker` dari `TLS` ke **PLAINTEXT**.

## Tambahkan gateway NAT ke VPC Anda
<a name="example-notebook-msk-nat"></a>

Jika Anda membuat klaster Amazon MSK dengan mengikuti tutorial [Memulai Menggunakan Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html), atau jika Amazon VPC Anda yang sudah ada tidak memiliki gateway NAT untuk subnet privatnya, Anda harus menambahkan Gateway NAT ke Amazon VPC Anda. Diagram berikut menunjukkan arsitektur. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/images/vpc_05.png)


Untuk membuat gateway NAT untuk VPC Amazon Anda, lakukan hal berikut:

1. Buka konsol VPC Amazon di. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)

1. Pilih **NAT Gateways** (Gateway NAT) dari bilah navigasi sebelah kiri.

1. Di halaman **Gateway NAT**, pilih **Create NAT Gateway** (Buat Gateway NAT).

1. Di halaman **Buat Gateway NAT**, berikan nilai berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-msk.html)

   Pilih **Create NAT Gateway** (Buat Gateway NAT).

1. Di bilah navigasi sebelah kiri, pilih **Route Tables** (Tabel Rute).

1. Pilih **Create Route Table** (Buat Tabel Rute).

1. Di halaman **Create route table** (Buat tabel rute), berikan informasi berikut:
   + **Name tag** (Tanda nama): **ZeppelinRouteTable**
   + **VPC****: Pilih VPC Anda (misalnya VPC).AWS KafkaTutorial**

   Pilih **Buat**.

1. Dalam daftar tabel rute, pilih **ZeppelinRouteTable**. Pilih tab **Routes** (Rute), dan pilih **Edit routes** (Edit rute).

1. Di halaman **Edit Rute**, pilih **Add route** (Tambahkan rute).

1. Di ****Untuk **Tujuan**, masukkan **0.0.0.0/0**. Untuk **Target**, pilih **NAT Gateway**, **ZeppelinGateway**. Pilih **Save Routes** (Simpan Rute). Pilih **Close** (Tutup).

1. Pada halaman Tabel Rute, dengan **ZeppelinRouteTable**dipilih, pilih tab **Asosiasi Subnet**. Pilih **Edit subnet associations** (Edit asosiasi subnet).

1. Di halaman **Edit asosiasi subnet**, pilih **AWS KafkaTutorialSubnet2** dan **AWS KafkaTutorialSubnet3**. Pilih **Simpan**.

## Buat AWS Glue koneksi dan tabel
<a name="example-notebook-msk-glue"></a>

Notebook Studio Anda menggunakan basis data [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) untuk metadata tentang sumber data Amazon MSK Anda. Di bagian ini, Anda membuat AWS Glue sambungan yang menjelaskan cara mengakses kluster MSK Amazon, dan AWS Glue tabel yang menjelaskan cara menyajikan data dalam sumber data ke klien seperti buku catatan Studio Anda. 

**Buat Koneksi**

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Jika Anda belum memiliki AWS Glue database, pilih **Database** dari bilah navigasi kiri. Pilih **Add database** (Tambahkan basis data). Di jendela **Add database** (Tambahkan basis data), masukkan **default** untuk **Database name** (Nama basis data). Pilih **Create** (Buat).

1. Pilih **Connections** (Koneksi) dari bilah navigasi sebelah kiri. Pilih **Add Connection** (Tambahkan Koneksi).

1. Di jendela **Tambahkan Koneksi**, berikan nilai berikut:
   + Untuk **Connection name** (Nama koneksi), masukkan **ZeppelinConnection**.
   + Untuk **Connection type** (Tipe koneksi), pilih **Kafka**.
   + Untuk **server bootstrap Kafka URLs**, berikan string broker bootstrap untuk cluster Anda. Anda bisa mendapatkan broker bootstrap dari konsol MSK, atau dengan memasukkan perintah CLI berikut:

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Hapus centang di kotak centang **Require SSL connection** (Perlu koneksi SSL).

   Pilih **Berikutnya**.

1. Di halaman **VPC**, berikan nilai berikut:
   + **Untuk **VPC**, pilih nama VPC Anda (misalnya VPC.) AWS KafkaTutorial**
   + Untuk **Subnet**, pilih **AWS KafkaTutorialSubnet2**.
   + Untuk **Security groups** (Grup keamanan), pilih semua grup yang tersedia.

   Pilih **Berikutnya**.

1. Di halaman **Properti koneksi** / **Akses koneksi**, pilih **Finish** (Selesai).

**Buat Tabel**
**catatan**  
Anda dapat membuat tabel secara manual seperti yang dijelaskan dalam langkah-langkah berikut, atau Anda dapat menggunakan kode konektor buat tabel untuk Layanan Terkelola untuk Apache Flink di buku catatan Anda dalam Apache Zeppelin untuk membuat tabel Anda melalui pernyataan DDL. Anda kemudian dapat check-in AWS Glue untuk memastikan tabel dibuat dengan benar.

1. Di bilah navigasi sebelah kiri, pilih **Tables** (Tabel). Di halaman **Tabel**, pilih **Add tables** (Tambahkan tabel), **Add table manually** (Tambahkan tabel secara manual).

1. Di halaman **Set up your table's properties** (Siapkan properti tabel Anda), masukkan **stock** untuk **Table name** (Nama tabel). Pastikan Anda memilih basis data yang Anda buat sebelumnya. Pilih **Berikutnya**.

1. Di halaman **Tambahkan penyimpanan data**, pilih **Kafka**. Untuk **nama Topik**, masukkan nama topik Anda (mis. **AWS KafkaTutorialTopic**). Untuk **Koneksi**, pilih **ZeppelinConnection**.

1. Di halaman **Klasifikasi**, pilih **JSON**. Pilih **Berikutnya**.

1. Di halaman **Tentukan skema**, pilih Add Column (Tambahkan kolom) untuk menambahkan kolom. Tambahkan kolom dengan properti berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-msk.html)

   Pilih **Berikutnya**.

1. Di halaman berikutnya, verifikasi pengaturan Anda, dan pilih **Finish** (Selesai).

1. Pilih tabel yang baru dibuat dari daftar tabel.

1. Pilih **Edit tabel** dan tambahkan properti berikut:
   + kunci:`managed-flink.proctime`, nilai: `proctime`
   + kunci:`flink.properties.group.id`, nilai: `test-consumer-group`
   + kunci:`flink.properties.auto.offset.reset`, nilai: `latest`
   + kunci:`classification`, nilai: `json`

   Tanpa pasangan kunci/nilai ini, notebook Flink mengalami kesalahan. 

1. Pilih **Terapkan**.

## Buat notebook Studio dengan Amazon MSK
<a name="example-notebook-msk-create"></a>

Sekarang Anda sudah membuat sumber daya yang digunakan aplikasi Anda, Anda membuat notebook Studio Anda. 

**Topics**
+ [Buat notebook Studio menggunakan Konsol Manajemen AWS](#example-notebook-create-msk-console)
+ [Buat notebook Studio menggunakan AWS CLI](#example-notebook-msk-create-api)

**catatan**  
Anda juga dapat membuat notebook Studio dari konsol Amazon MSK dengan memilih klaster yang sudah ada, lalu memilih **Process data in real time** (Proses data secara langsung).

### Buat notebook Studio menggunakan Konsol Manajemen AWS
<a name="example-notebook-create-msk-console"></a>

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Di halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **Studio**. Pilih **Create Studio notebook** (Buat notebook Studio).
**catatan**  
Untuk membuat notebook Studio dari konsol Amazon MSK atau Kinesis Data Streams pilih klaster Amazon MSK input atau Kinesis data stream Anda, lalu pilih **Process data in real time** (Proses data secara langsung).

1. Di halaman **Buat notebook Studio**, berikan informasi berikut:
   + Masukkan **MyNotebook** untuk **Studio notebook Name** (Nama notebook Studio).
   + Pilih **default** untuk **Basis data AWS Glue**.

   Pilih **Create Studio notebook** (Buat notebook Studio).

1. Di **MyNotebook**halaman, pilih tab **Konfigurasi**. Di bagian **Jaringan**, pilih **Edit**.

1. Di MyNotebook halaman **Edit jaringan untuk**, pilih **konfigurasi VPC berdasarkan kluster MSK Amazon**. Pilih klaster Amazon MSK untuk **Amazon MSK Cluster** (Klaster Amazon MSK). Pilih **Simpan perubahan**.

1. Di **MyNotebook**halaman, pilih **Jalankan**. Tunggu **Status** hingga menampilkan **Running** (Berjalan).

### Buat notebook Studio menggunakan AWS CLI
<a name="example-notebook-msk-create-api"></a>

Untuk membuat buku catatan Studio menggunakan AWS CLI, lakukan hal berikut:

1. Pastikan bahwa Anda memiliki informasi berikut. Anda perlu nilai-nilai ini untuk membuat aplikasi Anda.
   + ID akun Anda.
   + ID subnet IDs dan grup keamanan untuk VPC Amazon yang berisi kluster MSK Amazon Anda.

1. Buat file bernama `create.json` dengan konten berikut. Ganti nilai placeholder dengan informasi Anda.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Jalankan perintah berikut untuk membuat aplikasi Anda.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Setelah perintah selesai, Anda akan melihat output yang serupa dengan yang berikut, yang menampilkan detail untuk notebook Studio baru Anda:

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Jalankan perintah berikut untuk memulai aplikasi Anda. Ganti nilai sampel dengan ID akun Anda.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Kirim data ke klaster Amazon MSK Anda
<a name="example-notebook-msk-send"></a>

Di bagian ini, Anda menjalankan skrip Python di EC2 klien Amazon Anda untuk mengirim data ke sumber data MSK Amazon Anda.

1. Connect ke EC2 klien Amazon Anda.

1. Jalankan perintah berikut untuk menginstal Python versi 3, Pip, dan Kafka untuk paket Python, dan mengonfirmasi tindakan:

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Konfigurasikan AWS CLI pada mesin klien Anda dengan memasukkan perintah berikut:

   ```
   aws configure
   ```

   Berikan kredensial akun Anda, dan **us-east-1** untuk `region`.

1. Buat file bernama `stock.py` dengan konten berikut. Ganti nilai sampel dengan string Bootstrap Brokers kluster MSK Amazon Anda, dan perbarui nama topik jika topik Anda bukan **AWS KafkaTutorialTopic**:

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Jalankan skrip dengan perintah berikut:

   ```
   $ python3 stock.py
   ```

1. Biarkan skrip berjalan saat Anda menyelesaikan bagian berikut.

## Uji notebook Studio Anda
<a name="example-notebook-msk-test"></a>

Di bagian ini, Anda menggunakan notebook Studio Anda untuk mengkueri data dari klaster Amazon MSK Anda.

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Pada halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **notebook Studio**. Pilih **MyNotebook**.

1. Di **MyNotebook**halaman, pilih **Buka di Apache Zeppelin**.

   Antarmuka Apache Zeppelin terbuka di tab baru.

1. Di halaman **Selamat Datang di Zeppelin\$1**, pilih **Zeppelin new note** (Catatan baru Zeppelin).

1. Di halaman **Zeppelin Note** (Catatan Zeppelin), masukkan kueri berikut ke dalam catatan baru:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Pilih ikon jalankan.

   Aplikasi menampilkan data dari klaster Amazon MSK.

Untuk membuka Dasbor Apache Flink untuk aplikasi Anda agar dapat melihat aspek operasional, pilih **FLINK JOB** (TUGAS FLINK). Untuk informasi selengkapnya tentang Dasbor Flink, lihat Dasbor [Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) di [Managed Service for Apache](https://docs.aws.amazon.com/) Flink Developer Guide.

Untuk contoh kueri SQL Flink Streaming selengkapnya, lihat [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Bersihkan aplikasi Anda dan sumber daya yang bergantung
<a name="example-notebook-cleanup"></a>

## Hapus notebook Studio Anda
<a name="example-notebook-cleanup-app"></a>

1. Buka Layanan Terkelola untuk konsol Apache Flink.

1. Pilih **MyNotebook**.

1. Pilih **Actions** (Tindakan), lalu **Delete** (Hapus).

## Hapus AWS Glue database dan koneksi Anda
<a name="example-notebook-cleanup-glue"></a>

1. Buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Pilih **Databases** (Basis Data) dari bilah navigasi sebelah kiri. Centang kotak centang di sebelah **Default** untuk memilihnya. Pilih **Action** (Tindakan), **Delete Database** (Hapus Basis Data). Konfirmasikan pilihan Anda.

1. Pilih **Connections** (Koneksi) dari bilah navigasi sebelah kiri. Centang kotak di sebelah untuk **ZeppelinConnection**memilihnya. Pilih **Action** (Tindakan), **Delete Connection** (Hapus Koneksi). Konfirmasikan pilihan Anda.

## Hapus IAM role dan kebijakan IAM Anda
<a name="example-notebook-msk-cleanup-iam"></a>

1. Buka konsol IAM di [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Pilih **Roles** (Peran) dari bilah navigasi sebelah kiri.

1. Gunakan bilah pencarian untuk mencari **ZeppelinRole**peran.

1. Pilih **ZeppelinRole**peran. Pilih **Delete Role** (Hapus Peran). Konfirmasi penghapusan.

## Hapus grup CloudWatch log Anda
<a name="example-notebook-cleanup-cw"></a>

Konsol membuat grup CloudWatch Log dan aliran log untuk Anda saat Anda membuat aplikasi menggunakan konsol. Anda tidak memiliki grup dan aliran log jika Anda membuat aplikasi menggunakan AWS CLI.

1. Buka CloudWatch konsol di [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Pilih **Log groups** (Grup log) dari bilah navigasi sebelah kiri.

1. Pilih grup**/AWS/KinesisAnalytics/MyNotebook**log.

1. Pilih **Actions** (Tindakan), **Delete log group(s)** (Hapus grup log). Konfirmasi penghapusan.

## Bersihkan sumber daya Kinesis Data Streams
<a name="example-notebook-cleanup-streams"></a>

Untuk menghapus aliran Kinesis, buka konsol Kinesis Data Streams, pilih aliran Kinesis, lalu pilih **Actions** (Tindakan), **Delete** (Hapus).

## Bersihkan sumber daya MSK
<a name="example-notebook-cleanup-msk"></a>

Ikuti langkah-langkah di bagian ini jika Anda membuat klaster Amazon MSK untuk tutorial ini. Bagian ini berisi petunjuk untuk membersihkan instans klien Amazon EC2, Amazon VPC, dan klaster Amazon MSK Anda.

### Hapus kluster MSK Amazon Anda
<a name="example-notebook-msk-cleanup-msk"></a>

Ikuti langkah-langkah ini jika Anda membuat klaster Amazon MSK untuk tutorial ini.

1. Buka konsol MSK Amazon di [https://console.aws.amazon.com/msk/rumah? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Pilih **AWS KafkaTutorialCluster**. Pilih **Delete** (Hapus). Masukkan **delete** di jendela yang muncul, dan konfirmasikan pilihan Anda.

### Akhiri intans klien Anda
<a name="example-notebook-msk-cleanup-client"></a>

Ikuti langkah-langkah ini jika Anda membuat instans klien Amazon EC2 untuk tutorial ini.

1. Buka konsol Amazon EC2 di. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)

1. Pilih **Instances** (Instans) dari panel navigasi sebelah kiri.

1. Pilih kotak centang di sebelah untuk **ZeppelinClient**memilihnya.

1. Pilih **Instance State** (Status Instans), **Terminate Instance** (Akhiri Instans).

### Hapus Amazon VPC Anda
<a name="example-notebook-msk-cleanup-vpc"></a>

Ikuti langkah-langkah ini jika Anda membuat klaster Amazon VPC untuk tutorial ini.

1. Buka konsol Amazon EC2 di. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)

1. Pilih **Network Interfaces** (Antarmuka Jaringan) dari bilah navigasi sebelah kiri.

1. Masukkan ID VPC Anda di bilah pencarian dan tekan enter untuk mencari.

1. Pilih kotak centang di header tabel untuk memilih semua antarmuka jaringan yang ditampilkan.

1. Pilih **Actions** (Tindakan), **Detach** (Lepaskan). Di jendela yang muncul, pilih **Enable** (Aktifkan) di bawah **Force detachment** (Lepas paksa). Pilih **Detach** (Lepaskan), dan tunggu hingga semua antarmuka jaringan mencapai status **Available** (Tersedia).

1. Pilih kotak centang di header tabel untuk memilih lagi semua antarmuka jaringan yang ditampilkan.

1. Pilih **Actions** (Tindakan), **Delete** (Hapus). Konfirmasikan tindakan.

1. Buka konsol Amazon VPC di. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)

1. Pilih **AWS KafkaTutorialVPC**. Pilih **Actions** (Tindakan), **Delete VPC** (Hapus VPC). Masukkan **delete** dan konfirmasikan penghapusan.

# Tutorial: Menyebarkan notebook Studio sebagai Layanan Terkelola untuk aplikasi Apache Flink dengan status tahan lama
<a name="example-notebook-deploy"></a>

Tutorial berikut menunjukkan cara menyebarkan notebook Studio sebagai Layanan Terkelola untuk aplikasi Apache Flink dengan status tahan lama.

**Topics**
+ [Prasyarat lengkap](#example-notebook-durable-setup)
+ [Menyebarkan aplikasi dengan status tahan lama menggunakan Konsol Manajemen AWS](#example-notebook-deploy-console)
+ [Menyebarkan aplikasi dengan status tahan lama menggunakan AWS CLI](#example-notebook-deploy-cli)

## Prasyarat lengkap
<a name="example-notebook-durable-setup"></a>

Buat notebook Studio baru dengan mengikuti [Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink](example-notebook.md), menggunakan Kinesis Data Streams atau Amazon MSK. Beri nama notebook Studio `ExampleTestDeploy`.

## Menyebarkan aplikasi dengan status tahan lama menggunakan Konsol Manajemen AWS
<a name="example-notebook-deploy-console"></a>

1. Tambahkan lokasi bucket S3 tempat Anda ingin kode yang dikemas disimpan di bawah **Lokasi kode aplikasi - *opsional*** di konsol. Ini mengaktifkan langkah-langkah untuk men-deploy dan menjalankan aplikasi Anda langsung dari notebook.

1. Tambahkan izin yang diperlukan ke peran aplikasi untuk mengaktifkan peran yang Anda gunakan untuk membaca dan menulis ke bucket Amazon S3, dan untuk meluncurkan Layanan Terkelola untuk aplikasi Apache Flink:
   + AmazonS3 FullAccess
   + Amazondikelola- flinkFullAccess
   + Akses ke sumber, tujuan, dan VPCs sebagaimana berlaku. Untuk informasi selengkapnya, lihat [Tinjau izin IAM untuk notebook Studio](how-zeppelin-iam.md).

1. Gunakan kode sampel berikut:

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Dengan peluncuran fitur ini, Anda akan melihat menu menurun baru di sudut kanan atas setiap catatan di notebook Anda dengan nama notebook. Anda dapat melakukan tindakan berikut:
   + Lihat pengaturan notebook Studio di Konsol Manajemen AWS.
   + Bangun Zeppelin Note dan ekspor ke Amazon S3. Di titik ini, beri nama aplikasi Anda dan pilih **Build and Export** (Bangun dan Ekspor). Anda akan mendapatkan notifikasi saat ekspor selesai.
   + Jika perlu, Anda dapat melihat dan menjalankan tes tambahan pada executable di Amazon S3.
   + Setelah selesai dibangun, Anda akan dapat men-deploy kode Anda sebagai aplikasi streaming Kinesis dengan status tahan lama dan penskalaan otomatis.
   + Gunakan menu menurun dan pilih **Deploy Zeppelin Note as Kinesis streaming application** (Deploy Zeppelin Note sebagai aplikasi streaming Kinesis). Tinjau nama aplikasi dan pilih **Deploy via AWS Console**.
   + Ini akan membawa Anda ke Konsol Manajemen AWS halaman untuk membuat Layanan Terkelola untuk aplikasi Apache Flink. Perhatikan bahwa nama aplikasi, paralelisme, lokasi kode, Glue DB default, VPC (jika berlaku) dan IAM role sudah diisi sebelumnya. Pastikan IAM role memiliki izin yang diperlukan untuk sumber dan tujuan Anda. Snapshot diaktifkan secara default untuk manajemen state aplikasi yang tahan lama.
   + Pilih **create application** (buat aplikasi).
   + Anda dapat memilih **configure** (konfigurasikan) dan mengubah pengaturan apa pun, lalu memilih **Run** (Jalankan) untuk memulai aplikasi streaming Anda.

## Menyebarkan aplikasi dengan status tahan lama menggunakan AWS CLI
<a name="example-notebook-deploy-cli"></a>

Untuk menyebarkan aplikasi menggunakan AWS CLI, Anda harus memperbarui AWS CLI untuk menggunakan model layanan yang disediakan dengan informasi Beta 2 Anda. Untuk informasi tentang cara menggunakan model layanan yang diperbarui, lihat [Lengkapi prasyaratPrasyarat lengkap](example-notebook.md#example-notebook-setup).

Kode contoh berikut membuat notebook Studio baru:

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

Contoh kode berikut memulai notebook Studio baru:

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

Kode berikut mengembalikan URL untuk halaman notebook Apache Zeppelin aplikasi:

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Lihat contoh kueri untuk menganalisis data di buku catatan Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Buat tabel dengan Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Buat tabel dengan Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Kueri jendela yang jatuh](#how-zeppelin-examples-tumbling)
+ [Kueri jendela geser](#how-zeppelin-examples-sliding)
+ [Gunakan SQL interaktif](#how-zeppelin-examples-interactive-sql)
+ [Gunakan konektor BlackHole SQL](#how-zeppelin-examples-blackhole-connector-sql)
+ [Gunakan Scala untuk menghasilkan data sampel](#notebook-example-data-generator)
+ [Gunakan Scala interaktif](#notebook-example-interactive-scala)
+ [Gunakan Python interaktif](#notebook-example-interactive-python)
+ [Gunakan kombinasi Python interaktif, SQL, dan Scala](#notebook-example-interactive-pythonsqlscala)
+ [Gunakan aliran data Kinesis lintas akun](#notebook-example-crossaccount-kds)

Untuk informasi tentang pengaturan kueri SQL Apache Flink, lihat [Flink pada Notebook Zeppelin untuk Analisis Data Interaktif](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

Untuk melihat aplikasi Anda di dasbor Apache Flink, pilih **FLINK JOB** (TUGAS FLINK) di halaman **Zeppelin Note** aplikasi Anda.

Untuk informasi selengkapnya tentang kueri jendela, lihat [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) (Jendela) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Untuk contoh kueri SQL Apache Flink Streaming selengkapnya, lihat [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Buat tabel dengan Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Anda dapat menggunakan konektor Amazon MSK Flink dengan Managed Service for Apache Flink Studio untuk mengautentikasi koneksi Anda dengan otentikasi Plaintext, SSL, atau IAM. Buat tabel Anda menggunakan properti spesifik sesuai kebutuhan Anda.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Anda dapat menggabungkan ini dengan properti lain di [Apache Kafka SQL](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/) Connector.

## Buat tabel dengan Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

Dalam contoh berikut, Anda membuat tabel menggunakan Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Untuk informasi selengkapnya tentang properti lain yang dapat Anda gunakan, lihat [Konektor SQL Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Kueri jendela yang jatuh
<a name="how-zeppelin-examples-tumbling"></a>

Kueri SQL Flink Streaming berikut memilih harga tertinggi di setiap jendela tumbling lima detik dari tabel `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Kueri jendela geser
<a name="how-zeppelin-examples-sliding"></a>

Kueri SQL Apache Flink Streaming berikut memilih harga tertinggi di setiap jendela geser lima detik dari tabel `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Gunakan SQL interaktif
<a name="how-zeppelin-examples-interactive-sql"></a>

Contoh ini mencetak maks. waktu peristiwa dan waktu pemrosesan serta jumlah nilai dari tabel nilai kunci. Pastikan Anda memiliki skrip pembuatan data sampel dari [Gunakan Scala untuk menghasilkan data sampel](#notebook-example-data-generator) yang berjalan. Untuk mencoba kueri SQL lainnya seperti filter dan gabung di notebook Studio Anda, lihat dokumentasi Apache Flink: [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di dokumentasi Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Gunakan konektor BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

Konektor BlackHole SQL tidak mengharuskan Anda membuat aliran data Kinesis atau kluster MSK Amazon untuk menguji kueri Anda. Untuk informasi tentang konektor BlackHole SQL, lihat Konektor [BlackHole SQL dalam dokumentasi](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) Apache Flink. Dalam contoh ini, katalog default adalah katalog dalam memori.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Gunakan Scala untuk menghasilkan data sampel
<a name="notebook-example-data-generator"></a>

Contoh ini menggunakan Scala untuk menghasilkan data sampel. Anda dapat menggunakan data sampel ini untuk menguji berbagai kueri. Gunakan pernyataan buat tabel untuk membuat tabel nilai kunci.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Gunakan Scala interaktif
<a name="notebook-example-interactive-scala"></a>

Ini adalah terjemahan Scala dari [Gunakan SQL interaktif](#how-zeppelin-examples-interactive-sql). Untuk contoh Scala lainnya, lihat [Tabel API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) di dokumentasi Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Gunakan Python interaktif
<a name="notebook-example-interactive-python"></a>

Ini adalah terjemahan Python dari [Gunakan SQL interaktif](#how-zeppelin-examples-interactive-sql). Untuk contoh Python lainnya, lihat [Tabel API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) di dokumentasi Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Gunakan kombinasi Python interaktif, SQL, dan Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

Anda dapat menggunakan kombinasi SQL, Python, dan Scala apa pun di notebook Anda untuk analisis interaktif. Dalam notebook Studio yang Anda rencanakan untuk di-deploy sebagai aplikasi dengan status tahan lama, Anda dapat menggunakan kombinasi SQL dan Scala. Contoh ini menunjukkan bagian yang diabaikan dan bagian yang dapat digunakan dalam aplikasi dengan status tahan lama.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Gunakan aliran data Kinesis lintas akun
<a name="notebook-example-crossaccount-kds"></a>

Untuk menggunakan Kinesis data stream yang ada di akun selain akun yang memiliki notebook Studio, buat peran eksekusi layanan di akun tempat notebook Studio Anda berjalan dan kebijakan kepercayaan peran di akun yang memiliki aliran data. Gunakan `aws.credentials.provider`, `aws.credentials.role.arn`, dan `aws.credentials.role.sessionName` di konektor Kinesis dalam pernyataan DDL buat tabel Anda untuk membuat tabel pada aliran data.

Gunakan peran eksekusi layanan berikut untuk akun notebook Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Gunakan kebijakan `AmazonKinesisFullAccess` dan kebijakan kepercayaan peran berikut untuk akun aliran data.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Gunakan paragraf berikut untuk membuat pernyataan tabel.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Memecahkan masalah notebook Studio untuk Layanan Terkelola untuk Apache Flink
<a name="how-zeppelin-troubleshooting"></a>

Bagian ini berisi informasi pemecahan masalah untuk notebook Studio.

## Hentikan aplikasi yang macet
<a name="how-zeppelin-troubleshooting-stopping"></a>

Untuk menghentikan aplikasi yang terjebak dalam keadaan transien, panggil [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)tindakan dengan `Force` parameter yang disetel ke. `true` Untuk informasi selengkapnya, lihat [Menjalankan Aplikasi](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html) di [Managed Service for Apache Flink Developer](https://docs.aws.amazon.com/managed-flink/latest/java/) Guide.

## Terapkan sebagai aplikasi dengan status tahan lama di VPC tanpa akses internet
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

 deploy-as-applicationFungsi Managed Service for Apache Flink Studio tidak mendukung aplikasi VPC tanpa akses internet. Kami menyarankan Anda membangun aplikasi Anda di Studio, dan kemudian menggunakan Managed Service for Apache Flink untuk membuat aplikasi Flink secara manual dan memilih file zip yang Anda buat di Notebook Anda.

Langkah-langkah berikut menguraikan pendekatan ini: 

1. Buat dan ekspor aplikasi Studio Anda ke Amazon S3. Ini harus berupa file zip. 

1. Buat Layanan Terkelola untuk aplikasi Apache Flink secara manual dengan jalur kode yang mereferensikan lokasi file zip di Amazon S3. Selain itu, Anda perlu mengkonfigurasi aplikasi dengan `env` variabel berikut (total 2`groupID`, 3`var`): 

1. kinesis.analytics.flink.run.options

   1. python: source/note.py

   1. jarfile: PythonApplicationDependencies lib/ .jar

1. terkelola.deploy\$1as\$1app.options

   1. DatabaSearN: *<glue database ARN (Amazon Resource Name)>*

1. Anda mungkin perlu memberikan izin ke Layanan Terkelola untuk Apache Flink Studio dan Layanan Terkelola untuk peran IAM Apache Flink untuk layanan yang digunakan aplikasi Anda. Anda dapat menggunakan peran IAM yang sama untuk kedua aplikasi.

## Deploy-as-app ukuran dan pengurangan waktu pembuatan
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

Studio deploy-as-app untuk aplikasi Python mengemas semua yang tersedia di lingkungan Python karena kami tidak dapat menentukan pustaka mana yang Anda butuhkan. Ini dapat menghasilkan ukuran yang lebih besar dari yang diperlukan. deploy-as-app Prosedur berikut menunjukkan cara mengurangi ukuran ukuran aplikasi deploy-as-app Python dengan menghapus dependensi.

Jika Anda sedang membangun aplikasi Python dengan deploy-as-app fitur dari Studio, Anda dapat mempertimbangkan untuk menghapus paket Python yang sudah diinstal sebelumnya dari sistem jika aplikasi Anda tidak bergantung pada. Ini tidak hanya akan membantu mengurangi ukuran artefak akhir untuk menghindari pelanggaran batas layanan untuk ukuran aplikasi, tetapi juga meningkatkan waktu pembuatan aplikasi dengan fitur tersebut deploy-as-app.

Anda dapat menjalankan perintah berikut untuk mencantumkan semua paket Python yang diinstal dengan ukuran terinstal masing-masing dan secara selektif menghapus paket dengan ukuran yang signifikan.

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**catatan**  
`apache-beam`diperlukan oleh Flink Python untuk beroperasi. Anda tidak boleh menghapus paket ini dan dependensinya.

Berikut ini adalah daftar paket Python pra-instal di Studio V2 yang dapat dipertimbangkan untuk dihapus:

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**Untuk menghapus paket Python dari notebook Zeppelin:**

1. Periksa apakah aplikasi Anda bergantung pada paket, atau paket konsumsinya, sebelum menghapusnya. [Anda dapat mengidentifikasi dependan paket menggunakan pipdeptree.](https://pypi.org/project/pipdeptree/)

1. Menjalankan perintah berikut untuk menghapus paket:

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. Jika Anda perlu mengambil paket yang Anda hapus karena kesalahan, jalankan perintah berikut:

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example Contoh: Hapus `scipy` paket sebelum menerapkan aplikasi deploy-as-app Python Anda dengan fitur.**  

1. Gunakan `pipdeptree` untuk menemukan semua `scipy` konsumen dan verifikasi apakah Anda dapat menghapus dengan aman`scipy`.
   + Instal alat melalui notebook:

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + Dapatkan pohon ketergantungan terbalik `scipy` dengan menjalankan:

     ```
     %flink.pyflink
     !pip -r -p scipy
     ```

     Anda akan melihat output yang mirip dengan berikut ini (diringkas untuk singkatnya):

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. Hati-hati memeriksa penggunaan`seaborn`, `statsmodels` dan `plotnine` dalam aplikasi Anda. Jika aplikasi Anda tidak bergantung pada salah satu`scipy`,`seaborn`,`statemodels`, atau`plotnine`, Anda dapat menghapus semua paket ini, atau hanya paket yang tidak diperlukan aplikasi Anda.

1. Hapus paket dengan menjalankan:

   ```
   !pip uninstall -y scipy plotnine seaborn statemodels
   ```

## Batalkan pekerjaan
<a name="how-notbook-canceling-jobs"></a>

Bagian ini menunjukkan cara untuk membatalkan tugas Apache Flink yang tidak bisa Anda dapatkan dari Apache Zeppelin. Jika Anda ingin membatalkan tugas seperti itu, buka dasbor Apache Flink, salin ID tugas, lalu gunakan di salah satu contoh berikut.

Untuk membatalkan satu tugas:

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

Untuk membatalkan semua tugas yang sedang berjalan:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

Untuk membatalkan semua tugas:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## Mulai ulang penerjemah Apache Flink
<a name="how-notbook-restarting-interpreter"></a>

Untuk memulai ulang interpreter Apache Flink dalam notebook Studio Anda

1. Pilih **Configuration** (Konfigurasi) di dekat sudut kanan atas layar.

1. Pilih **Interpreter**.

1. Pilih **restart** (mulai ulang), lalu **OK**.

# Buat kebijakan IAM khusus untuk Managed Service untuk notebook Apache Flink Studio
<a name="how-zeppelin-appendix-iam"></a>

Anda biasanya menggunakan kebijakan IAM terkelola untuk mengizinkan aplikasi Anda mengakses sumber daya dependen. Jika Anda memerlukan kontrol yang lebih baik atas izin aplikasi Anda, Anda dapat menggunakan kebijakan IAM kustom. Bagian ini berisi contoh kebijakan IAM kustom.

**catatan**  
Dalam contoh kebijakan berikut, ganti teks placeholder dengan nilai-nilai aplikasi Anda.

**Topics**
+ [AWS Glue](#how-zeppelin-iam-glue)
+ [CloudWatch Log](#how-zeppelin-iam-cw)
+ [Aliran Kinesis](#how-zeppelin-iam-streams)
+ [Klaster Amazon MSK](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

Contoh kebijakan berikut memberikan izin untuk mengakses database. AWS Glue 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```

------

## CloudWatch Log
<a name="how-zeppelin-iam-cw"></a>

Kebijakan berikut memberikan izin untuk mengakses CloudWatch Log:

```
{
      "Sid": "ListCloudwatchLogGroups",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<accountId>:log-group:*"
      ]
    },
    {
      "Sid": "ListCloudwatchLogStreams",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "<logGroupArn>:log-stream:*"
      ]
    },
    {
      "Sid": "PutCloudwatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "<logStreamArn>"
      ]
    }
```

**catatan**  
Jika Anda membuat aplikasi menggunakan konsol, konsol akan menambahkan kebijakan yang diperlukan untuk mengakses CloudWatch Log ke peran aplikasi Anda.

## Aliran Kinesis
<a name="how-zeppelin-iam-streams"></a>

Aplikasi Anda dapat menggunakan Aliran Kinesis untuk sumber atau tujuan. Aplikasi Anda memerlukan izin baca untuk membaca dari aliran sumber, dan izin tulis untuk menulis ke aliran tujuan.

Kebijakan berikut memberikan izin untuk membaca dari Aliran Kinesis yang digunakan sebagai sumber:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```

------

Kebijakan berikut memberikan izin untuk menulis ke Aliran Kinesis yang digunakan sebagai tujuan:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```

------

Jika aplikasi Anda mengakses aliran Kinesis terenkripsi, Anda harus memberikan izin tambahan untuk mengakses aliran dan kunci enkripsi aliran. 

Kebijakan berikut memberikan izin untuk mengakses aliran sumber terenkripsi dan kunci enkripsi aliran:

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
    ,
```

Kebijakan berikut memberikan izin untuk mengakses aliran tujuan terenkripsi dan kunci enkripsi aliran:

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Klaster Amazon MSK
<a name="how-zeppelin-iam-msk"></a>

Untuk memberikan akses ke klaster Amazon MSK, Anda memberikan akses ke VPC klaster. Untuk contoh kebijakan untuk mengakses Amazon VPC, lihat [Izin Aplikasi VPC](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html).