Migrasi ke Layanan Terkelola untuk Contoh Apache Flink Studio - Panduan Pengembang Amazon Kinesis Data Analytics SQL untuk Aplikasi

Setelah mempertimbangkan dengan cermat, kami memutuskan untuk menghentikan Amazon Kinesis Data Analytics SQL untuk aplikasi dalam dua langkah:

1. Mulai 15 Oktober 2025, Anda tidak akan dapat membuat Kinesis Data Analytics SQL baru untuk aplikasi.

2. Kami akan menghapus aplikasi Anda mulai 27 Januari 2026. Anda tidak akan dapat memulai atau mengoperasikan Amazon Kinesis Data Analytics Anda SQL untuk aplikasi. Support tidak akan lagi tersedia untuk Amazon Kinesis Data Analytics SQL sejak saat itu. Untuk informasi selengkapnya, lihat Amazon Kinesis Data Analytics SQL untuk penghentian Aplikasi.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Migrasi ke Layanan Terkelola untuk Contoh Apache Flink Studio

Setelah mempertimbangkan dengan cermat, kami telah membuat keputusan untuk menghentikan Amazon Kinesis Data Analytics SQL untuk aplikasi. Untuk membantu Anda merencanakan dan bermigrasi dari Amazon Kinesis Data Analytics SQL untuk aplikasi, kami akan menghentikan penawaran secara bertahap selama 15 bulan. Ada dua tanggal penting yang perlu diperhatikan, 15 Oktober 2025, dan 27 Januari 2026.

  1. Mulai 15 Oktober 2025, Anda tidak akan dapat membuat Amazon Kinesis Data Analytics SQL baru untuk aplikasi.

  2. Kami akan menghapus aplikasi Anda mulai 27 Januari 2026. Anda tidak akan dapat memulai atau mengoperasikan Amazon Kinesis Data Analytics SQL untuk aplikasi. Support tidak akan lagi tersedia untuk Amazon Kinesis Data Analytics SQL untuk aplikasi sejak saat itu. Untuk mempelajari selengkapnya, lihat Amazon Kinesis Data Analytics SQL untuk penghentian Aplikasi.

Kami menyarankan Anda menggunakan Amazon Managed Service untuk Apache Flink. Ini menggabungkan kemudahan penggunaan dengan kemampuan analitis tingkat lanjut, memungkinkan Anda membangun aplikasi pemrosesan aliran dalam hitungan menit.

Bagian ini menyediakan contoh kode dan arsitektur untuk membantu Anda memindahkan Amazon Kinesis Data Analytics SQL untuk beban kerja aplikasi ke Managed Service for Apache Flink.

Untuk informasi tambahan, lihat juga posting AWS blog ini: Migrasi dari Amazon Kinesis Data Analytics SQL untuk Aplikasi ke Managed Service untuk Apache Flink Studio.

Untuk memigrasikan beban kerja Anda ke Managed Service for Apache Flink Studio atau Managed Service for Apache Flink, bagian ini menyediakan terjemahan kueri yang dapat Anda gunakan untuk kasus penggunaan umum.

Sebelum Anda menjelajahi contoh-contoh ini, kami sarankan Anda terlebih dahulu meninjau Menggunakan notebook Studio dengan Layanan Terkelola untuk Apache Flink.

Membuat ulang Kinesis Data Analytics SQL untuk kueri di Managed Service untuk Apache Flink Studio

Opsi berikut menyediakan terjemahan kueri aplikasi Kinesis Data Analytics SQL berbasis umum ke Managed Service for Apache Flink Studio.

SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "IN_APP_STREAM_001" ( ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_001" AS INSERT INTO "IN_APP_STREAM_001" SELECT STREAM APPROXIMATE_ARRIVAL_TIME, ticker_symbol, sector, price, change FROM "SOURCE_SQL_STREAM_001"; -- Second in-app stream and pump CREATE OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "IN_APP_STREAM_02" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_001"; -- Destination in-app stream and third pump CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_03" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_02";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001; CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE, APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS IN_APP_STREAM_001; CREATE TABLE IN_APP_STREAM_001 ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS IN_APP_STREAM_02; CREATE TABLE IN_APP_STREAM_02 ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_02', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) INSERT INTO IN_APP_STREAM_001 SELECT APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM SOURCE_SQL_STREAM_001; Query 3 - % flink.ssql(type = update ) INSERT INTO IN_APP_STREAM_02 SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM IN_APP_STREAM_001; Query 4 - % flink.ssql(type = update ) INSERT INTO DESTINATION_SQL_STREAM SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM IN_APP_STREAM_02;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3), EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE, UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP, DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR, EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM WHERE ( ABS(CHANGE / (PRICE - CHANGE)) * 100 ) > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CHANGE_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1; -- ** Trigger Count and Limit ** -- Counts "triggers" or those values that evaluated true against the previous where clause -- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM ( ticker_symbol VARCHAR(4), change REAL, trigger_count INTEGER); CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol, change, trigger_count FROM ( SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING ) ) WHERE trigger_count >= 1;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE, EVENT_TIME AS PROCTIME()) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM; CREATE TABLE TRIGGER_COUNT_STREAM ( TICKER_SYMBOL VARCHAR(4), CHANGE DOUBLE, TRIGGER_COUNT INT) PARTITIONED BY (TICKER_SYMBOL); Query 2 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM WHERE ( ABS(CHANGE / (PRICE - CHANGE)) * 100 ) > 1; Query 3 - % flink.ssql(type = update ) SELECT * FROM( SELECT TICKER_SYMBOL, CHANGE, COUNT(*) AS TRIGGER_COUNT FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER_SYMBOL, CHANGE ) WHERE TRIGGER_COUNT > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM"( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001", "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount " FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001; CREATE TABLE SOURCE_SQL_STREAM_001 ( TICKER_SYMBOL VARCHAR(4), TRADETIME AS PROCTIME(), APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM; CREATE TABLE CALC_COUNT_SQL_STREAM ( TICKER VARCHAR(4), TRADETIME TIMESTAMP(3), WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'CALC_COUNT_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv'); DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), TRADETIME TIMESTAMP(3), WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv'); Query 2 - % flink.ssql(type = update ) INSERT INTO CALC_COUNT_SQL_STREAM SELECT TICKER, TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME, TICKERCOUNT FROM ( SELECT TICKER_SYMBOL AS TICKER, DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME, COUNT(*) AS TICKERCOUNT FROM SOURCE_SQL_STREAM_001 GROUP BY TUMBLE(TRADETIME, INTERVAL '1' MINUTE), DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'), DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'), TICKER_SYMBOL ) ; Query 3 - % flink.ssql(type = update ) SELECT * FROM CALC_COUNT_SQL_STREAM; Query 4 - % flink.ssql(type = update ) INSERT INTO DESTINATION_SQL_STREAM SELECT TICKER, TRADETIME, SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT FROM CALC_COUNT_SQL_STREAM WINDOW W1 AS ( PARTITION BY TICKER ORDER BY TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING ) ; Query 5 - % flink.ssql(type = update ) SELECT * FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM for cleaned up referrerCREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", SUBSTRING("referrer", 12, ( POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4 ) ) FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( referrer VARCHAR(32), ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ingest_time, substring(referrer, 12, 6) as referrer FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM for cleaned up referrerCREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( referrer VARCHAR(32), ingest_time AS PROCTIME()) PARTITIONED BY (referrer) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ingest_time, REGEXP_REPLACE(referrer, 'http', 'https') as referrer FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( sector VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24)); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM T.SECTOR, T.REC.COLUMN1, T.REC.COLUMN2 FROM ( SELECT STREAM SECTOR, REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC FROM SOURCE_SQL_STREAM_001 ) AS T;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( CHANGE DOUBLE, PRICE DOUBLE, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16)) PARTITIONED BY (SECTOR) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT * FROM ( SELECT SECTOR, REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1, REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 FROM DESTINATION_SQL_STREAM ) WHERE MATCH1 IS NOT NULL AND MATCH2 IS NOT NULL;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3), EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE, UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP, DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR, EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( EVENT_TIME TIMESTAMP(3), WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND, TICKER VARCHAR(4), TICKER_COUNT INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json' Query 2 - % flink.ssql(type = update ) SELECT EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '60' second), EVENT_TIME, TICKER;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '60' SECOND);
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( ticker VARCHAR(4), price DOUBLE, event_time VARCHAR(32), processing_time AS PROCTIME()) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ticker, min(price) AS MIN_PRICE, max(price) AS MAX_PRICE FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(processing_time, INTERVAL '60' second), ticker;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM"TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001". "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) SELECT * FROM ( SELECT TICKER, COUNT(*) as MOST_FREQUENT_VALUES, ROW_NUMBER() OVER (PARTITION BY TICKER ORDER BY TICKER DESC) AS row_num FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER ) WHERE row_num <= 5;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ITEM, ITEM_COUNT FROM TABLE(TOP_K_ITEMS_TUMBLING(CURSOR( SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
Managed Service for Apache Flink Studio
%flinkssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), PRICE DOUBLE) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); %flink.ssql(type=update) SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW ORDER BY ITEM_COUNT DESC) as rownum FROM ( select AGG_WINDOW, ITEM, ITEM_COUNT from ( select TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW, ITEM, count(*) as ITEM_COUNT FROM SOURCE_SQL_STREAM_001 GROUP BY TUMBLE(TS, INTERVAL '60' SECONDS), ITEM ) ) ) where rownum <= 3
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), column2 VARCHAR(16), column3 VARCHAR(16), column4 VARCHAR(16), column5 VARCHAR(16), column6 VARCHAR(16), column7 VARCHAR(16)); CREATE OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM l.r.COLUMN1, l.r.COLUMN2, l.r.COLUMN3, l.r.COLUMN4, l.r.COLUMN5, l.r.COLUMN6, l.r.COLUMN7 FROM ( SELECT STREAM W3C_LOG_PARSE("log", 'COMMON') FROM "SOURCE_SQL_STREAM_001" ) AS l(r);
Managed Service for Apache Flink Studio
%flink.ssql(type=update) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); % flink.ssql(type=update) select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2), SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5), SPLIT_INDEX(log, ' ', 6) from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16), "column_B" VARCHAR(16), "column_C" VARCHAR(16), "COL_1" VARCHAR(16), "COL_2" VARCHAR(16), "COL_3" VARCHAR(16)); CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM t."Col_A", t."Col_B", t."Col_C", t.r."COL_1", t.r."COL_2", t.r."COL_3" FROM ( SELECT STREAM "Col_A", "Col_B", "Col_C", VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured", 'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)', ',') AS r FROM "SOURCE_SQL_STREAM_001" ) as t;
Managed Service for Apache Flink Studio
%flink.ssql(type=update) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); % flink.ssql(type=update) select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2), SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5) ) from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, "c"."Company", sector, change, priceFROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c" ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(12), CHANGE INT, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - CREATE TABLE CompanyName ( Ticker VARCHAR(4), Company VARCHAR(4)) WITH ( 'connector' = 'filesystem', 'path' = 's3://kda-demo-sample/TickerReference.csv', 'format' = 'csv' ); Query 3 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, c.Company, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM LEFT JOIN CompanyName as c ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
SQL-based Kinesis Data Analytics application
SELECT STREAM ticker_symbol, sector, change, ( price / 0 ) as ProblemColumnFROM "SOURCE_SQL_STREAM_001" WHERE sector SIMILAR TO '%TECH%';
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), CHANGE DOUBLE, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()], result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 except : return - 1 st_env.register_function("DivideByZero", DivideByZero) Query 3 - % flink.ssql(type = update ) SELECT CURRENT_TIMESTAMP AS ERROR_TIME, * FROM ( SELECT TICKER_SYMBOL, SECTOR, CHANGE, DivideByZero(PRICE) as ErrorColumn FROM DESTINATION_SQL_STREAM WHERE SECTOR SIMILAR TO '%TECH%' ) AS ERROR_STREAM;

