Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Buat notebook Studio dengan Amazon MSK
Tutorial ini menjelaskan cara membuat notebook Studio yang menggunakan klaster Amazon MSK sebagai sumber.
Tutorial ini berisi bagian-bagian berikut:
Siapkan kluster MSK Amazon
Untuk tutorial ini, Anda memerlukan klaster Amazon MSK yang memungkinkan akses plaintext. Jika Anda belum menyiapkan kluster MSK Amazon, ikuti tutorial Memulai Menggunakan Amazon MSK untuk membuat Amazon VPC, kluster MSK Amazon, topik, dan instance klien Amazon. EC2
Saat mengikuti tutorial, lakukan hal berikut:
Di Langkah 3: Buat Klaster Amazon MSK, di langkah 4, ubah nilai
ClientBroker
dariTLS
kePLAINTEXT
.
Tambahkan gateway NAT ke VPC Anda
Jika Anda membuat klaster Amazon MSK dengan mengikuti tutorial Memulai Menggunakan Amazon MSK, atau jika Amazon VPC Anda yang sudah ada tidak memiliki gateway NAT untuk subnet privatnya, Anda harus menambahkan Gateway NAT ke Amazon VPC Anda. Diagram berikut menunjukkan arsitektur.

Untuk membuat gateway NAT untuk VPC Amazon Anda, lakukan hal berikut:
Buka konsol VPC Amazon di. https://console.aws.amazon.com/vpc/
Pilih NAT Gateways (Gateway NAT) dari bilah navigasi sebelah kiri.
Di halaman Gateway NAT, pilih Create NAT Gateway (Buat Gateway NAT).
Di halaman Buat Gateway NAT, berikan nilai berikut:
Nama - opsional ZeppelinGateway
Subnet AWS KafkaTutorialSubnet1 ID alokasi IP elastis Pilih IP Elastis yang tersedia. Jika tidak ada Elastic IPs yang tersedia, pilih Alokasikan IP Elastis, lalu pilih IP Elasic yang dibuat konsol. Pilih Create NAT Gateway (Buat Gateway NAT).
Di bilah navigasi sebelah kiri, pilih Route Tables (Tabel Rute).
Pilih Create Route Table (Buat Tabel Rute).
Di halaman Create route table (Buat tabel rute), berikan informasi berikut:
Name tag (Tanda nama):
ZeppelinRouteTable
VPC: Pilih VPC Anda (misalnya VPC).AWS KafkaTutorial
Pilih Buat.
Dalam daftar tabel rute, pilih ZeppelinRouteTable. Pilih tab Routes (Rute), dan pilih Edit routes (Edit rute).
Di halaman Edit Rute, pilih Add route (Tambahkan rute).
Di Untuk Tujuan, masukkan
0.0.0.0/0
. Untuk Target, pilih NAT Gateway, ZeppelinGateway. Pilih Save Routes (Simpan Rute). Pilih Close (Tutup).Pada halaman Tabel Rute, dengan ZeppelinRouteTabledipilih, pilih tab Asosiasi Subnet. Pilih Edit subnet associations (Edit asosiasi subnet).
Di halaman Edit asosiasi subnet, pilih AWS KafkaTutorialSubnet2 dan AWS KafkaTutorialSubnet3. Pilih Simpan.
Buat AWS Glue koneksi dan tabel
Notebook Studio Anda menggunakan basis data AWS Glue untuk metadata tentang sumber data Amazon MSK Anda. Di bagian ini, Anda membuat AWS Glue sambungan yang menjelaskan cara mengakses kluster MSK Amazon, dan AWS Glue tabel yang menjelaskan cara menyajikan data dalam sumber data ke klien seperti buku catatan Studio Anda.
Buat Koneksi
Masuk ke AWS Management Console dan buka AWS Glue konsol di https://console.aws.amazon.com/glue/
. Jika Anda belum memiliki AWS Glue database, pilih Database dari bilah navigasi kiri. Pilih Add database (Tambahkan basis data). Di jendela Add database (Tambahkan basis data), masukkan
default
untuk Database name (Nama basis data). Pilih Create (Buat).Pilih Connections (Koneksi) dari bilah navigasi sebelah kiri. Pilih Add Connection (Tambahkan Koneksi).
Di jendela Tambahkan Koneksi, berikan nilai berikut:
Untuk Connection name (Nama koneksi), masukkan
ZeppelinConnection
.Untuk Connection type (Tipe koneksi), pilih Kafka.
Untuk server bootstrap Kafka URLs, berikan string broker bootstrap untuk cluster Anda. Anda bisa mendapatkan broker bootstrap dari konsol MSK, atau dengan memasukkan perintah CLI berikut:
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn
ClusterArn
Hapus centang di kotak centang Require SSL connection (Perlu koneksi SSL).
Pilih Berikutnya.
Di halaman VPC, berikan nilai berikut:
Untuk VPC, pilih nama VPC Anda (misalnya VPC.) AWS KafkaTutorial
Untuk Subnet, pilih AWS KafkaTutorialSubnet2.
Untuk Security groups (Grup keamanan), pilih semua grup yang tersedia.
Pilih Berikutnya.
Di halaman Properti koneksi / Akses koneksi, pilih Finish (Selesai).
Buat Tabel
catatan
Anda dapat membuat tabel secara manual seperti yang dijelaskan dalam langkah-langkah berikut, atau Anda dapat menggunakan kode konektor buat tabel untuk Layanan Terkelola untuk Apache Flink di buku catatan Anda dalam Apache Zeppelin untuk membuat tabel Anda melalui pernyataan DDL. Anda kemudian dapat check-in AWS Glue untuk memastikan tabel dibuat dengan benar.
Di bilah navigasi sebelah kiri, pilih Tables (Tabel). Di halaman Tabel, pilih Add tables (Tambahkan tabel), Add table manually (Tambahkan tabel secara manual).
Di halaman Set up your table's properties (Siapkan properti tabel Anda), masukkan
stock
untuk Table name (Nama tabel). Pastikan Anda memilih basis data yang Anda buat sebelumnya. Pilih Berikutnya.Di halaman Tambahkan penyimpanan data, pilih Kafka. Untuk nama Topik, masukkan nama topik Anda (mis. AWS KafkaTutorialTopic). Untuk Koneksi, pilih ZeppelinConnection.
Di halaman Klasifikasi, pilih JSON. Pilih Berikutnya.
Di halaman Tentukan skema, pilih Add Column (Tambahkan kolom) untuk menambahkan kolom. Tambahkan kolom dengan properti berikut:
Nama kolom Tipe data ticker
string
price
double
Pilih Berikutnya.
Di halaman berikutnya, verifikasi pengaturan Anda, dan pilih Finish (Selesai).
-
Pilih tabel yang baru dibuat dari daftar tabel.
-
Pilih Edit tabel dan tambahkan properti berikut:
-
kunci:
managed-flink.proctime
, nilai:proctime
-
kunci:
flink.properties.group.id
, nilai:test-consumer-group
-
kunci:
flink.properties.auto.offset.reset
, nilai:latest
-
kunci:
classification
, nilai:json
Tanpa pasangan kunci/nilai ini, notebook Flink mengalami kesalahan.
-
-
Pilih Terapkan.
Buat notebook Studio dengan Amazon MSK
Sekarang Anda sudah membuat sumber daya yang digunakan aplikasi Anda, Anda membuat notebook Studio Anda.
Anda dapat membuat aplikasi Anda menggunakan salah satu AWS Management Console atau AWS CLI.
catatan
Anda juga dapat membuat notebook Studio dari konsol Amazon MSK dengan memilih klaster yang sudah ada, lalu memilih Process data in real time (Proses data secara langsung).
Buat notebook Studio menggunakan AWS Management Console
Di halaman Managed Service for Apache Flink Apache Applications, pilih tab Studio. Pilih Create Studio notebook (Buat notebook Studio).
catatan
Untuk membuat notebook Studio dari konsol Amazon MSK atau Kinesis Data Streams pilih klaster Amazon MSK input atau Kinesis data stream Anda, lalu pilih Process data in real time (Proses data secara langsung).
Di halaman Buat notebook Studio, berikan informasi berikut:
-
Masukkan
MyNotebook
untuk Studio notebook Name (Nama notebook Studio). Pilih default untuk Basis data AWS Glue.
Pilih Create Studio notebook (Buat notebook Studio).
-
Di MyNotebookhalaman, pilih tab Konfigurasi. Di bagian Jaringan, pilih Edit.
Di MyNotebook halaman Edit jaringan untuk, pilih konfigurasi VPC berdasarkan kluster MSK Amazon. Pilih klaster Amazon MSK untuk Amazon MSK Cluster (Klaster Amazon MSK). Pilih Simpan perubahan.
Di MyNotebookhalaman, pilih Jalankan. Tunggu Status hingga menampilkan Running (Berjalan).
Buat notebook Studio menggunakan AWS CLI
Untuk membuat buku catatan Studio menggunakan AWS CLI, lakukan hal berikut:
Pastikan bahwa Anda memiliki informasi berikut. Anda perlu nilai-nilai ini untuk membuat aplikasi Anda.
ID akun Anda.
ID subnet IDs dan grup keamanan untuk VPC Amazon yang berisi kluster MSK Amazon Anda.
Buat file bernama
create.json
dengan konten berikut. Ganti nilai placeholder dengan informasi Anda.{ "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::
AccountID
:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1
", "SubnetID 2
", "SubnetID 3
" ], "SecurityGroupIds": [ "VPC Security Group ID
" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID
:database/default" } } } } }Jalankan perintah berikut untuk membuat aplikasi Anda.
aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
Setelah perintah selesai, Anda akan melihat output yang serupa dengan yang berikut, yang menampilkan detail untuk notebook Studio baru Anda:
{ "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
Jalankan perintah berikut untuk memulai aplikasi Anda. Ganti nilai sampel dengan ID akun Anda.
aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:
012345678901
:application/MyNotebook\
Kirim data ke klaster Amazon MSK Anda
Di bagian ini, Anda menjalankan skrip Python di EC2 klien Amazon Anda untuk mengirim data ke sumber data MSK Amazon Anda.
Connect ke EC2 klien Amazon Anda.
Jalankan perintah berikut untuk menginstal Python versi 3, Pip, dan Kafka untuk paket Python, dan mengonfirmasi tindakan:
sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
Konfigurasikan AWS CLI pada mesin klien Anda dengan memasukkan perintah berikut:
aws configure
Berikan kredensial akun Anda, dan
us-east-1
untukregion
.Buat file bernama
stock.py
dengan konten berikut. Ganti nilai sampel dengan string Bootstrap Brokers kluster MSK Amazon Anda, dan perbarui nama topik jika topik Anda bukan AWS KafkaTutorialTopic:from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "
<<Bootstrap Broker List>>
" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())Jalankan skrip dengan perintah berikut:
$ python3 stock.py
Biarkan skrip berjalan saat Anda menyelesaikan bagian berikut.
Uji notebook Studio Anda
Di bagian ini, Anda menggunakan notebook Studio Anda untuk mengkueri data dari klaster Amazon MSK Anda.
Pada halaman Managed Service for Apache Flink Apache Applications, pilih tab notebook Studio. Pilih MyNotebook.
Di MyNotebookhalaman, pilih Buka di Apache Zeppelin.
Antarmuka Apache Zeppelin terbuka di tab baru.
Di halaman Selamat Datang di Zeppelin!, pilih Zeppelin new note (Catatan baru Zeppelin).
Di halaman Zeppelin Note (Catatan Zeppelin), masukkan kueri berikut ke dalam catatan baru:
%flink.ssql(type=update) select * from stock
Pilih ikon jalankan.
Aplikasi menampilkan data dari klaster Amazon MSK.
Untuk membuka Dasbor Apache Flink untuk aplikasi Anda agar dapat melihat aspek operasional, pilih FLINK JOB (TUGAS FLINK). Untuk informasi selengkapnya tentang Dasbor Flink, lihat Dasbor Apache Flink di Managed Service for Apache Flink Developer Guide.
Untuk contoh kueri SQL Flink Streaming selengkapnya, lihat Kueri