Membuat dan menjalankan Managed Service untuk Apache Flink untuk aplikasi Python - Layanan Terkelola untuk Apache Flink

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Membuat dan menjalankan Managed Service untuk Apache Flink untuk aplikasi Python

Di bagian ini, Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk aplikasi Python dengan aliran Kinesis sebagai sumber dan wastafel.

Buat sumber daya yang bergantung

Sebelum Anda membuat Layanan Terkelola untuk Apache Flink untuk latihan ini, Anda membuat sumber daya dependen berikut:

  • Dua aliran Kinesis untuk input dan output.

  • Bucket Amazon S3 untuk menyimpan kode aplikasi.

catatan

Tutorial ini mengasumsikan bahwa Anda menerapkan aplikasi Anda di Wilayah us-east-1. Jika Anda menggunakan Wilayah lain, Anda harus menyesuaikan semua langkah yang sesuai.

Buat dua aliran Kinesis

Sebelum Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk latihan ini, buat dua aliran data Kinesis (ExampleInputStreamdanExampleOutputStream) di Wilayah yang sama yang akan Anda gunakan untuk menyebarkan aplikasi Anda (us-east-1 dalam contoh ini). Aplikasi Anda menggunakan aliran ini untuk sumber aplikasi dan aliran tujuan.

Anda dapat membuat aliran ini menggunakan konsol Amazon Kinesis atau perintah AWS CLI berikut. Untuk instruksi konsol, lihat Membuat dan Memperbarui Aliran Data di Panduan Developer Amazon Kinesis Data Streams.

Untuk membuat aliran data AWS CLI
  1. Untuk membuat stream (ExampleInputStream) pertama, gunakan perintah Amazon Kinesis create-stream AWS CLI berikut.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. Untuk membuat aliran kedua yang digunakan aplikasi untuk menulis output, jalankan perintah yang sama, yang mengubah nama aliran menjadi ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

Buat bucket Amazon S3.

Anda dapat membuat bucket Amazon S3 menggunakan konsol. Untuk petunjuk pembuatan sumber daya ini, lihat topik berikut:

  • Bagaimana Cara Membuat Bucket S3? di Panduan Pengguna Layanan Penyimpanan Sederhana Amazon. Berikan bucket Amazon S3 nama yang unik secara global, misalnya dengan menambahkan nama login Anda.

    catatan

    Pastikan Anda membuat bucket S3 di Region yang Anda gunakan untuk tutorial ini (us-east-1).

Sumber daya lainnya

Saat Anda membuat aplikasi, Managed Service for Apache Flink akan membuat CloudWatch resource Amazon berikut jika belum ada:

  • Grup log yang disebut /AWS/KinesisAnalytics-java/<my-application>.

  • Aliran log yang disebut kinesis-analytics-log-stream.

Siapkan lingkungan pengembangan lokal Anda

Untuk pengembangan dan debugging, Anda dapat menjalankan aplikasi Python Flink di mesin Anda. Anda dapat memulai aplikasi dari baris perintah dengan python main.py atau dengan Python pilihan IDE Anda.

catatan

Pada mesin pengembangan Anda, Anda harus menginstal Python 3.10 atau 3.11, Java 11, Apache Maven, dan Git. Kami menyarankan Anda menggunakan IDE seperti PyCharmatau Visual Studio Code. Untuk memverifikasi bahwa Anda memenuhi semua prasyarat, lihat Memenuhi prasyarat untuk menyelesaikan latihan sebelum melanjutkan.

Untuk mengembangkan aplikasi Anda dan menjalankannya secara lokal, Anda harus menginstal perpustakaan Flink Python.

  1. Buat lingkungan Python mandiri VirtualEnv menggunakan, Conda, atau alat Python serupa.

  2. Instal PyFlink perpustakaan di lingkungan itu. Gunakan versi runtime Apache Flink yang sama yang akan Anda gunakan di Amazon Managed Service untuk Apache Flink. Saat ini, runtime yang disarankan adalah 1.19.1.

    $ pip install apache-flink==1.19.1
  3. Pastikan lingkungan aktif saat Anda menjalankan aplikasi. Jika Anda menjalankan aplikasi diIDE, pastikan bahwa menggunakan lingkungan sebagai runtime. IDE Prosesnya tergantung pada IDE yang Anda gunakan.

    catatan

    Anda hanya perlu menginstal PyFlink perpustakaan. Anda tidak perlu menginstal cluster Apache Flink di mesin Anda.