Jika Anda ingin memindahkan beban kerja yang menggunakan Random Cut Forest dari Kinesis Analytics SQL ke Managed Service untuk Apache Flink, posting blog AWS ini menunjukkan cara menggunakan Managed Service untuk Apache Flink untuk menjalankan algoritma online untuk deteksi anomali. RCF

Lihat Converting- KDASQL -KDAStudio/untuk tutorial lengkap.

Dalam latihan berikut, Anda akan mengubah aliran data Anda untuk menggunakan Amazon Managed Service untuk Apache Flink Studio. Ini juga berarti beralih dari Amazon Kinesis Data Firehose ke Amazon Kinesis Data Streams.

Pertama kita berbagi tipikal KDA - SQL arsitektur, sebelum menunjukkan bagaimana Anda dapat mengganti ini menggunakan Amazon Managed Service untuk Apache Flink Studio dan Amazon Kinesis Data Streams. Atau Anda dapat meluncurkan AWS CloudFormation template di sini:

Amazon Kinesis Data Analytics SQL - dan Amazon Kinesis Data Firehose

Berikut adalah alur arsitektur Amazon Kinesis Data SQL Analytics:

Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.

Kami pertama-tama memeriksa penyiapan Amazon Kinesis Data Analytics SQL - dan Amazon Kinesis Data Firehose lama. Kasus penggunaan adalah pasar perdagangan di mana data perdagangan, termasuk ticker saham dan harga, mengalir dari sumber eksternal ke sistem Amazon Kinesis. Amazon Kinesis Data Analytics SQL untuk menggunakan aliran input untuk mengeksekusi kueri Windowed seperti jendela Tumbling untuk menentukan volume perdagangan danmax,, average dan min harga perdagangan selama satu menit untuk setiap ticker saham. 

Amazon Kinesis Data Analytics SQL - diatur untuk menelan data dari Amazon Kinesis Data Firehose. API Setelah diproses, Amazon Kinesis Data Analytics SQL - mengirimkan data yang diproses ke Amazon Kinesis Data Firehose lainnya, yang kemudian menyimpan output dalam bucket Amazon S3.

Dalam hal ini, Anda menggunakan Amazon Kinesis Data Generator. Amazon Kinesis Data Generator memungkinkan Anda mengirim data pengujian ke Amazon Kinesis Data Streams atau aliran pengiriman Amazon Kinesis Data Firehose. Untuk memulai, ikuti instruksi di sini. Gunakan AWS CloudFormation templat di sini sebagai pengganti yang disediakan dalam instruksi:.

Setelah Anda menjalankan AWS CloudFormation template, bagian output akan memberikan url Amazon Kinesis Data Generator. Masuk ke portal menggunakan id pengguna Cognito dan kata sandi yang Anda atur di sini. Pilih Wilayah dan nama aliran target. Untuk status saat ini, pilih aliran Pengiriman Amazon Kinesis Data Firehose. Untuk status baru, pilih nama Amazon Kinesis Data Firehose Streams. Anda dapat membuat beberapa template, tergantung pada kebutuhan Anda, dan menguji template menggunakan tombol Uji template sebelum mengirimnya ke aliran target.

