Jendela Geser - Panduan Pengembang Amazon Kinesis Data Analytics SQL untuk Aplikasi

Untuk proyek baru, kami menyarankan Anda menggunakan Managed Service baru untuk Apache Flink Studio melalui Kinesis Data Analytics for Applications. SQL Layanan Terkelola untuk Apache Flink Studio menggabungkan kemudahan penggunaan dengan kemampuan analitis tingkat lanjut, memungkinkan Anda membangun aplikasi pemrosesan aliran yang canggih dalam hitungan menit.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Jendela Geser

Alih-alih mengelompokkan catatan menggunakan GROUP BY, Anda dapat menentukan jendela berbasis waktu atau berbasis baris. Anda melakukan ini dengan menambahkan klausul WINDOW eksplisit.

Dalam kasus ini, saat jendela meluncur seiring waktu, Amazon Kinesis Data Analytics memancarkan output saat catatan baru muncul di aliran. Kinesis Data Analytics memancarkan output ini dengan memproses baris di jendela. Windows dapat tumpang tindih dalam tipe pemrosesan ini, dan catatan dapat menjadi bagian dari beberapa jendela dan diproses dengan setiap jendela. Contoh berikut menggambarkan jendela geser.

Pertimbangkan kueri sederhana yang menghitung catatan di aliran. Contoh ini mengasumsikan jendela 5 detik. Dalam contoh aliran berikut, catatan baru tiba pada waktu t1, t2, t6, dan t7, serta tiga catatan tiba pada waktu t8 detik.

Timeline showing record arrivals at t1, t2, t6, t7, and multiple at t8 within a 5-second window.

Ingatlah hal-hal berikut ini:

  • Contoh mengasumsikan jendela 5 detik. Jendela 5 detik terus meluncur seiring waktu.

  • Untuk setiap baris yang memasuki jendela, baris output dipancarkan oleh jendela geser. Segera setelah aplikasi dimulai, Anda melihat kueri memancarkan output untuk setiap catatan baru yang muncul di aliran, meskipun jendela 5 detik belum diteruskan. Sebagai contoh, kueri memancarkan output ketika catatan muncul di detik pertama dan detik kedua. Selanjutnya, kueri memproses catatan di jendela 5 detik.

  • Jendela meluncur seiring waktu. Jika catatan lama di aliran masuk ke jendela, kueri tidak memancarkan output kecuali juga ada catatan baru di aliran yang masuk dalam jendela 5 detik.

Misalkan kueri mulai mengeksekusi di t0. Kemudian hal berikut terjadi:

  1. Pada waktu t0, kueri dimulai. Kueri tidak memancarkan output (nilai jumlah) karena saat ini tidak ada catatan.

    Timeline showing a stream starting at t0 with no output initially indicated.
  2. Pada waktu t1, catatan baru muncul di aliran, dan kueri memancarkan nilai hitungan 1.

    Timeline showing a stream with a record appearing at time t1, and an arrow pointing to t0.
  3. Pada waktu t2, catatan baru muncul di aliran, dan kueri memancarkan hitungan 2.

    Timeline showing stream events at different time points, with two vertical bars at the end.
  4. Jendela 5 detik meluncur seiring waktu:

    • Di t3, jendela geser t3 ke t0

    • Di t4 (jendela geser t4 ke t0)

    • Di t5, jendela geser t5–t0

    Pada semua waktu ini, jendela 5 detik memiliki catatan yang sama—tidak ada catatan baru. Oleh karena itu, kueri tidak memancarkan output apa pun.

    Timeline showing stream with multiple time points and colored rectangles representing data windows.
  5. Pada waktu t6, jendela 5 detik adalah (t6 ke t1). Kueri mendeteksi satu catatan baru di t6 sehingga memancarkan output 2. Catatan di t1 tidak lagi berada di jendela dan tidak masuk hitungan.

    Timeline showing stream events at different time points with a sliding 5-second window.
  6. Pada waktu t7, jendela 5 detik adalah t7 ke t2. Kueri mendeteksi satu catatan baru di t7 sehingga memancarkan output 2. Catatan di t2 tidak lagi di jendela 5 detik, dan oleh karena itu tidak dihitung.

    Timeline showing stream events and time points from t0 to t7, with a 5-second window highlighted.
  7. Pada waktu t8, jendela 5 detik adalah t8 ke t3. Kueri mendeteksi tiga catatan baru, dan oleh karena itu memancarkan catatan hitungan 5.

    Timeline showing stream events with orange bars representing record counts at different time intervals.

Singkatnya, jendela adalah ukuran tetap dan meluncur seiring waktu. Kueri memancarkan output ketika catatan baru muncul.

catatan

Sebaiknya gunakan jendela geser tidak lebih dari satu jam. Jika Anda menggunakan jendela yang lebih lama, aplikasi memerlukan waktu lebih lama untuk memulai ulang setelah pemeliharaan sistem reguler. Hal ini karena sumber data harus dibaca kembali dari aliran.

Contoh kueri berikut menggunakan klausa WINDOW untuk menentukan jendela dan melakukan agregat. Karena kueri tidak menentukan GROUP BY, kueri menggunakan pendekatan jendela geser untuk memproses catatan di aliran.

Contoh 1: Proses Streaming menggunakan Jendela Geser 1 Menit

Pertimbangkan aliran demo dalam latihan Memulai yang mengisi aliran dalam aplikasi, SOURCE_SQL_STREAM_001. Berikut ini adalah skemanya.

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

Misalkan Anda ingin aplikasi Anda menghitung agregat menggunakan jendela geser 1 menit. Yaitu, untuk setiap catatan baru yang muncul di aliran, Anda ingin aplikasi memancarkan output dengan menerapkan agregat pada catatan di jendela 1 menit sebelumnya.

Anda dapat menggunakan kueri jendela berbasis waktu berikut. Kueri menggunakan klausul WINDOW untuk menentukan interval rentang 1 menit. PARTITION BY di klausa WINDOW mengelompokkan catatan berdasarkan nilai ticker dalam jendela geser.

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
Untuk menguji kueri
  1. Siapkan aplikasi dengan mengikuti Latihan Memulai.

  2. Ubah pernyataan SELECT dalam kode aplikasi dengan kueri SELECT sebelumnya. Kode aplikasi yang dihasilkan adalah sebagai berikut.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

Contoh 2: Kueri yang Menerapkan Agregat di Jendela Geser

Kueri pada aliran demo berikut mengembalikan rata-rata perubahan persen dalam harga setiap ticker dalam jendela 10 detik.

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Untuk menguji kueri
  1. Siapkan aplikasi dengan mengikuti Latihan Memulai.

  2. Ubah pernyataan SELECT dalam kode aplikasi dengan kueri SELECT sebelumnya. Kode aplikasi yang dihasilkan adalah sebagai berikut.

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

Contoh 3: Kueri Data dari Beberapa Jendela Geser di Aliran yang Sama

Anda dapat menulis kueri untuk memancarkan output tempat setiap nilai kolom dihitung menggunakan jendela geser yang berbeda yang didefinisikan melalui aliran yang sama.

Pada contoh berikut, kueri memancarkan ticker output, harga, a2, dan a10. Ini memancarkan output untuk simbol ticker yang rata-rata pergerakan dua barisnya melintasi rata-rata pergerakan sepuluh baris. Nilai kolom a2 dan a10 diturunkan dari jendela geser dua baris dan sepuluh baris.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

Untuk menguji kueri ini terhadap aliran demo, ikuti prosedur pengujian yang dijelaskan dalam Contoh 1.