Otentikasi sesi Anda AWS

Aplikasi ini menggunakan aliran data Kinesis untuk mempublikasikan data. Saat berjalan secara lokal, Anda harus memiliki sesi AWS otentikasi yang valid dengan izin untuk menulis ke aliran data Kinesis. Gunakan langkah-langkah berikut untuk mengautentikasi sesi Anda:

  1. Jika Anda tidak memiliki AWS CLI dan profil bernama dengan kredensi valid yang dikonfigurasi, lihatMengatur AWS Command Line Interface (AWS CLI).

  2. Verifikasi bahwa Anda AWS CLI telah dikonfigurasi dengan benar dan pengguna Anda memiliki izin untuk menulis ke aliran data Kinesis dengan menerbitkan catatan pengujian berikut:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Jika Anda IDE memiliki plugin untuk diintegrasikan AWS, Anda dapat menggunakannya untuk meneruskan kredensil ke aplikasi yang berjalan di file. IDE Untuk informasi selengkapnya, lihat AWS Toolkit for PyCharm, AWS Toolkit for Visual Studio Code, AWS dan Toolkit for IntelliJ. IDEA

Unduh dan periksa kode Python streaming Apache Flink

Kode aplikasi Python untuk contoh ini tersedia dari. GitHub Untuk mengunduh kode aplikasi, lakukan hal berikut:

  1. Kloning repositori jarak jauh menggunakan perintah berikut:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Buka direktori ./python/GettingStarted tersebut.

Tinjau komponen aplikasi

Kode aplikasi terletak dimain.py. Kami menggunakan SQL embedded dalam Python untuk menentukan aliran aplikasi.

catatan

Untuk pengalaman pengembang yang dioptimalkan, aplikasi ini dirancang untuk berjalan tanpa perubahan kode apa pun baik di Amazon Managed Service untuk Apache Flink maupun secara lokal, untuk pengembangan di mesin Anda. Aplikasi ini menggunakan variabel lingkungan IS_LOCAL = true untuk mendeteksi ketika sedang berjalan secara lokal. Anda harus mengatur variabel lingkungan IS_LOCAL = true baik pada shell Anda atau dalam konfigurasi run AndaIDE.

  • Aplikasi mengatur lingkungan eksekusi dan membaca konfigurasi runtime. Untuk bekerja baik di Amazon Managed Service untuk Apache Flink dan secara lokal, aplikasi memeriksa variabel. IS_LOCAL

    • Berikut ini adalah perilaku default saat aplikasi berjalan di Amazon Managed Service untuk Apache Flink:

      1. Muat dependensi yang dikemas dengan aplikasi. Untuk informasi lebih lanjut, lihat (tautan)

      2. Muat konfigurasi dari properti Runtime yang Anda tentukan di Amazon Managed Service untuk aplikasi Apache Flink. Untuk informasi lebih lanjut, lihat (tautan)

    • Saat aplikasi mendeteksi IS_LOCAL = true kapan Anda menjalankan aplikasi secara lokal:

      1. Memuat dependensi eksternal dari proyek.

      2. Memuat konfigurasi dari application_properties.json file yang disertakan dalam proyek.

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • Aplikasi mendefinisikan tabel sumber dengan CREATE TABLE pernyataan, menggunakan Konektor Kinesis. Tabel ini membaca data dari aliran Kinesis masukan. Aplikasi mengambil nama aliran, Wilayah, dan posisi awal dari konfigurasi runtime.

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • Aplikasi ini juga mendefinisikan tabel wastafel menggunakan Konektor Kinesis dalam contoh ini. Kisah ini mengirimkan data ke aliran Kinesis keluaran.

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • Akhirnya, aplikasi mengeksekusi tabel INSERT INTO... wastafel dari tabel sumber. SQL Dalam aplikasi yang lebih kompleks, Anda mungkin memiliki langkah-langkah tambahan mengubah data sebelum menulis ke wastafel.

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • Anda harus menambahkan langkah lain di akhir main() fungsi untuk menjalankan aplikasi secara lokal:

    if is_local: table_result.wait()

    Tanpa pernyataan ini, aplikasi segera berakhir ketika Anda menjalankannya secara lokal. Anda tidak boleh menjalankan pernyataan ini ketika Anda menjalankan aplikasi Anda di Amazon Managed Service untuk Apache Flink.