Berikut ini adalah contoh payload menggunakan Amazon Kinesis Data Generator. Generator data menargetkan input Amazon Kinesis Firehose Streams untuk mengalirkan data secara terus menerus. SDKKlien Amazon Kinesis dapat mengirim data dari produsen lain juga. 

2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763, "AMZN",3352023-02-17 09:28:07.763, "GOOGL",1852023-02-17 09:28:07.763, "AAPL",11162023-02-17 09:28:07.763, "GOOGL",1582

Berikut JSON ini digunakan untuk menghasilkan serangkaian waktu dan tanggal perdagangan acak, ticker saham, dan harga saham:

date.now(YYYY-MM-DD HH:mm:ss.SSS), "random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])", random.number(2000)

Setelah Anda memilih Kirim data, generator akan mulai mengirim data tiruan.

Sistem eksternal mengalirkan data ke Amazon Kinesis Data Firehose. Menggunakan Amazon Kinesis Data Analytics SQL untuk Aplikasi, Anda dapat menganalisis data streaming menggunakan SQL standar. Layanan ini memungkinkan Anda untuk membuat dan menjalankan SQL kode terhadap sumber streaming untuk melakukan analitik deret waktu, memberi umpan dasbor waktu nyata, dan membuat metrik waktu nyata. Amazon Kinesis Data Analytics SQL for Applications dapat membuat aliran tujuan SQL dari kueri pada aliran input dan mengirim aliran tujuan ke Amazon Kinesis Data Firehose lainnya. Tujuan Amazon Kinesis Data Firehose dapat mengirim data analitik ke Amazon S3 sebagai status akhir.

Amazon Kinesis Data Analytics SQL - kode lama didasarkan pada perpanjangan SQL Standar.

Anda menggunakan kueri berikut di Amazon Kinesis Data Analytics SQL -. Anda pertama kali membuat aliran tujuan untuk output kueri. Kemudian, Anda akan menggunakanPUMP, yang merupakan Objek Repositori Amazon Kinesis Data Analytics (ekstensi dari SQL Standar) yang menyediakan fungsionalitas kueri yang terus INSERT INTO stream SELECT ... FROM berjalan, sehingga memungkinkan hasil kueri untuk terus dimasukkan ke dalam aliran bernama. 

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP, INGEST_TIME TIMESTAMP, TICKER VARCHAR(16), VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME", "ticker", COUNT(*) AS VOLUME, AVG("tradePrice") AS AVG_PRICE, MIN("tradePrice") AS MIN_PRICE, MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001" GROUP BY "ticker", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);

Yang sebelumnya SQL menggunakan dua jendela waktu — tradeTimestamp yang berasal dari muatan aliran masuk dan juga ROWTIME.tradeTimestamp disebut atau. Event Time client-side time Waktu ini sering kali berguna ketika digunakan dalam analitik karena merupakan waktu ketika peristiwa terjadi. Namun, banyak sumber peristiwa, seperti ponsel dan klien web, tidak memiliki jam yang dapat diandalkan, yang dapat menyebabkan waktu yang tidak akurat. Selain itu, masalah konektivitas dapat menyebabkan catatan yang muncul di aliran tidak dalam urutan yang sama dengan peristiwa yang terjadi. 

Aliran dalam aplikasi juga menyertakan kolom khusus yang disebut. ROWTIME Kolom ini menyimpan stempel waktu ketika Amazon Kinesis Data Analytics memasukkan baris di aliran dalam aplikasi pertama. ROWTIME mencerminkan stempel waktu tempat Amazon Kinesis Data Analytics memasukkan catatan ke aliran dalam aplikasi pertama setelah membaca dari sumber streaming. Nilai ROWTIME ini selanjutnya dipertahankan di seluruh aplikasi Anda. 

Ini SQL menentukan jumlah ticker sebagaivolume,, minmax, dan average harga selama interval 60 detik. 

Menggunakan setiap waktu ini dalam kueri jendela yang berbasis waktu memiliki kelebihan dan kekurangan. Pilih satu atau lebih dari waktu-waktu ini, dan strategi untuk menangani kerugian yang relevan berdasarkan skenario kasus penggunaan Anda. 

Strategi dua jendela menggunakan dua waktu berbasis, keduanya ROWTIME dan salah satu waktu lainnya seperti waktu acara.

  • Gunakan ROWTIME sebagai jendela pertama, yang mengontrol seberapa sering kueri memancarkan hasil, seperti yang ditunjukkan dalam contoh berikut. Ini tidak digunakan sebagai waktu logis.

  • Gunakan salah satu waktu lain yang merupakan waktu logis yang ingin Anda kaitkan dengan analitik Anda. Waktu ini mewakili kapan peristiwa terjadi. Pada contoh berikut, tujuan analitik adalah mengelompokkan catatan dan dan kembali menghitung dengan ticker

Layanan Dikelola Amazon untuk Apache Flink Studio 

