

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Contoh dan tutorial untuk notebook Studio di Managed Service untuk Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink](example-notebook.md)
+ [Tutorial: Menyebarkan notebook Studio sebagai Layanan Terkelola untuk aplikasi Apache Flink dengan status tahan lama](example-notebook-deploy.md)
+ [Lihat contoh kueri untuk menganalisis data di buku catatan Studio](how-zeppelin-sql-examples.md)

# Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink
<a name="example-notebook"></a>

Tutorial berikut menunjukkan cara membuat notebook Studio yang membaca data dari aliran data Kinesis atau cluster MSK Amazon.

**Topics**
+ [Lengkapi prasyarat](#example-notebook-setup)
+ [Buat AWS Glue database](#example-notebook-glue)
+ [Langkah selanjutnya: Buat notebook Studio dengan Kinesis Data Streams atau Amazon MSK](#examples-notebook-nextsteps)
+ [Buat notebook Studio dengan Kinesis Data Streams](example-notebook-streams.md)
+ [Buat notebook Studio dengan Amazon MSK](example-notebook-msk.md)
+ [Bersihkan aplikasi Anda dan sumber daya yang bergantung](example-notebook-cleanup.md)

## Lengkapi prasyarat
<a name="example-notebook-setup"></a>

Pastikan versi Anda AWS CLI adalah versi 2 atau yang lebih baru. Untuk menginstal yang terbaru AWS CLI, lihat [Menginstal, memperbarui, dan menghapus instalasi AWS CLI versi 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html).

## Buat AWS Glue database
<a name="example-notebook-glue"></a>

Notebook Studio Anda menggunakan basis data [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) untuk metadata tentang sumber data Amazon MSK Anda.

**Buat AWS Glue Database**

1. Buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Pilih **Add database** (Tambahkan basis data). Di jendela **Add database** (Tambahkan basis data), masukkan **default** untuk **Database name** (Nama basis data). Pilih **Create** (Buat). 

## Langkah selanjutnya: Buat notebook Studio dengan Kinesis Data Streams atau Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Dengan tutorial ini, Anda dapat membuat notebook Studio yang menggunakan Kinesis Data Streams atau Amazon MSK:
+ [Buat notebook Studio dengan Kinesis Data Streams](example-notebook-streams.md): Dengan Kinesis Data Streams, Anda dengan cepat membuat aplikasi yang menggunakan aliran data Kinesis sebagai sumber. Anda hanya perlu membuat Kinesis data stream sebagai sumber daya dependen.
+ [Buat notebook Studio dengan Amazon MSK](example-notebook-msk.md): Dengan Amazon MSK, Anda membuat aplikasi yang menggunakan klaster Amazon MSK sebagai sumber. Anda perlu membuat Amazon VPC, instans klien Amazon EC2, dan klaster Amazon MSK sebagai sumber daya dependen.

# Buat notebook Studio dengan Kinesis Data Streams
<a name="example-notebook-streams"></a>

Tutorial ini menjelaskan cara membuat notebook Studio yang menggunakan Kinesis data stream sebagai sumber.

**Topics**
+ [Lengkapi prasyarat](#example-notebook-streams-setup)
+ [Buat AWS Glue tabel](#example-notebook-streams-glue)
+ [Buat notebook Studio dengan Kinesis Data Streams](#example-notebook-streams-create)
+ [Kirim data ke Kinesis data stream Anda](#example-notebook-streams-send)
+ [Uji notebook Studio Anda](#example-notebook-streams-test)

## Lengkapi prasyarat
<a name="example-notebook-streams-setup"></a>

Sebelum Anda membuat notebook Studio, buat Kinesis data stream (`ExampleInputStream`). Aplikasi Anda menggunakan aliran ini untuk sumber aplikasi.

Anda dapat membuat aliran ini menggunakan konsol Amazon Kinesis atau perintah AWS CLI . Untuk instruksi konsol, lihat [Membuat dan Memperbarui Aliran Data](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) di *Panduan Developer Amazon Kinesis Data Streams*. Beri nama aliran **ExampleInputStream** dan atur **Number of open shards** (Jumlah serpihan terbuka) ke **1**.

Untuk membuat stream (`ExampleInputStream`) menggunakan AWS CLI, gunakan perintah Amazon Kinesis `create-stream` AWS CLI berikut.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Buat AWS Glue tabel
<a name="example-notebook-streams-glue"></a>

Notebook Studio Anda menggunakan basis data [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) untuk metadata tentang sumber data Kinesis Data Streams Anda.

**catatan**  
Anda dapat membuat database secara manual terlebih dahulu atau Anda dapat membiarkan Managed Service for Apache Flink membuatnya untuk Anda saat Anda membuat buku catatan. Demikian pula, Anda dapat membuat tabel secara manual seperti yang dijelaskan di bagian ini, atau Anda dapat menggunakan kode konektor buat tabel untuk Layanan Terkelola untuk Apache Flink di buku catatan Anda dalam Apache Zeppelin untuk membuat tabel Anda melalui pernyataan DDL. Anda kemudian dapat check-in AWS Glue untuk memastikan tabel dibuat dengan benar.

**Buat Tabel**

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Jika Anda belum memiliki AWS Glue database, pilih **Database** dari bilah navigasi kiri. Pilih **Add database** (Tambahkan basis data). Di jendela **Add database** (Tambahkan basis data), masukkan **default** untuk **Database name** (Nama basis data). Pilih **Create** (Buat).

1. Di bilah navigasi sebelah kiri, pilih **Tables** (Tabel). Di halaman **Tabel**, pilih **Add tables** (Tambahkan tabel), **Add table manually** (Tambahkan tabel secara manual).

1. Di halaman **Set up your table's properties** (Siapkan properti tabel Anda), masukkan **stock** untuk **Table name** (Nama tabel). Pastikan Anda memilih basis data yang Anda buat sebelumnya. Pilih **Berikutnya**.

1. Di halaman **Tambahkan penyimpanan data**, pilih **Kinesis**. Untuk **Stream name** (Nama aliran), masukkan **ExampleInputStream**. untuk **Kinesis source URL** (URL sumber Kinesis), pilih masukkan **https://kinesis.us-east-1.amazonaws.com**. Jika Anda menyalin dan menempel **URL sumber Kinesis**, pastikan untuk menghapus spasi awal atau akhir. Pilih **Berikutnya**.

1. Di halaman **Klasifikasi**, pilih **JSON**. Pilih **Berikutnya**.

1. Di halaman **Tentukan skema**, pilih Add Column (Tambahkan kolom) untuk menambahkan kolom. Tambahkan kolom dengan properti berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-streams.html)

   Pilih **Berikutnya**.

1. Di halaman berikutnya, verifikasi pengaturan Anda, dan pilih **Finish** (Selesai).

1. Pilih tabel yang baru dibuat dari daftar tabel.

1. Pilih **Edit table** (Edit tabel) dan tambahkan properti dengan kunci `managed-flink.proctime` dan nilai `proctime`.

1. Pilih **Apply** (Terapkan).

## Buat notebook Studio dengan Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Sekarang Anda sudah membuat sumber daya yang digunakan aplikasi Anda, Anda membuat notebook Studio Anda. 

**Topics**
+ [Buat notebook Studio menggunakan Konsol Manajemen AWS](#example-notebook-create-streams-console)
+ [Buat notebook Studio menggunakan AWS CLI](#example-notebook-msk-create-api)

### Buat notebook Studio menggunakan Konsol Manajemen AWS
<a name="example-notebook-create-streams-console"></a>

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard). 

1. Di halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **Studio**. Pilih **Create Studio notebook** (Buat notebook Studio).
**catatan**  
Anda juga dapat membuat notebook Studio dari konsol Amazon MSK atau Kinesis Data Streams dengan memilih klaster Amazon MSK input atau Kinesis data stream, dan memilih **Process data in real time** (Proses data secara langsung).

1. Di halaman **Buat notebook Studio**, berikan informasi berikut:
   + Masukkan **MyNotebook** untuk nama notebook.
   + Pilih **default** untuk **Basis data AWS Glue**.

   Pilih **Create Studio notebook** (Buat notebook Studio).

1. Di **MyNotebook**halaman, pilih **Jalankan**. Tunggu **Status** hingga menampilkan **Running** (Berjalan). Biaya berlaku saat notebook berjalan.

### Buat notebook Studio menggunakan AWS CLI
<a name="example-notebook-msk-create-api"></a>

Untuk membuat notebook Studio menggunakan AWS CLI, lakukan hal berikut:

1. Verifikasi ID akun Anda. Anda memerlukan nilai ini untuk membuat aplikasi Anda.

1. Buat peran `arn:aws:iam::AccountID:role/ZeppelinRole` dan tambahkan izin berikut ke peran yang dibuat secara otomatis oleh konsol.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Buat file bernama `create.json` dengan konten berikut. Ganti nilai placeholder dengan informasi Anda.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Jalankan perintah berikut untuk membuat aplikasi Anda.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Setelah perintah selesai, Anda melihat output yang menampilkan detail untuk notebook Studio baru Anda. Berikut adalah contoh output.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Jalankan perintah berikut untuk memulai aplikasi Anda. Ganti nilai sampel dengan ID akun Anda.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Kirim data ke Kinesis data stream Anda
<a name="example-notebook-streams-send"></a>

Untuk mengirim data uji ke Kinesis data stream, lakukan hal berikut:

1. Buka [ Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Pilih **Buat Pengguna Cognito** dengan. CloudFormation

1.  CloudFormation Konsol terbuka dengan template Kinesis Data Generator. Pilih **Berikutnya**.

1. Di halaman **Tentukan detail tumpukan**, masukkan nama pengguna dan kata sandi pengguna Cognito Anda. Pilih **Berikutnya**.

1. Di halaman **Konfigurasikan opsi tumpukan**, pilih **Next** (Berikutnya).

1. Di halaman **Review Kinesis-Data-Generator-Cognito -User**, pilih yang **saya akui yang AWS CloudFormation mungkin membuat sumber daya IAM**. kotak centang. Pilih **Buat tumpukan**.

1. Tunggu CloudFormation tumpukan selesai dibuat. **Setelah tumpukan selesai, buka tumpukan **Kinesis-Data-Generator-Cognito-User** di konsol, dan pilih tab Output. CloudFormation ** Buka URL yang terdaftar untuk nilai **KinesisDataGeneratorUrl**output.

1. Di halaman **Amazon Kinesis Data Generator**, masuk dengan kredensial yang Anda buat di langkah 4.

1. Di halaman berikutnya, berikan nilai berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-streams.html)

   Untuk **Record Template** (Templat Catatan), tempel kode berikut:

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Pilih **Send data** (Kirim data).

1. Generator akan mengirimkan data ke Kinesis data stream Anda. 

   Biarkan generator berjalan sewaktu Anda menyelesaikan bagian berikutnya.

## Uji notebook Studio Anda
<a name="example-notebook-streams-test"></a>

Di bagian ini, Anda menggunakan notebook Studio untuk mengkueri data dari Kinesis data stream Anda.

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Pada halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **notebook Studio**. Pilih **MyNotebook**.

1. Di **MyNotebook**halaman, pilih **Buka di Apache Zeppelin**.

   Antarmuka Apache Zeppelin terbuka di tab baru.

1. Di halaman **Selamat Datang di Zeppelin\$1**, pilih **Zeppelin Note** (Catatan Zeppelin).

1. Di halaman **Zeppelin Note** (Catatan Zeppelin), masukkan kueri berikut ke dalam catatan baru:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Pilih ikon jalankan.

   Setelah beberapa saat, catatan menampilkan data dari Kinesis data stream.

Untuk membuka Dasbor Apache Flink untuk aplikasi Anda agar dapat melihat aspek operasional, pilih **FLINK JOB** (TUGAS FLINK). Untuk informasi selengkapnya tentang Dasbor Flink, lihat Dasbor [Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) di [Managed Service for Apache](https://docs.aws.amazon.com/) Flink Developer Guide.

Untuk contoh kueri SQL Flink Streaming selengkapnya, lihat [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Buat notebook Studio dengan Amazon MSK
<a name="example-notebook-msk"></a>

Tutorial ini menjelaskan cara membuat notebook Studio yang menggunakan klaster Amazon MSK sebagai sumber.

**Topics**
+ [Siapkan kluster MSK Amazon](#example-notebook-msk-setup)
+ [Tambahkan gateway NAT ke VPC Anda](#example-notebook-msk-nat)
+ [Buat AWS Glue koneksi dan tabel](#example-notebook-msk-glue)
+ [Buat notebook Studio dengan Amazon MSK](#example-notebook-msk-create)
+ [Kirim data ke klaster Amazon MSK Anda](#example-notebook-msk-send)
+ [Uji notebook Studio Anda](#example-notebook-msk-test)

## Siapkan kluster MSK Amazon
<a name="example-notebook-msk-setup"></a>

Untuk tutorial ini, Anda memerlukan klaster Amazon MSK yang memungkinkan akses plaintext. Jika Anda belum menyiapkan kluster MSK Amazon, ikuti tutorial [Memulai Menggunakan Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) untuk membuat Amazon VPC, kluster MSK Amazon, topik, dan instance klien Amazon. EC2 

Saat mengikuti tutorial, lakukan hal berikut:
+ Di [Langkah 3: Buat Klaster Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), di langkah 4, ubah nilai `ClientBroker` dari `TLS` ke **PLAINTEXT**.

## Tambahkan gateway NAT ke VPC Anda
<a name="example-notebook-msk-nat"></a>

Jika Anda membuat klaster Amazon MSK dengan mengikuti tutorial [Memulai Menggunakan Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html), atau jika Amazon VPC Anda yang sudah ada tidak memiliki gateway NAT untuk subnet privatnya, Anda harus menambahkan Gateway NAT ke Amazon VPC Anda. Diagram berikut menunjukkan arsitektur. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/images/vpc_05.png)


Untuk membuat gateway NAT untuk VPC Amazon Anda, lakukan hal berikut:

1. Buka konsol VPC Amazon di. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)

1. Pilih **NAT Gateways** (Gateway NAT) dari bilah navigasi sebelah kiri.

1. Di halaman **Gateway NAT**, pilih **Create NAT Gateway** (Buat Gateway NAT).

1. Di halaman **Buat Gateway NAT**, berikan nilai berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-msk.html)

   Pilih **Create NAT Gateway** (Buat Gateway NAT).

1. Di bilah navigasi sebelah kiri, pilih **Route Tables** (Tabel Rute).

1. Pilih **Create Route Table** (Buat Tabel Rute).

1. Di halaman **Create route table** (Buat tabel rute), berikan informasi berikut:
   + **Name tag** (Tanda nama): **ZeppelinRouteTable**
   + **VPC****: Pilih VPC Anda (misalnya VPC).AWS KafkaTutorial**

   Pilih **Buat**.

1. Dalam daftar tabel rute, pilih **ZeppelinRouteTable**. Pilih tab **Routes** (Rute), dan pilih **Edit routes** (Edit rute).

1. Di halaman **Edit Rute**, pilih **Add route** (Tambahkan rute).

1. Di ****Untuk **Tujuan**, masukkan **0.0.0.0/0**. Untuk **Target**, pilih **NAT Gateway**, **ZeppelinGateway**. Pilih **Save Routes** (Simpan Rute). Pilih **Close** (Tutup).

1. Pada halaman Tabel Rute, dengan **ZeppelinRouteTable**dipilih, pilih tab **Asosiasi Subnet**. Pilih **Edit subnet associations** (Edit asosiasi subnet).

1. Di halaman **Edit asosiasi subnet**, pilih **AWS KafkaTutorialSubnet2** dan **AWS KafkaTutorialSubnet3**. Pilih **Simpan**.

## Buat AWS Glue koneksi dan tabel
<a name="example-notebook-msk-glue"></a>

Notebook Studio Anda menggunakan basis data [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) untuk metadata tentang sumber data Amazon MSK Anda. Di bagian ini, Anda membuat AWS Glue sambungan yang menjelaskan cara mengakses kluster MSK Amazon, dan AWS Glue tabel yang menjelaskan cara menyajikan data dalam sumber data ke klien seperti buku catatan Studio Anda. 

**Buat Koneksi**

1. Masuk ke Konsol Manajemen AWS dan buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Jika Anda belum memiliki AWS Glue database, pilih **Database** dari bilah navigasi kiri. Pilih **Add database** (Tambahkan basis data). Di jendela **Add database** (Tambahkan basis data), masukkan **default** untuk **Database name** (Nama basis data). Pilih **Create** (Buat).

1. Pilih **Connections** (Koneksi) dari bilah navigasi sebelah kiri. Pilih **Add Connection** (Tambahkan Koneksi).

1. Di jendela **Tambahkan Koneksi**, berikan nilai berikut:
   + Untuk **Connection name** (Nama koneksi), masukkan **ZeppelinConnection**.
   + Untuk **Connection type** (Tipe koneksi), pilih **Kafka**.
   + Untuk **server bootstrap Kafka URLs**, berikan string broker bootstrap untuk cluster Anda. Anda bisa mendapatkan broker bootstrap dari konsol MSK, atau dengan memasukkan perintah CLI berikut:

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Hapus centang di kotak centang **Require SSL connection** (Perlu koneksi SSL).

   Pilih **Berikutnya**.

1. Di halaman **VPC**, berikan nilai berikut:
   + **Untuk **VPC**, pilih nama VPC Anda (misalnya VPC.) AWS KafkaTutorial**
   + Untuk **Subnet**, pilih **AWS KafkaTutorialSubnet2**.
   + Untuk **Security groups** (Grup keamanan), pilih semua grup yang tersedia.

   Pilih **Berikutnya**.

1. Di halaman **Properti koneksi** / **Akses koneksi**, pilih **Finish** (Selesai).

**Buat Tabel**
**catatan**  
Anda dapat membuat tabel secara manual seperti yang dijelaskan dalam langkah-langkah berikut, atau Anda dapat menggunakan kode konektor buat tabel untuk Layanan Terkelola untuk Apache Flink di buku catatan Anda dalam Apache Zeppelin untuk membuat tabel Anda melalui pernyataan DDL. Anda kemudian dapat check-in AWS Glue untuk memastikan tabel dibuat dengan benar.

1. Di bilah navigasi sebelah kiri, pilih **Tables** (Tabel). Di halaman **Tabel**, pilih **Add tables** (Tambahkan tabel), **Add table manually** (Tambahkan tabel secara manual).

1. Di halaman **Set up your table's properties** (Siapkan properti tabel Anda), masukkan **stock** untuk **Table name** (Nama tabel). Pastikan Anda memilih basis data yang Anda buat sebelumnya. Pilih **Berikutnya**.

1. Di halaman **Tambahkan penyimpanan data**, pilih **Kafka**. Untuk **nama Topik**, masukkan nama topik Anda (mis. **AWS KafkaTutorialTopic**). Untuk **Koneksi**, pilih **ZeppelinConnection**.

1. Di halaman **Klasifikasi**, pilih **JSON**. Pilih **Berikutnya**.

1. Di halaman **Tentukan skema**, pilih Add Column (Tambahkan kolom) untuk menambahkan kolom. Tambahkan kolom dengan properti berikut:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/example-notebook-msk.html)

   Pilih **Berikutnya**.

1. Di halaman berikutnya, verifikasi pengaturan Anda, dan pilih **Finish** (Selesai).

1. Pilih tabel yang baru dibuat dari daftar tabel.

1. Pilih **Edit tabel** dan tambahkan properti berikut:
   + kunci:`managed-flink.proctime`, nilai: `proctime`
   + kunci:`flink.properties.group.id`, nilai: `test-consumer-group`
   + kunci:`flink.properties.auto.offset.reset`, nilai: `latest`
   + kunci:`classification`, nilai: `json`

   Tanpa pasangan kunci/nilai ini, notebook Flink mengalami kesalahan. 

1. Pilih **Terapkan**.

## Buat notebook Studio dengan Amazon MSK
<a name="example-notebook-msk-create"></a>

Sekarang Anda sudah membuat sumber daya yang digunakan aplikasi Anda, Anda membuat notebook Studio Anda. 

**Topics**
+ [Buat notebook Studio menggunakan Konsol Manajemen AWS](#example-notebook-create-msk-console)
+ [Buat notebook Studio menggunakan AWS CLI](#example-notebook-msk-create-api)

**catatan**  
Anda juga dapat membuat notebook Studio dari konsol Amazon MSK dengan memilih klaster yang sudah ada, lalu memilih **Process data in real time** (Proses data secara langsung).

### Buat notebook Studio menggunakan Konsol Manajemen AWS
<a name="example-notebook-create-msk-console"></a>

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Di halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **Studio**. Pilih **Create Studio notebook** (Buat notebook Studio).
**catatan**  
Untuk membuat notebook Studio dari konsol Amazon MSK atau Kinesis Data Streams pilih klaster Amazon MSK input atau Kinesis data stream Anda, lalu pilih **Process data in real time** (Proses data secara langsung).

1. Di halaman **Buat notebook Studio**, berikan informasi berikut:
   + Masukkan **MyNotebook** untuk **Studio notebook Name** (Nama notebook Studio).
   + Pilih **default** untuk **Basis data AWS Glue**.

   Pilih **Create Studio notebook** (Buat notebook Studio).

1. Di **MyNotebook**halaman, pilih tab **Konfigurasi**. Di bagian **Jaringan**, pilih **Edit**.

1. Di MyNotebook halaman **Edit jaringan untuk**, pilih **konfigurasi VPC berdasarkan kluster MSK Amazon**. Pilih klaster Amazon MSK untuk **Amazon MSK Cluster** (Klaster Amazon MSK). Pilih **Simpan perubahan**.

1. Di **MyNotebook**halaman, pilih **Jalankan**. Tunggu **Status** hingga menampilkan **Running** (Berjalan).

### Buat notebook Studio menggunakan AWS CLI
<a name="example-notebook-msk-create-api"></a>

Untuk membuat buku catatan Studio menggunakan AWS CLI, lakukan hal berikut:

1. Pastikan bahwa Anda memiliki informasi berikut. Anda perlu nilai-nilai ini untuk membuat aplikasi Anda.
   + ID akun Anda.
   + ID subnet IDs dan grup keamanan untuk VPC Amazon yang berisi kluster MSK Amazon Anda.

1. Buat file bernama `create.json` dengan konten berikut. Ganti nilai placeholder dengan informasi Anda.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Jalankan perintah berikut untuk membuat aplikasi Anda.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Setelah perintah selesai, Anda akan melihat output yang serupa dengan yang berikut, yang menampilkan detail untuk notebook Studio baru Anda:

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Jalankan perintah berikut untuk memulai aplikasi Anda. Ganti nilai sampel dengan ID akun Anda.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Kirim data ke klaster Amazon MSK Anda
<a name="example-notebook-msk-send"></a>

Di bagian ini, Anda menjalankan skrip Python di EC2 klien Amazon Anda untuk mengirim data ke sumber data MSK Amazon Anda.

1. Connect ke EC2 klien Amazon Anda.

1. Jalankan perintah berikut untuk menginstal Python versi 3, Pip, dan Kafka untuk paket Python, dan mengonfirmasi tindakan:

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Konfigurasikan AWS CLI pada mesin klien Anda dengan memasukkan perintah berikut:

   ```
   aws configure
   ```

   Berikan kredensial akun Anda, dan **us-east-1** untuk `region`.

1. Buat file bernama `stock.py` dengan konten berikut. Ganti nilai sampel dengan string Bootstrap Brokers kluster MSK Amazon Anda, dan perbarui nama topik jika topik Anda bukan **AWS KafkaTutorialTopic**:

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Jalankan skrip dengan perintah berikut:

   ```
   $ python3 stock.py
   ```

1. Biarkan skrip berjalan saat Anda menyelesaikan bagian berikut.

## Uji notebook Studio Anda
<a name="example-notebook-msk-test"></a>

Di bagian ini, Anda menggunakan notebook Studio Anda untuk mengkueri data dari klaster Amazon MSK Anda.

1. [Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/managed-flink/ rumah? region=us-east-1\$1/aplikasi/dasbor](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Pada halaman **Managed Service for Apache Flink Apache Applications**, pilih tab **notebook Studio**. Pilih **MyNotebook**.

1. Di **MyNotebook**halaman, pilih **Buka di Apache Zeppelin**.

   Antarmuka Apache Zeppelin terbuka di tab baru.

1. Di halaman **Selamat Datang di Zeppelin\$1**, pilih **Zeppelin new note** (Catatan baru Zeppelin).

1. Di halaman **Zeppelin Note** (Catatan Zeppelin), masukkan kueri berikut ke dalam catatan baru:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Pilih ikon jalankan.

   Aplikasi menampilkan data dari klaster Amazon MSK.

Untuk membuka Dasbor Apache Flink untuk aplikasi Anda agar dapat melihat aspek operasional, pilih **FLINK JOB** (TUGAS FLINK). Untuk informasi selengkapnya tentang Dasbor Flink, lihat Dasbor [Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) di [Managed Service for Apache](https://docs.aws.amazon.com/) Flink Developer Guide.

Untuk contoh kueri SQL Flink Streaming selengkapnya, lihat [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Bersihkan aplikasi Anda dan sumber daya yang bergantung
<a name="example-notebook-cleanup"></a>

## Hapus notebook Studio Anda
<a name="example-notebook-cleanup-app"></a>

1. Buka Layanan Terkelola untuk konsol Apache Flink.

1. Pilih **MyNotebook**.

1. Pilih **Actions** (Tindakan), lalu **Delete** (Hapus).

## Hapus AWS Glue database dan koneksi Anda
<a name="example-notebook-cleanup-glue"></a>

1. Buka AWS Glue konsol di [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Pilih **Databases** (Basis Data) dari bilah navigasi sebelah kiri. Centang kotak centang di sebelah **Default** untuk memilihnya. Pilih **Action** (Tindakan), **Delete Database** (Hapus Basis Data). Konfirmasikan pilihan Anda.

1. Pilih **Connections** (Koneksi) dari bilah navigasi sebelah kiri. Centang kotak di sebelah untuk **ZeppelinConnection**memilihnya. Pilih **Action** (Tindakan), **Delete Connection** (Hapus Koneksi). Konfirmasikan pilihan Anda.

## Hapus IAM role dan kebijakan IAM Anda
<a name="example-notebook-msk-cleanup-iam"></a>

1. Buka konsol IAM di [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Pilih **Roles** (Peran) dari bilah navigasi sebelah kiri.

1. Gunakan bilah pencarian untuk mencari **ZeppelinRole**peran.

1. Pilih **ZeppelinRole**peran. Pilih **Delete Role** (Hapus Peran). Konfirmasi penghapusan.

## Hapus grup CloudWatch log Anda
<a name="example-notebook-cleanup-cw"></a>

Konsol membuat grup CloudWatch Log dan aliran log untuk Anda saat Anda membuat aplikasi menggunakan konsol. Anda tidak memiliki grup dan aliran log jika Anda membuat aplikasi menggunakan AWS CLI.

1. Buka CloudWatch konsol di [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Pilih **Log groups** (Grup log) dari bilah navigasi sebelah kiri.

1. Pilih grup**/AWS/KinesisAnalytics/MyNotebook**log.

1. Pilih **Actions** (Tindakan), **Delete log group(s)** (Hapus grup log). Konfirmasi penghapusan.

## Bersihkan sumber daya Kinesis Data Streams
<a name="example-notebook-cleanup-streams"></a>

Untuk menghapus aliran Kinesis, buka konsol Kinesis Data Streams, pilih aliran Kinesis, lalu pilih **Actions** (Tindakan), **Delete** (Hapus).

## Bersihkan sumber daya MSK
<a name="example-notebook-cleanup-msk"></a>

Ikuti langkah-langkah di bagian ini jika Anda membuat klaster Amazon MSK untuk tutorial ini. Bagian ini berisi petunjuk untuk membersihkan instans klien Amazon EC2, Amazon VPC, dan klaster Amazon MSK Anda.

### Hapus kluster MSK Amazon Anda
<a name="example-notebook-msk-cleanup-msk"></a>

Ikuti langkah-langkah ini jika Anda membuat klaster Amazon MSK untuk tutorial ini.

1. Buka konsol MSK Amazon di [https://console.aws.amazon.com/msk/rumah? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Pilih **AWS KafkaTutorialCluster**. Pilih **Delete** (Hapus). Masukkan **delete** di jendela yang muncul, dan konfirmasikan pilihan Anda.

### Akhiri intans klien Anda
<a name="example-notebook-msk-cleanup-client"></a>

Ikuti langkah-langkah ini jika Anda membuat instans klien Amazon EC2 untuk tutorial ini.

1. Buka konsol Amazon EC2 di. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)

1. Pilih **Instances** (Instans) dari panel navigasi sebelah kiri.

1. Pilih kotak centang di sebelah untuk **ZeppelinClient**memilihnya.

1. Pilih **Instance State** (Status Instans), **Terminate Instance** (Akhiri Instans).

### Hapus Amazon VPC Anda
<a name="example-notebook-msk-cleanup-vpc"></a>

Ikuti langkah-langkah ini jika Anda membuat klaster Amazon VPC untuk tutorial ini.

1. Buka konsol Amazon EC2 di. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)

1. Pilih **Network Interfaces** (Antarmuka Jaringan) dari bilah navigasi sebelah kiri.

1. Masukkan ID VPC Anda di bilah pencarian dan tekan enter untuk mencari.

1. Pilih kotak centang di header tabel untuk memilih semua antarmuka jaringan yang ditampilkan.

1. Pilih **Actions** (Tindakan), **Detach** (Lepaskan). Di jendela yang muncul, pilih **Enable** (Aktifkan) di bawah **Force detachment** (Lepas paksa). Pilih **Detach** (Lepaskan), dan tunggu hingga semua antarmuka jaringan mencapai status **Available** (Tersedia).

1. Pilih kotak centang di header tabel untuk memilih lagi semua antarmuka jaringan yang ditampilkan.

1. Pilih **Actions** (Tindakan), **Delete** (Hapus). Konfirmasikan tindakan.

1. Buka konsol Amazon VPC di. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)

1. Pilih **AWS KafkaTutorialVPC**. Pilih **Actions** (Tindakan), **Delete VPC** (Hapus VPC). Masukkan **delete** dan konfirmasikan penghapusan.

# Tutorial: Menyebarkan notebook Studio sebagai Layanan Terkelola untuk aplikasi Apache Flink dengan status tahan lama
<a name="example-notebook-deploy"></a>

Tutorial berikut menunjukkan cara menyebarkan notebook Studio sebagai Layanan Terkelola untuk aplikasi Apache Flink dengan status tahan lama.

**Topics**
+ [Prasyarat lengkap](#example-notebook-durable-setup)
+ [Menyebarkan aplikasi dengan status tahan lama menggunakan Konsol Manajemen AWS](#example-notebook-deploy-console)
+ [Menyebarkan aplikasi dengan status tahan lama menggunakan AWS CLI](#example-notebook-deploy-cli)

## Prasyarat lengkap
<a name="example-notebook-durable-setup"></a>

Buat notebook Studio baru dengan mengikuti [Tutorial: Membuat notebook Studio di Managed Service untuk Apache Flink](example-notebook.md), menggunakan Kinesis Data Streams atau Amazon MSK. Beri nama notebook Studio `ExampleTestDeploy`.

## Menyebarkan aplikasi dengan status tahan lama menggunakan Konsol Manajemen AWS
<a name="example-notebook-deploy-console"></a>

1. Tambahkan lokasi bucket S3 tempat Anda ingin kode yang dikemas disimpan di bawah **Lokasi kode aplikasi - *opsional*** di konsol. Ini mengaktifkan langkah-langkah untuk men-deploy dan menjalankan aplikasi Anda langsung dari notebook.

1. Tambahkan izin yang diperlukan ke peran aplikasi untuk mengaktifkan peran yang Anda gunakan untuk membaca dan menulis ke bucket Amazon S3, dan untuk meluncurkan Layanan Terkelola untuk aplikasi Apache Flink:
   + AmazonS3 FullAccess
   + Amazondikelola- flinkFullAccess
   + Akses ke sumber, tujuan, dan VPCs sebagaimana berlaku. Untuk informasi selengkapnya, lihat [Tinjau izin IAM untuk notebook Studio](how-zeppelin-iam.md).

1. Gunakan kode sampel berikut:

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Dengan peluncuran fitur ini, Anda akan melihat menu menurun baru di sudut kanan atas setiap catatan di notebook Anda dengan nama notebook. Anda dapat melakukan tindakan berikut:
   + Lihat pengaturan notebook Studio di Konsol Manajemen AWS.
   + Bangun Zeppelin Note dan ekspor ke Amazon S3. Di titik ini, beri nama aplikasi Anda dan pilih **Build and Export** (Bangun dan Ekspor). Anda akan mendapatkan notifikasi saat ekspor selesai.
   + Jika perlu, Anda dapat melihat dan menjalankan tes tambahan pada executable di Amazon S3.
   + Setelah selesai dibangun, Anda akan dapat men-deploy kode Anda sebagai aplikasi streaming Kinesis dengan status tahan lama dan penskalaan otomatis.
   + Gunakan menu menurun dan pilih **Deploy Zeppelin Note as Kinesis streaming application** (Deploy Zeppelin Note sebagai aplikasi streaming Kinesis). Tinjau nama aplikasi dan pilih **Deploy via AWS Console**.
   + Ini akan membawa Anda ke Konsol Manajemen AWS halaman untuk membuat Layanan Terkelola untuk aplikasi Apache Flink. Perhatikan bahwa nama aplikasi, paralelisme, lokasi kode, Glue DB default, VPC (jika berlaku) dan IAM role sudah diisi sebelumnya. Pastikan IAM role memiliki izin yang diperlukan untuk sumber dan tujuan Anda. Snapshot diaktifkan secara default untuk manajemen state aplikasi yang tahan lama.
   + Pilih **create application** (buat aplikasi).
   + Anda dapat memilih **configure** (konfigurasikan) dan mengubah pengaturan apa pun, lalu memilih **Run** (Jalankan) untuk memulai aplikasi streaming Anda.

## Menyebarkan aplikasi dengan status tahan lama menggunakan AWS CLI
<a name="example-notebook-deploy-cli"></a>

Untuk menyebarkan aplikasi menggunakan AWS CLI, Anda harus memperbarui AWS CLI untuk menggunakan model layanan yang disediakan dengan informasi Beta 2 Anda. Untuk informasi tentang cara menggunakan model layanan yang diperbarui, lihat [Lengkapi prasyaratPrasyarat lengkap](example-notebook.md#example-notebook-setup).

Kode contoh berikut membuat notebook Studio baru:

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

Contoh kode berikut memulai notebook Studio baru:

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

Kode berikut mengembalikan URL untuk halaman notebook Apache Zeppelin aplikasi:

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Lihat contoh kueri untuk menganalisis data di buku catatan Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Buat tabel dengan Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Buat tabel dengan Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Kueri jendela yang jatuh](#how-zeppelin-examples-tumbling)
+ [Kueri jendela geser](#how-zeppelin-examples-sliding)
+ [Gunakan SQL interaktif](#how-zeppelin-examples-interactive-sql)
+ [Gunakan konektor BlackHole SQL](#how-zeppelin-examples-blackhole-connector-sql)
+ [Gunakan Scala untuk menghasilkan data sampel](#notebook-example-data-generator)
+ [Gunakan Scala interaktif](#notebook-example-interactive-scala)
+ [Gunakan Python interaktif](#notebook-example-interactive-python)
+ [Gunakan kombinasi Python interaktif, SQL, dan Scala](#notebook-example-interactive-pythonsqlscala)
+ [Gunakan aliran data Kinesis lintas akun](#notebook-example-crossaccount-kds)

Untuk informasi tentang pengaturan kueri SQL Apache Flink, lihat [Flink pada Notebook Zeppelin untuk Analisis Data Interaktif](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

Untuk melihat aplikasi Anda di dasbor Apache Flink, pilih **FLINK JOB** (TUGAS FLINK) di halaman **Zeppelin Note** aplikasi Anda.

Untuk informasi selengkapnya tentang kueri jendela, lihat [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) (Jendela) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Untuk contoh kueri SQL Apache Flink Streaming selengkapnya, lihat [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Buat tabel dengan Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Anda dapat menggunakan konektor Amazon MSK Flink dengan Managed Service for Apache Flink Studio untuk mengautentikasi koneksi Anda dengan otentikasi Plaintext, SSL, atau IAM. Buat tabel Anda menggunakan properti spesifik sesuai kebutuhan Anda.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Anda dapat menggabungkan ini dengan properti lain di [Apache Kafka SQL](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/) Connector.

## Buat tabel dengan Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

Dalam contoh berikut, Anda membuat tabel menggunakan Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Untuk informasi selengkapnya tentang properti lain yang dapat Anda gunakan, lihat [Konektor SQL Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Kueri jendela yang jatuh
<a name="how-zeppelin-examples-tumbling"></a>

Kueri SQL Flink Streaming berikut memilih harga tertinggi di setiap jendela tumbling lima detik dari tabel `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Kueri jendela geser
<a name="how-zeppelin-examples-sliding"></a>

Kueri SQL Apache Flink Streaming berikut memilih harga tertinggi di setiap jendela geser lima detik dari tabel `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Gunakan SQL interaktif
<a name="how-zeppelin-examples-interactive-sql"></a>

Contoh ini mencetak maks. waktu peristiwa dan waktu pemrosesan serta jumlah nilai dari tabel nilai kunci. Pastikan Anda memiliki skrip pembuatan data sampel dari [Gunakan Scala untuk menghasilkan data sampel](#notebook-example-data-generator) yang berjalan. Untuk mencoba kueri SQL lainnya seperti filter dan gabung di notebook Studio Anda, lihat dokumentasi Apache Flink: [Kueri](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) di dokumentasi Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Gunakan konektor BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

Konektor BlackHole SQL tidak mengharuskan Anda membuat aliran data Kinesis atau kluster MSK Amazon untuk menguji kueri Anda. Untuk informasi tentang konektor BlackHole SQL, lihat Konektor [BlackHole SQL dalam dokumentasi](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) Apache Flink. Dalam contoh ini, katalog default adalah katalog dalam memori.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Gunakan Scala untuk menghasilkan data sampel
<a name="notebook-example-data-generator"></a>

Contoh ini menggunakan Scala untuk menghasilkan data sampel. Anda dapat menggunakan data sampel ini untuk menguji berbagai kueri. Gunakan pernyataan buat tabel untuk membuat tabel nilai kunci.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Gunakan Scala interaktif
<a name="notebook-example-interactive-scala"></a>

Ini adalah terjemahan Scala dari [Gunakan SQL interaktif](#how-zeppelin-examples-interactive-sql). Untuk contoh Scala lainnya, lihat [Tabel API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) di dokumentasi Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Gunakan Python interaktif
<a name="notebook-example-interactive-python"></a>

Ini adalah terjemahan Python dari [Gunakan SQL interaktif](#how-zeppelin-examples-interactive-sql). Untuk contoh Python lainnya, lihat [Tabel API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) di dokumentasi Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Gunakan kombinasi Python interaktif, SQL, dan Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

Anda dapat menggunakan kombinasi SQL, Python, dan Scala apa pun di notebook Anda untuk analisis interaktif. Dalam notebook Studio yang Anda rencanakan untuk di-deploy sebagai aplikasi dengan status tahan lama, Anda dapat menggunakan kombinasi SQL dan Scala. Contoh ini menunjukkan bagian yang diabaikan dan bagian yang dapat digunakan dalam aplikasi dengan status tahan lama.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Gunakan aliran data Kinesis lintas akun
<a name="notebook-example-crossaccount-kds"></a>

Untuk menggunakan Kinesis data stream yang ada di akun selain akun yang memiliki notebook Studio, buat peran eksekusi layanan di akun tempat notebook Studio Anda berjalan dan kebijakan kepercayaan peran di akun yang memiliki aliran data. Gunakan `aws.credentials.provider`, `aws.credentials.role.arn`, dan `aws.credentials.role.sessionName` di konektor Kinesis dalam pernyataan DDL buat tabel Anda untuk membuat tabel pada aliran data.

Gunakan peran eksekusi layanan berikut untuk akun notebook Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Gunakan kebijakan `AmazonKinesisFullAccess` dan kebijakan kepercayaan peran berikut untuk akun aliran data.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Gunakan paragraf berikut untuk membuat pernyataan tabel.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```