Kelola JAR dependensi

PyFlink Aplikasi biasanya membutuhkan satu atau lebih konektor. Aplikasi dalam tutorial ini menggunakan Konektor Kinesis. Karena Apache Flink berjalan di JavaJVM, konektor didistribusikan sebagai JAR file, terlepas dari apakah Anda mengimplementasikan aplikasi Anda dengan Python. Anda harus mengemas dependensi ini dengan aplikasi saat Anda menerapkannya di Amazon Managed Service untuk Apache Flink.

Dalam contoh ini, kami menunjukkan bagaimana menggunakan Apache Maven untuk mengambil dependensi dan paket aplikasi untuk dijalankan pada Managed Service untuk Apache Flink.

catatan

Ada cara alternatif untuk mengambil dan mengemas dependensi. Contoh ini menunjukkan metode yang bekerja dengan benar dengan satu atau lebih konektor. Ini juga memungkinkan Anda menjalankan aplikasi secara lokal, untuk pengembangan, dan pada Managed Service untuk Apache Flink tanpa perubahan kode.

Gunakan file pom.xml

Apache Maven menggunakan pom.xml file untuk mengontrol dependensi dan kemasan aplikasi.

Setiap JAR dependensi ditentukan dalam pom.xml file di blok. <dependencies>...</dependencies>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

Untuk menemukan artefak dan versi konektor yang benar untuk digunakan, lihatGunakan konektor Apache Flink dengan Managed Service untuk Apache Flink. Pastikan Anda merujuk ke versi Apache Flink yang Anda gunakan. Untuk contoh ini, kami menggunakan konektor Kinesis. Untuk Apache Flink 1.19, versi konektornya adalah. 4.3.0-1.19

catatan

Jika Anda menggunakan Apache Flink 1.19, tidak ada versi konektor yang dirilis khusus untuk versi ini. Gunakan konektor yang dilepaskan untuk 1,18.

Download dan paket dependensi

Gunakan Maven untuk mengunduh dependensi yang ditentukan dalam pom.xml file dan mengemasnya untuk aplikasi Python Flink.

  1. Arahkan ke direktori yang berisi proyek Python Getting Started yang disebut. python/GettingStarted

  2. Jalankan perintah berikut:

$ mvn package

Maven membuat file baru bernama. ./target/pyflink-dependencies.jar Saat Anda mengembangkan secara lokal di mesin Anda, aplikasi Python mencari file ini.

catatan

Jika Anda lupa menjalankan perintah ini, ketika Anda mencoba menjalankan aplikasi Anda, itu akan gagal dengan kesalahan: Tidak dapat menemukan pabrik apa pun untuk pengenal “kinesis.

Tulis catatan sampel ke aliran input

Di bagian ini, Anda akan mengirim catatan sampel ke aliran untuk aplikasi untuk diproses. Anda memiliki dua opsi untuk menghasilkan data sampel, baik menggunakan skrip Python atau Kinesis Data Generator.

Menghasilkan data sampel menggunakan skrip Python

Anda dapat menggunakan skrip Python untuk mengirim catatan sampel ke aliran.

catatan

Untuk menjalankan skrip Python ini, Anda harus menggunakan Python 3.x dan menginstal pustaka for AWS SDKPython (Boto).