Dalam arsitektur yang diperbarui, Anda mengganti Amazon Kinesis Data Firehose dengan Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics SQL untuk Aplikasi digantikan oleh Amazon Managed Service untuk Apache Flink Studio. Kode Apache Flink dijalankan secara interaktif dalam Notebook Apache Zeppelin. Amazon Managed Service untuk Apache Flink Studio mengirimkan data perdagangan agregat ke bucket Amazon S3 untuk penyimpanan. Langkah-langkahnya ditunjukkan sebagai berikut:

Berikut adalah Amazon Managed Service untuk aliran arsitektur Apache Flink Studio:

Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.

Buat Aliran Data Kinesis

Untuk membuat aliran data menggunakan konsol
  1. Masuk ke AWS Management Console dan buka konsol Kinesis di /kinesis. https://console.aws.amazon.com

  2. Di bilah navigasi, perluas pemilih Wilayah dan pilih Wilayah.

  3. Pilih Create data stream (Buat aliran data).

  4. Pada halaman Create Kinesis stream, masukkan nama untuk aliran data Anda dan terima mode kapasitas On-Demand default.

    Dengan mode On-Demand, Anda kemudian dapat memilih Create Kinesis stream untuk membuat aliran data Anda.

    Pada halaman Kinesis streams, Status streaming Anda adalah Membuat saat aliran sedang dibuat. Saat aliran siap digunakan, Status berubah menjadi Aktif.

  5. Pilih nama streaming Anda. Halaman Detail Stream menampilkan ringkasan konfigurasi aliran Anda, bersama dengan informasi pemantauan.

  6. Di Amazon Kinesis Data Generator, ubah aliran Stream/pengiriman ke Amazon Kinesis Data Streams baru: _ _. TRADE SOURCE STREAM

    JSONdan Payload akan sama seperti yang Anda gunakan untuk Amazon Kinesis Data Analytics-. SQL Gunakan Amazon Kinesis Data Generator untuk menghasilkan beberapa contoh data muatan perdagangan dan menargetkan TRADE_ SOURCE _ STREAM Data Stream untuk latihan ini:

    {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
  7. Saat AWS Management Console pergi ke Managed Service for Apache Flink dan kemudian pilih Create Application.

  8. Di panel navigasi kiri, pilih buku catatan Studio lalu pilih Buat buku catatan studio.

  9. Masukkan nama untuk notebook studio.

  10. Di bawah database AWS Glue, sediakan AWS Glue database yang ada yang akan menentukan metadata untuk sumber dan tujuan Anda. Jika Anda tidak memiliki AWS Glue database, pilih Buat dan lakukan hal berikut:

    1. Di konsol AWS Glue, pilih Databases di bawah katalog Data dari menu sebelah kiri.

    2. Pilih Buat database

    3. Di halaman Buat database, masukkan nama untuk database. Di bagian Lokasi - opsional, pilih Jelajahi Amazon S3 dan pilih bucket Amazon S3. Jika Anda belum memiliki bucket Amazon S3 yang sudah disiapkan, Anda dapat melewati langkah ini dan kembali lagi nanti.

    4. (Opsional). Masukkan deskripsi untuk database.

    5. Pilih Buat basis data.

  11. Pilih Buat buku catatan

  12. Setelah buku catatan Anda dibuat, pilih Jalankan.

  13. Setelah notebook berhasil dimulai, luncurkan notebook Zeppelin dengan memilih Buka di Apache Zeppelin.

  14. Pada halaman Notebook Zeppelin, pilih Buat catatan baru dan beri nama. MarketDataFeed

SQLKode Flink dijelaskan berikut, tetapi pertama-tama inilah tampilan layar notebook Zeppelin. Setiap jendela dalam notebook adalah blok kode terpisah, dan mereka dapat dijalankan satu per satu.

Layanan Dikelola Amazon untuk Kode Apache Flink Studio

Amazon Managed Service untuk Apache Flink Studio menggunakan Notebook Zeppelin untuk menjalankan kode. Pemetaan dilakukan untuk contoh ini ke kode ssql berdasarkan Apache Flink 1.13. Kode di Notebook Zeppelin ditampilkan berikut, satu blok pada satu waktu. 

Sebelum menjalankan kode apa pun di Notebook Zeppelin Anda, perintah konfigurasi Flink harus dijalankan. Jika Anda perlu mengubah pengaturan konfigurasi apa pun setelah menjalankan kode (ssql, Python, atau Scala), Anda harus berhenti dan memulai ulang notebook Anda. Dalam contoh ini, Anda harus mengatur checkpointing. Checkpointing diperlukan agar Anda dapat mengalirkan data ke file di Amazon S3. Ini memungkinkan streaming data ke Amazon S3 untuk dibuang ke file. Pernyataan berikut menetapkan interval ke 5000 milidetik. 

%flink.conf execution.checkpointing.interval 5000

%flink.confmenunjukkan bahwa blok ini adalah pernyataan konfigurasi. Untuk informasi selengkapnya tentang konfigurasi Flink termasuk checkpointing, lihat Apache Flink Checkpointing. 

Tabel input untuk sumber Amazon Kinesis Data Streams dibuat dengan kode ssql Flink berikut. Perhatikan bahwa TRADE_TIME bidang menyimpan tanggal/waktu yang dibuat oleh generator data.

%flink.ssql DROP TABLE IF EXISTS TRADE_SOURCE_STREAM; CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, TRADE_TIME TIMESTAMP(3), WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE, STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM', 'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');

Anda dapat melihat aliran masukan dengan pernyataan ini:

%flink.ssql(type=update)-- testing the source stream select * from TRADE_SOURCE_STREAM;

Sebelum mengirim data agregat ke Amazon S3, Anda dapat melihatnya langsung di Amazon Managed Service untuk Apache Flink Studio dengan kueri pilih jendela tumbling. Ini mengumpulkan data perdagangan dalam jendela waktu satu menit. Perhatikan bahwa pernyataan %flink.ssql harus memiliki penunjukan (type=update):

%flink.ssql(type=update) select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW, TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

Anda kemudian dapat membuat tabel untuk tujuan di Amazon S3. Anda harus menggunakan watermark. Tanda air adalah metrik kemajuan yang menunjukkan titik waktu ketika Anda yakin bahwa tidak ada lagi peristiwa yang tertunda yang akan tiba. Alasan tanda air adalah untuk memperhitungkan kedatangan yang terlambat. Interval ‘5’ Second ini memungkinkan perdagangan untuk memasuki Amazon Kinesis Data Stream terlambat 5 detik dan masih disertakan jika mereka memiliki stempel waktu di dalam jendela. Untuk informasi selengkapnya, lihat Menghasilkan Tanda Air.   

%flink.ssql(type=update) DROP TABLE IF EXISTS TRADE_DESTINATION_S3; CREATE TABLE TRADE_DESTINATION_S3 ( TRADE_WINDOW_START TIMESTAMP(3), WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND, TICKER STRING,  VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE) WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');

Pernyataan ini menyisipkan data ke dalam. TRADE_DESTINATION_S3 TUMPLE_ROWTIMEadalah stempel waktu dari batas atas inklusif dari jendela yang jatuh.

%flink.ssql(type=update) insert into TRADE_DESTINATION_S3 select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE), TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

Biarkan pernyataan Anda berjalan selama 10 hingga 20 menit untuk mengumpulkan beberapa data di Amazon S3. Kemudian batalkan pernyataan Anda.

Ini menutup file di Amazon S3 sehingga dapat dilihat.

Berikut adalah apa isinya terlihat seperti:

Financial data table showing stock prices and volumes for tech companies on March 1, 2023.

Anda dapat menggunakan AWS CloudFormation template untuk membuat infrastruktur.

AWS CloudFormation akan membuat sumber daya berikut di AWS akun Anda:

  • Amazon Kinesis Data Streams

  • Layanan Dikelola Amazon untuk Apache Flink Studio

  • AWS Glue basis data

  • Bucket Amazon S3

  • IAMperan dan kebijakan Amazon Managed Service untuk Apache Flink Studio untuk mengakses sumber daya yang sesuai

Impor notebook dan ubah nama bucket Amazon S3 dengan bucket Amazon S3 baru yang dibuat oleh. AWS CloudFormation

SQL code snippet creating a table with timestamp, ticker, volume, and price fields.
Lihat lebih banyak

Berikut adalah beberapa sumber daya tambahan yang dapat Anda gunakan untuk mempelajari lebih lanjut tentang penggunaan Layanan Terkelola untuk Apache Flink Studio:

Tujuan dari pola ini adalah untuk menunjukkan bagaimana memanfaatkan notebook Kinesis Data Analytics-Studio Zeppelin untuk memproses data UDFs dalam aliran Kinesis. Layanan Terkelola untuk Apache Flink Studio menggunakan Apache Flink untuk menyediakan kemampuan analitis tingkat lanjut, termasuk tepat sekali memproses semantik, jendela waktu acara, ekstensibilitas menggunakan fungsi yang ditentukan pengguna dan integrasi pelanggan, dukungan bahasa imperatif, status aplikasi yang tahan lama, penskalaan horizontal, dukungan untuk beberapa sumber data, integrasi yang dapat diperluas, dan banyak lagi. Hal ini penting untuk memastikan keakuratan, kelengkapan, konsistensi, dan keandalan pemrosesan aliran data dan tidak tersedia dengan Amazon Kinesis Data Analytics. SQL

Dalam aplikasi sampel ini, kami akan mendemonstrasikan cara memanfaatkan UDFs notebook KDA -Studio Zeppelin untuk memproses data dalam aliran Kinesis. Notebook studio untuk Kinesis Data Analytics memungkinkan Anda untuk secara interaktif melakukan kueri aliran data secara real time, dan dengan mudah membangun dan menjalankan aplikasi pemrosesan aliran menggunakan standarSQL, Python, dan Scala. Dengan beberapa klik AWS Management Console, Anda dapat meluncurkan notebook tanpa server untuk menanyakan aliran data dan mendapatkan hasil dalam hitungan detik. Untuk informasi selengkapnya, lihat Menggunakan notebook Studio dengan Kinesis Data Analytics untuk Apache Flink.