Untuk mulai mengirim data uji ke aliran input Kinesis:

  1. Unduh skrip stock.py Python generator data dari repositori generator GitHub Data.

  2. Jalankan skrip stock.py.

    $ python stock.py

Jaga agar skrip tetap berjalan saat Anda menyelesaikan sisa tutorial. Anda sekarang dapat menjalankan aplikasi Apache Flink Anda.

Hasilkan data sampel menggunakan Kinesis Data Generator

Atau menggunakan skrip Python, Anda dapat menggunakan Kinesis Data Generator, juga tersedia dalam versi yang dihosting, untuk mengirim data sampel acak ke aliran. Kinesis Data Generator berjalan di browser Anda, dan Anda tidak perlu menginstal apa pun di mesin Anda.

Untuk mengatur dan menjalankan Kinesis Data Generator:

  1. Ikuti petunjuk dalam dokumentasi Kinesis Data Generator untuk mengatur akses ke alat. Anda akan menjalankan AWS CloudFormation template yang mengatur pengguna dan kata sandi.

  2. Akses Kinesis Data Generator melalui yang URL dihasilkan oleh template. CloudFormation Anda dapat menemukan URL di Output tab setelah CloudFormation template selesai.

  3. Konfigurasikan generator data:

    • Wilayah: Pilih Wilayah yang Anda gunakan untuk tutorial ini: us-east-1

    • Stream/streaming pengiriman: Pilih aliran input yang akan digunakan aplikasi: ExampleInputStream

    • Catatan per detik: 100

    • Rekam templat: Salin dan tempel templat berikut:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Uji template: Pilih template Uji dan verifikasi bahwa catatan yang dihasilkan mirip dengan yang berikut:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Mulai generator data: Pilih Pilih Kirim Data.

Kinesis Data Generator sekarang mengirimkan data ke file. ExampleInputStream

Jalankan aplikasi Anda secara lokal

Anda dapat menguji aplikasi secara lokal, berjalan dari baris perintah dengan python main.py atau dari AndaIDE.

Untuk menjalankan aplikasi Anda secara lokal, Anda harus menginstal versi PyFlink pustaka yang benar seperti yang dijelaskan di bagian sebelumnya. Untuk informasi lebih lanjut, lihat (tautan)

catatan

Sebelum melanjutkan, verifikasi bahwa aliran input dan output tersedia. Lihat Buat dua aliran data Amazon Kinesis. Juga, verifikasi bahwa Anda memiliki izin untuk membaca dan menulis dari kedua aliran. Lihat Otentikasi sesi Anda AWS.

Impor proyek Python ke IDE

Untuk mulai mengerjakan aplikasi di AndaIDE, Anda harus mengimpornya sebagai proyek Python.

Repositori yang Anda kloning berisi beberapa contoh. Setiap contoh adalah proyek terpisah. Untuk tutorial ini, impor konten dalam ./python/GettingStarted subdirektori ke dalam AndaIDE.

Impor kode sebagai proyek Python yang ada.

catatan

Proses yang tepat untuk mengimpor proyek Python baru bervariasi tergantung pada yang IDE Anda gunakan.

Periksa konfigurasi aplikasi lokal

Saat berjalan secara lokal, aplikasi menggunakan konfigurasi dalam application_properties.json file di folder sumber daya proyek di bawah./src/main/resources. Anda dapat mengedit file ini untuk menggunakan nama atau Wilayah aliran Kinesis yang berbeda.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Jalankan aplikasi Python Anda secara lokal

Anda dapat menjalankan aplikasi Anda secara lokal, baik dari baris perintah sebagai skrip Python biasa, atau dari file. IDE

Untuk menjalankan aplikasi Anda dari baris perintah
  1. Pastikan bahwa lingkungan Python mandiri seperti Conda VirtualEnv atau tempat Anda menginstal pustaka Python Flink saat ini aktif.

  2. Pastikan Anda berlari mvn package setidaknya satu kali.

  3. Atur variabel lingkungan IS_LOCAL = true:

    $ export IS_LOCAL=true
  4. Jalankan aplikasi sebagai skrip Python biasa.

    $python main.py