Fungsi Lambda yang digunakan untuk pemrosesan pra/pasca data di - aplikasi: KDA SQL

Data flow diagram showing SQL App processing between source and destination streams.

Fungsi yang ditentukan pengguna untuk pemrosesan data pra/pasca menggunakan notebook -Studio Zeppelin KDA

Flink Studio Zeppelin Notebook workflow with in-memory tables and user-defined functions.

Fungsi yang ditentukan pengguna () UDFs

Untuk menggunakan kembali logika bisnis umum menjadi operator, akan berguna untuk mereferensikan fungsi yang ditentukan pengguna untuk mengubah aliran data Anda. Hal ini dapat dilakukan baik dalam Managed Service for Apache Flink Studio notebook, atau sebagai file jar aplikasi yang direferensikan secara eksternal. Memanfaatkan fungsi yang ditentukan pengguna dapat menyederhanakan transformasi atau pengayaan data yang mungkin Anda lakukan melalui streaming data.

Di notebook Anda, Anda akan mereferensikan jar aplikasi Java sederhana yang memiliki fungsi untuk menganonimkan nomor telepon pribadi. Anda juga dapat menulis Python atau Scala UDFs untuk digunakan dalam notebook. Kami memilih jar aplikasi Java untuk menyoroti fungsionalitas mengimpor jar aplikasi ke notebook Pyflink.

Pengaturan lingkungan

Untuk mengikuti panduan ini dan berinteraksi dengan data streaming Anda, Anda akan menggunakan AWS CloudFormation skrip untuk meluncurkan sumber daya berikut:

  • Sumber dan target Kinesis Data Streams

  • Database Glue

  • Peran IAM

  • Layanan Terkelola untuk Aplikasi Apache Flink Studio

  • Fungsi Lambda untuk memulai Managed Service untuk Apache Flink Studio Application

  • Peran Lambda untuk menjalankan fungsi Lambda sebelumnya

  • Sumber daya khusus untuk menjalankan fungsi Lambda

Unduh AWS CloudFormation template di sini.

Buat AWS CloudFormation tumpukan
  1. Pergi ke AWS Management Console dan pilih CloudFormationdi bawah daftar layanan.

  2. Pada CloudFormationhalaman, pilih Stacks dan kemudian pilih Create Stack dengan sumber daya baru (standar).

  3. Pada halaman Buat tumpukan, pilih Unggah File Template, lalu pilih kda-flink-udf.yml yang Anda unduh sebelumnya. Unggah file dan kemudian pilih Berikutnya.

  4. Beri nama template, seperti kinesis-UDF agar mudah diingat, dan perbarui Parameter input seperti input-stream jika Anda menginginkan nama yang berbeda. Pilih Berikutnya.

  5. Pada halaman Configure stack options, tambahkan Tag jika Anda mau, lalu pilih Next.

  6. Pada halaman Tinjauan, centang kotak yang memungkinkan pembuatan IAM sumber daya dan kemudian pilih Kirim.

AWS CloudFormation Tumpukan mungkin membutuhkan waktu 10 hingga 15 menit untuk diluncurkan tergantung pada Wilayah yang Anda luncurkan. Setelah Anda melihat CREATE_COMPLETE status untuk seluruh tumpukan, Anda siap untuk melanjutkan.

Bekerja dengan Layanan Terkelola untuk notebook Apache Flink Studio

Notebook studio untuk Kinesis Data Analytics memungkinkan Anda untuk secara interaktif melakukan kueri aliran data secara real time, dan dengan mudah membangun dan menjalankan aplikasi pemrosesan aliran menggunakan standarSQL, Python, dan Scala. Dengan beberapa klik AWS Management Console, Anda dapat meluncurkan notebook tanpa server untuk menanyakan aliran data dan mendapatkan hasil dalam hitungan detik.

Notebook adalah lingkungan pengembangan berbasis web. Dengan notebook, Anda mendapatkan pengalaman pengembangan interaktif sederhana yang dikombinasikan dengan kemampuan pemrosesan aliran data canggih yang disediakan oleh Apache Flink. Notebook studio menggunakan notebook yang didukung oleh Apache Zeppelin, dan menggunakan Apache Flink sebagai mesin pemrosesan aliran. Notebook studio menggabungkan teknologi ini dengan mulus untuk membuat analitik lanjutan pada aliran data dapat diakses oleh pengembang dari semua keahlian.

Apache Zeppelin memberi notebook Studio Anda dengan rangkaian alat analitik lengkap, termasuk yang berikut:

  • Visualisasi Data

  • Mengekspor data ke file

  • Mengontrol format output untuk analisis yang lebih mudah