Untuk menjalankan aplikasi dari dalam IDE
  1. IDEKonfigurasikan Anda untuk menjalankan main.py skrip dengan konfigurasi berikut:

    1. Gunakan lingkungan Python mandiri seperti Conda VirtualEnv atau tempat Anda menginstal perpustakaan. PyFlink

    2. Gunakan AWS kredensil untuk mengakses input dan output Kinesis aliran data.

    3. Atur IS_LOCAL = true.

  2. Proses yang tepat untuk mengatur konfigurasi run tergantung pada Anda IDE dan bervariasi.

  3. Ketika Anda telah mengatur AndaIDE, jalankan skrip Python dan gunakan tooling yang disediakan oleh Anda IDE saat aplikasi sedang berjalan.

Periksa log aplikasi secara lokal

Saat berjalan secara lokal, aplikasi tidak menampilkan log apa pun di konsol, selain dari beberapa baris yang dicetak dan ditampilkan saat aplikasi dimulai. PyFlink menulis log ke file di direktori tempat pustaka Python Flink diinstal. Aplikasi mencetak lokasi log saat dimulai. Anda juga dapat menjalankan perintah berikut untuk menemukan log:

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. Buat daftar file di direktori logging. Anda biasanya menemukan satu .log file.

  2. Ekor file saat aplikasi sedang berjalan:tail -f <log-path>/<log-file>.log.

Amati data input dan output dalam aliran Kinesis

Anda dapat mengamati catatan yang dikirim ke aliran input oleh (menghasilkan sampel Python) atau Kinesis Data Generator (link) dengan menggunakan Data Viewer di konsol Amazon Kinesis.

Untuk mengamati catatan:

Menghentikan aplikasi Anda berjalan secara lokal

Hentikan aplikasi yang berjalan di AndaIDE. IDEBiasanya menyediakan opsi “berhenti”. Lokasi dan metode yang tepat tergantung padaIDE.

Package kode aplikasi Anda

Di bagian ini, Anda menggunakan Apache Maven untuk mengemas kode aplikasi dan semua dependensi yang diperlukan dalam file.zip.

Jalankan perintah paket Maven lagi:

$ mvn package

Perintah ini menghasilkan filetarget/managed-flink-pyflink-getting-started-1.0.0.zip.

Unggah paket aplikasi ke bucket Amazon S3

Di bagian ini, Anda mengunggah file.zip yang Anda buat di bagian sebelumnya ke bucket Amazon Simple Storage Service (Amazon S3) yang Anda buat di awal tutorial ini. Jika Anda belum menyelesaikan langkah ini, lihat (tautan).

Untuk mengunggah JAR file kode aplikasi
  1. Buka konsol Amazon S3 di. https://console.aws.amazon.com/s3/

  2. Pilih bucket yang sebelumnya Anda buat untuk kode aplikasi.

  3. Pilih Unggah.

  4. Pilih Tambahkan file.

  5. Arahkan ke file.zip yang dihasilkan pada langkah sebelumnya:target/managed-flink-pyflink-getting-started-1.0.0.zip.

  6. Pilih Unggah tanpa mengubah pengaturan lainnya.

Buat dan konfigurasikan Layanan Terkelola untuk aplikasi Apache Flink

Anda dapat membuat dan mengkonfigurasi Layanan Terkelola untuk aplikasi Apache Flink menggunakan konsol atau aplikasi. AWS CLI Untuk tutorial ini, kita akan menggunakan konsol.

Buat aplikasi

  1. Buka Layanan Terkelola untuk konsol Apache Flink di /flink https://console.aws.amazon.com

  2. Verifikasi bahwa Wilayah yang benar dipilih: US East (Virginia N.) us-east-1.

  3. Buka menu sisi kanan dan pilih aplikasi Apache Flink dan kemudian Buat aplikasi streaming. Atau, pilih Buat aplikasi streaming dari bagian Memulai di halaman awal.

  4. Pada halaman Buat aplikasi streaming:

    • Untuk Memilih metode untuk mengatur aplikasi pemrosesan aliran, pilih Buat dari awal.

    • Untuk konfigurasi Apache Flink, versi Application Flink, pilih Apache Flink 1.19.

    • Untuk konfigurasi Aplikasi:

      • Untuk Application name (Nama aplikasi), masukkan MyApplication.

      • Untuk Description (Deskripsi), masukkan My Python test app.

      • Di Akses ke sumber daya aplikasi, pilih Buat/IAMperbarui peran kinesis-analytics- MyApplication -us-east-1 dengan kebijakan yang diperlukan.

    • Untuk Template untuk pengaturan aplikasi:

      • Untuk Template, pilih Development.

    • Pilih Buat aplikasi streaming.

catatan

Saat membuat aplikasi Managed Service for Apache Flink menggunakan konsol, Anda memiliki opsi untuk membuat IAM peran dan kebijakan untuk aplikasi Anda. Aplikasi Anda menggunakan peran dan kebijakan ini untuk mengakses sumber daya dependen. IAMSumber daya ini diberi nama menggunakan nama aplikasi dan Wilayah Anda sebagai berikut:

  • Kebijakan: kinesis-analytics-service-MyApplication-us-west-2

  • Peran: kinesisanalytics-MyApplication-us-west-2

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Kinesis Data Analytics. Nama sumber daya yang dihasilkan secara otomatis diawali dengan kinesis-analytics kompatibilitas mundur.

Edit IAM kebijakan

Edit IAM kebijakan untuk menambahkan izin untuk mengakses bucket Amazon S3.

Untuk mengedit IAM kebijakan untuk menambahkan izin bucket S3
  1. Buka IAM konsol di https://console.aws.amazon.com/iam/.

  2. Pilih Policies (Kebijakan). Pilih kebijakan kinesis-analytics-service-MyApplication-us-east-1 yang dibuat konsol untuk Anda di bagian sebelumnya.

  3. Pilih Edit dan kemudian pilih JSONtab.

  4. Tambahkan bagian yang disorot dari contoh kebijakan berikut ke kebijakan. Ganti akun sampel IDs (012345678901) dengan ID akun Anda.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Pilih Berikutnya dan kemudian pilih Simpan perubahan.

Konfigurasikan aplikasi

Edit konfigurasi aplikasi untuk mengatur artefak kode aplikasi.

Untuk mengonfigurasi aplikasi
  1. Pada MyApplicationhalaman, pilih Konfigurasi.

  2. Di bagian Lokasi kode aplikasi:

    • Untuk bucket Amazon S3, pilih bucket yang sebelumnya Anda buat untuk kode aplikasi. Pilih Browse dan pilih bucket yang benar, lalu pilih Pilih. Jangan pilih nama bucket.

    • Untuk Jalur ke objek Amazon S3, masukkan managed-flink-pyflink-getting-started-1.0.0.zip.

  3. Untuk izin Akses, pilih Buat/perbarui IAM peran kinesis-analytics-MyApplication-us-east-1 dengan kebijakan yang diperlukan.

  4. Pindah ke properti Runtime dan pertahankan nilai default untuk semua pengaturan lainnya.

  5. Pilih Tambahkan item baru dan tambahkan masing-masing parameter berikut:

    ID Grup Kunci Nilai
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. Jangan memodifikasi bagian lain dan pilih Simpan perubahan.

catatan

Saat Anda memilih untuk mengaktifkan CloudWatch pencatatan Amazon, Layanan Terkelola untuk Apache Flink membuat grup log dan aliran log untuk Anda. Nama-nama sumber daya ini adalah sebagai berikut:

  • Grup log: /aws/kinesis-analytics/MyApplication

  • Aliran log: kinesis-analytics-log-stream

Jalankan aplikasi

Aplikasi sekarang dikonfigurasi dan siap dijalankan.

Untuk menjalankan aplikasi
  1. Di konsol untuk Amazon Managed Service untuk Apache Flink, pilih Aplikasi Saya dan pilih Jalankan.

  2. Pada halaman berikutnya, halaman konfigurasi Pemulihan aplikasi, pilih Jalankan dengan snapshot terbaru dan kemudian pilih Jalankan.

    Status dalam Aplikasi merinci transisi dari Ready ke Starting dan kemudian ke Running saat aplikasi telah dimulai.