Menggunakan notebook
  1. Pergi ke AWS Management Console dan pilih Amazon Kinesis di bawah daftar layanan.

  2. Di halaman navigasi sebelah kiri, pilih aplikasi Analytics, lalu pilih buku catatan Studio.

  3. Verifikasi bahwa KinesisDataAnalyticsStudionotebook sedang berjalan.

  4. Pilih notebook dan kemudian pilih Buka di Apache Zeppelin.

  5. Unduh file Data Producer Zeppelin Notebook yang akan Anda gunakan untuk membaca dan memuat data ke dalam Kinesis Stream.

  6. Impor Data Producer Notebook Zeppelin. Pastikan untuk memodifikasi input STREAM_NAME dan REGION dalam kode notebook. Nama aliran input dapat ditemukan di output AWS CloudFormation tumpukan.

  7. Jalankan notebook Data Producer dengan memilih tombol Jalankan paragraf ini untuk menyisipkan data sampel ke dalam input Kinesis Data Stream.

  8. Saat data sampel dimuat, unduh MaskPhoneNumber-Interactive notebook, yang akan membaca data input, menganonimkan nomor telepon dari aliran input dan menyimpan data anonim ke dalam aliran output.

  9. Impor notebook MaskPhoneNumber-interactive Zeppelin.

  10. Jalankan setiap paragraf di buku catatan.

    1. Dalam paragraf 1, Anda mengimpor Fungsi yang Ditetapkan Pengguna untuk menganonimkan nomor telepon.

      %flink(parallelism=1) import com.mycompany.app.MaskPhoneNumber stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
    2. Pada paragraf berikutnya, Anda membuat tabel dalam memori untuk membaca data aliran input. Pastikan nama stream dan AWS Region sudah benar.

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews; CREATE TABLE customer_reviews ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phone VARCHAR ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleInputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json');
    3. Periksa apakah data dimuat ke dalam tabel dalam memori.

      %flink.ssql(type=update) select * from customer_reviews
    4. Memanggil fungsi yang ditentukan pengguna untuk menganonimkan nomor telepon.

      %flink.ssql(type=update) select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    5. Sekarang setelah nomor telepon ditutup, buat tampilan dengan nomor bertopeng.

      %flink.ssql(type=update) DROP VIEW IF EXISTS sentiments_view; CREATE VIEW sentiments_view AS select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    6. Verifikasi datanya.

      %flink.ssql(type=update) select * from sentiments_view
    7. Buat tabel dalam memori untuk Output Kinesis Stream. Pastikan nama stream dan AWS Region sudah benar.

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews_stream_table; CREATE TABLE customer_reviews_stream_table ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phoneNumber varchar ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleOutputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json');
    8. Masukkan catatan yang diperbarui di Aliran Kinesis target.

      %flink.ssql(type=update) INSERT INTO customer_reviews_stream_table SELECT customer_id, product, review, phoneNumber FROM sentiments_view
    9. Lihat dan verifikasi data dari Aliran Kinesis target.

      %flink.ssql(type=update) select * from customer_reviews_stream_table

Mempromosikan notebook sebagai aplikasi

Sekarang setelah Anda menguji kode notebook Anda secara interaktif, Anda akan menyebarkan kode sebagai aplikasi streaming dengan status tahan lama. Anda harus terlebih dahulu memodifikasi konfigurasi Aplikasi untuk menentukan lokasi kode Anda di Amazon S3.

  1. Pada AWS Management Console, pilih buku catatan Anda dan di Deploy sebagai konfigurasi aplikasi - opsional, pilih Edit.

  2. Di bawah Tujuan untuk kode di Amazon S3, pilih bucket Amazon S3 yang dibuat oleh skrip.AWS CloudFormation Prosesnya mungkin memakan waktu beberapa menit.

  3. Anda tidak akan dapat mempromosikan catatan apa adanya. Jika Anda mencoba, Anda akan mengalami kesalahan karena Select pernyataan tidak didukung. Untuk mencegah masalah ini, unduh Notebook MaskPhoneNumber-Streaming Zeppelin.

  4. Impor MaskPhoneNumber-streaming Notebook Zeppelin.

  5. Buka catatan dan pilih Tindakan untuk KinesisDataAnalyticsStudio.

  6. Pilih Build MaskPhoneNumber -Streaming dan ekspor ke S3. Pastikan untuk mengganti nama Nama Aplikasi dan tidak menyertakan karakter khusus.

  7. Pilih Bangun dan Ekspor. Ini akan memakan waktu beberapa menit untuk mengatur Aplikasi Streaming.

  8. Setelah build selesai, pilih Deploy using AWS console.

  9. Di halaman berikutnya, tinjau pengaturan dan pastikan untuk memilih IAM peran yang benar. Selanjutnya, pilih Buat aplikasi streaming.

  10. Setelah beberapa menit, Anda akan melihat pesan bahwa aplikasi streaming berhasil dibuat.

Untuk informasi selengkapnya tentang penerapan aplikasi dengan status dan batas tahan lama, lihat Menerapkan sebagai aplikasi dengan status tahan lama.

Pembersihan

Secara opsional, Anda sekarang dapat menghapus tumpukan. AWS CloudFormation Ini akan menghapus semua layanan yang Anda atur sebelumnya.