Saat aplikasi dalam Running status, Anda sekarang dapat membuka dasbor Flink.

Untuk membuka dasbor
  1. Pilih Buka dasbor Apache Flink. Dasbor terbuka di halaman baru.

  2. Dalam daftar pekerjaan Runing, pilih satu pekerjaan yang dapat Anda lihat.

    catatan

    Jika Anda menyetel properti Runtime atau mengedit IAM kebijakan secara tidak benar, status aplikasi mungkin berubah menjadiRunning, tetapi dasbor Flink menunjukkan bahwa pekerjaan terus dimulai ulang. Ini adalah skenario kegagalan umum jika aplikasi salah konfigurasi atau tidak memiliki izin untuk mengakses sumber daya eksternal.

    Ketika ini terjadi, periksa tab Pengecualian di dasbor Flink untuk melihat penyebab masalah.

Amati metrik aplikasi yang sedang berjalan

Pada MyApplicationhalaman, di bagian CloudWatch metrik Amazon, Anda dapat melihat beberapa metrik dasar dari aplikasi yang sedang berjalan.

Untuk melihat metrik
  1. Di sebelah tombol Refresh, pilih 10 detik dari daftar dropdown.

  2. Saat aplikasi berjalan dan sehat, Anda dapat melihat metrik uptime terus meningkat.

  3. Metrik fullrestart harus nol. Jika meningkat, konfigurasi mungkin memiliki masalah. Untuk menyelidiki masalah ini, tinjau tab Pengecualian di dasbor Flink.

  4. Jumlah metrik pos pemeriksaan yang gagal harus nol dalam aplikasi yang sehat.

    catatan

    Dasbor ini menampilkan satu set metrik tetap dengan perincian 5 menit. Anda dapat membuat dasbor aplikasi khusus dengan metrik apa pun di CloudWatch dasbor.

Amati data keluaran dalam aliran Kinesis

Pastikan Anda masih mempublikasikan data ke input, baik menggunakan script Python atau Kinesis Data Generator.

Anda sekarang dapat mengamati output dari aplikasi yang berjalan pada Managed Service untuk Apache Flink dengan menggunakan Data Viewer di https://console.aws.amazon.com/kinesis/, mirip dengan apa yang sudah Anda lakukan sebelumnya.

Untuk melihat output
  1. Buka konsol Kinesis di /kinesis. https://console.aws.amazon.com

  2. Verifikasi bahwa Region sama dengan yang Anda gunakan untuk menjalankan tutorial ini. Secara default, itu adalah AS-Timur-1us Timur (Virginia N.). Ubah Wilayah jika perlu.

  3. Pilih Aliran Data.

  4. Pilih aliran yang ingin Anda amati. Untuk tutorial ini, gunakan ExampleOutputStream.

  5. Pilih tab Penampil data.

  6. Pilih Shard apa saja, simpan Terbaru sebagai posisi Awal, lalu pilih Dapatkan catatan. Anda mungkin melihat kesalahan “tidak ada catatan ditemukan untuk permintaan ini”. Jika demikian, pilih Coba lagi mendapatkan catatan. Catatan terbaru yang diterbitkan ke tampilan streaming.

  7. Pilih nilai di kolom Data untuk memeriksa konten catatan dalam JSON format.

Hentikan aplikasi

Untuk menghentikan aplikasi, buka halaman konsol dari aplikasi Managed Service for Apache Flink bernama. MyApplication

Untuk menghentikan aplikasi
  1. Dari daftar dropdown Action, pilih Stop.

  2. Status dalam Aplikasi merinci transisi dari Running keStopping, dan kemudian ke Ready saat aplikasi benar-benar dihentikan.

    catatan

    Jangan lupa juga untuk berhenti mengirim data ke input stream dari script Python atau Kinesis Data Generator.

Langkah selanjutnya

Bersihkan AWS sumber daya