Migrating to Managed Service for Apache Flink Studio examples - Amazon Kinesis Data Analytics for SQL Applications Developer Guide

After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications in two steps:

1. Effective October 15, 2025, you will not be able to create new Kinesis Data Analytics for SQL applications.

2. We will delete your applications starting January 27, 2026. You will not be able to start or operate your Amazon Kinesis Data Analytics for SQL applications. Support will no longer be available for Amazon Kinesis Data Analytics for SQL from that time. For more information, see Amazon Kinesis Data Analytics for SQL Applications discontinuation.

Migrating to Managed Service for Apache Flink Studio examples

After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications. To help you plan and migrate away from Amazon Kinesis Data Analytics for SQL applications, we will discontinue the offering gradually over 15 months. There are two important dates to note: October 15, 2025, and January 27, 2026.

  1. Effective October 15, 2025, you will not be able to create new Amazon Kinesis Data Analytics for SQL applications.

  2. We will delete your applications starting January 27, 2026. You will not be able to start or operate your Amazon Kinesis Data Analytics for SQL applications. Support will no longer be available for Amazon Kinesis Data Analytics for SQL applications from that time. To learn more, see Amazon Kinesis Data Analytics for SQL Applications discontinuation.

We recommend that you use Amazon Managed Service for Apache Flink. It combines ease of use with advanced analytical capabilities, enabling you to build stream processing applications in minutes.

This section provides code and architecture examples to help you move your Amazon Kinesis Data Analytics for SQL application workloads to Managed Service for Apache Flink.

For more information, see this AWS blog post: Migrate from Amazon Kinesis Data Analytics for SQL Applications to Managed Service for Apache Flink Studio.

To migrate your workloads to Managed Service for Apache Flink Studio or Managed Service for Apache Flink, this section provides query translations that you can use for common use cases.

Before you explore these examples, we recommend that you first review Using a Studio notebook with Managed Service for Apache Flink.

Re-creating Kinesis Data Analytics for SQL queries in Managed Service for Apache Flink Studio

The following options provide translations of common SQL-based Kinesis Data Analytics application queries to Managed Service for Apache Flink Studio.

SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "IN_APP_STREAM_001" ( ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_001" AS INSERT INTO "IN_APP_STREAM_001" SELECT STREAM APPROXIMATE_ARRIVAL_TIME, ticker_symbol, sector, price, change FROM "SOURCE_SQL_STREAM_001"; -- Second in-app stream and pump CREATE OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "IN_APP_STREAM_02" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_001"; -- Destination in-app stream and third pump CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_03" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_02";
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql

DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS IN_APP_STREAM_001;
CREATE TABLE IN_APP_STREAM_001 (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_001', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS IN_APP_STREAM_02;
CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_02', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - %flink.ssql(type=update)

INSERT INTO IN_APP_STREAM_001
SELECT APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE
FROM SOURCE_SQL_STREAM_001;

Query 3 - %flink.ssql(type=update)

INSERT INTO IN_APP_STREAM_02
SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE
FROM IN_APP_STREAM_001;

Query 4 - %flink.ssql(type=update)

INSERT INTO DESTINATION_SQL_STREAM
SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE
FROM IN_APP_STREAM_02;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT)
PARTITIONED BY (TICKER)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - %flink.ssql(type=update)

SELECT TICKER, EVENT_TIME,
       EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
       UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
       DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
       EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND
FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - %flink.ssql(type=update)

SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE
FROM DESTINATION_SQL_STREAM
WHERE (ABS(CHANGE / (PRICE - CHANGE)) * 100) > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CHANGE_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1; -- ** Trigger Count and Limit ** -- Counts "triggers" or those values that evaluated true against the previous where clause -- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM ( ticker_symbol VARCHAR(4), change REAL, trigger_count INTEGER); CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol, change, trigger_count FROM ( SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING ) ) WHERE trigger_count >= 1;
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME())
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM (
   TICKER_SYMBOL VARCHAR(4), CHANGE DOUBLE, TRIGGER_COUNT INT)
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - %flink.ssql(type=update)

SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE
FROM DESTINATION_SQL_STREAM
WHERE (ABS(CHANGE / (PRICE - CHANGE)) * 100) > 1;

Query 3 - %flink.ssql(type=update)

SELECT *
FROM (
   SELECT TICKER_SYMBOL, CHANGE, COUNT(*) AS TRIGGER_COUNT
   FROM DESTINATION_SQL_STREAM
   GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER_SYMBOL, CHANGE
)
WHERE TRIGGER_COUNT > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM"( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001", "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount " FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,
   TICKERCOUNT BIGINT NOT NULL)
PARTITIONED BY (TICKER)
WITH ('connector' = 'kinesis', 'stream' = 'CALC_COUNT_SQL_STREAM', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'csv');

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,
   TICKERCOUNT BIGINT NOT NULL)
PARTITIONED BY (TICKER)
WITH ('connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'csv');

Query 2 - %flink.ssql(type=update)

INSERT INTO CALC_COUNT_SQL_STREAM
SELECT TICKER,
       TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
       TICKERCOUNT
FROM (
   SELECT TICKER_SYMBOL AS TICKER,
          DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
          COUNT(*) AS TICKERCOUNT
   FROM SOURCE_SQL_STREAM_001
   GROUP BY TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
            DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
            DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
            TICKER_SYMBOL
);

Query 3 - %flink.ssql(type=update)

SELECT * FROM CALC_COUNT_SQL_STREAM;

Query 4 - %flink.ssql(type=update)

INSERT INTO DESTINATION_SQL_STREAM
SELECT TICKER, TRADETIME, SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT
FROM CALC_COUNT_SQL_STREAM
WINDOW W1 AS (PARTITION BY TICKER ORDER BY TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING);

Query 5 - %flink.ssql(type=update)

SELECT * FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
-- CREATE OR REPLACE STREAM for cleaned up referrer
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));

CREATE OR REPLACE PUMP "myPUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "APPROXIMATE_ARRIVAL_TIME",
       SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4))
FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32), ingest_time AS PROCTIME())
PARTITIONED BY (referrer)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - %flink.ssql(type=update)

SELECT ingest_time, substring(referrer, 12, 6) as referrer
FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
-- CREATE OR REPLACE STREAM for cleaned up referrer
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));

CREATE OR REPLACE PUMP "myPUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "APPROXIMATE_ARRIVAL_TIME",
       REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)
FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32), ingest_time AS PROCTIME())
PARTITIONED BY (referrer)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - %flink.ssql(type=update)

SELECT ingest_time, REGEXP_REPLACE(referrer, 'http', 'https') as referrer
FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( sector VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24)); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM T.SECTOR, T.REC.COLUMN1, T.REC.COLUMN2 FROM ( SELECT STREAM SECTOR, REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC FROM SOURCE_SQL_STREAM_001 ) AS T;
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16))
PARTITIONED BY (SECTOR)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - %flink.ssql(type=update)

SELECT *
FROM (
   SELECT SECTOR,
          REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
          REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2
   FROM DESTINATION_SQL_STREAM
)
WHERE MATCH1 IS NOT NULL AND MATCH2 IS NOT NULL;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT)
PARTITIONED BY (TICKER)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - %flink.ssql(type=update)

SELECT TICKER, EVENT_TIME,
       EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
       UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
       DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
       EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND
FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,
   TICKER VARCHAR(4), TICKER_COUNT INT)
PARTITIONED BY (TICKER)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json')

Query 2 - %flink.ssql(type=update)

SELECT EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count
FROM DESTINATION_SQL_STREAM
GROUP BY TUMBLE(EVENT_TIME, INTERVAL '60' second), EVENT_TIME, TICKER;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '60' SECOND);
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4), price DOUBLE, event_time VARCHAR(32), processing_time AS PROCTIME())
PARTITIONED BY (ticker)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - %flink.ssql(type=update)

SELECT ticker, min(price) AS MIN_PRICE, max(price) AS MAX_PRICE
FROM DESTINATION_SQL_STREAM
GROUP BY TUMBLE(processing_time, INTERVAL '60' second), ticker;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM"TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001". "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - %flink.ssql(type=update)

SELECT *
FROM (
   SELECT TICKER, COUNT(*) as MOST_FREQUENT_VALUES,
          ROW_NUMBER() OVER (PARTITION BY TICKER ORDER BY TICKER DESC) AS row_num
   FROM DESTINATION_SQL_STREAM
   GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER
)
WHERE row_num <= 5;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ITEM, ITEM_COUNT FROM TABLE(TOP_K_ITEMS_TUMBLING(CURSOR( SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
Managed Service for Apache Flink Studio
%flink.ssql

DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TS TIMESTAMP(3),
   WATERMARK FOR TS as TS - INTERVAL '5' SECOND,
   ITEM VARCHAR(1024),
   PRICE DOUBLE)
WITH ('connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

%flink.ssql(type=update)

SELECT *
FROM (
   SELECT *, ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW ORDER BY ITEM_COUNT DESC) as rownum
   FROM (
      select AGG_WINDOW, ITEM, ITEM_COUNT
      from (
         select TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW, ITEM, count(*) as ITEM_COUNT
         FROM SOURCE_SQL_STREAM_001
         GROUP BY TUMBLE(TS, INTERVAL '60' SECONDS), ITEM
      )
   )
)
where rownum <= 3
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), column2 VARCHAR(16), column3 VARCHAR(16), column4 VARCHAR(16), column5 VARCHAR(16), column6 VARCHAR(16), column7 VARCHAR(16)); CREATE OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM l.r.COLUMN1, l.r.COLUMN2, l.r.COLUMN3, l.r.COLUMN4, l.r.COLUMN5, l.r.COLUMN6, l.r.COLUMN7 FROM ( SELECT STREAM W3C_LOG_PARSE("log", 'COMMON') FROM "SOURCE_SQL_STREAM_001" ) AS l(r);
Managed Service for Apache Flink Studio
%flink.ssql(type=update)

DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   log VARCHAR(1024))
WITH ('connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

%flink.ssql(type=update)

select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2),
       SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5),
       SPLIT_INDEX(log, ' ', 6)
from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16), "column_B" VARCHAR(16), "column_C" VARCHAR(16), "COL_1" VARCHAR(16), "COL_2" VARCHAR(16), "COL_3" VARCHAR(16)); CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM t."Col_A", t."Col_B", t."Col_C", t.r."COL_1", t.r."COL_2", t.r."COL_3" FROM ( SELECT STREAM "Col_A", "Col_B", "Col_C", VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured", 'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)', ',') AS r FROM "SOURCE_SQL_STREAM_001" ) as t;
Managed Service for Apache Flink Studio
%flink.ssql(type=update)

DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   log VARCHAR(1024))
WITH ('connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

%flink.ssql(type=update)

select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2),
       SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5)
from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, "c"."Company", sector, change, priceFROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c" ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(12), CHANGE INT, PRICE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

Query 2 -

CREATE TABLE CompanyName (
   Ticker VARCHAR(4), Company VARCHAR(4))
WITH ('connector' = 'filesystem', 'path' = 's3://kda-demo-sample/TickerReference.csv', 'format' = 'csv');

Query 3 - %flink.ssql(type=update)

SELECT TICKER_SYMBOL, c.Company, SECTOR, CHANGE, PRICE
FROM DESTINATION_SQL_STREAM
LEFT JOIN CompanyName as c
ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
SQL-based Kinesis Data Analytics application
SELECT STREAM ticker_symbol, sector, change, (price / 0) as ProblemColumn
FROM "SOURCE_SQL_STREAM_001"
WHERE sector SIMILAR TO '%TECH%';
Managed Service for Apache Flink Studio
Query 1 - %flink.ssql(type=update)

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), CHANGE DOUBLE, PRICE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL)
WITH ('connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - %flink.pyflink

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def DivideByZero(price):
    try:
        return price / 0
    except:
        return -1

st_env.register_function("DivideByZero", DivideByZero)

Query 3 - %flink.ssql(type=update)

SELECT CURRENT_TIMESTAMP AS ERROR_TIME, *
FROM (
   SELECT TICKER_SYMBOL, SECTOR, CHANGE, DivideByZero(PRICE) as ErrorColumn
   FROM DESTINATION_SQL_STREAM
   WHERE SECTOR SIMILAR TO '%TECH%'
) AS ERROR_STREAM;

If you want to move workloads that use Random Cut Forest from Kinesis Analytics for SQL to Managed Service for Apache Flink, this AWS blog post demonstrates how to use Managed Service for Apache Flink to run an online RCF algorithm for anomaly detection.

For a complete tutorial, see Converting-KDASQL-KDAStudio/

In the following exercise, you will change your data flow to use Amazon Managed Service for Apache Flink Studio. This also means switching from Amazon Kinesis Data Firehose to Amazon Kinesis Data Streams.

First we share a typical KDA-SQL architecture, before showing how you can replace it using Amazon Managed Service for Apache Flink Studio and Amazon Kinesis Data Streams. Alternatively, you can launch the AWS CloudFormation template here: https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml

Amazon Kinesis Data Analytics for SQL and Amazon Kinesis Data Firehose

Here is the Amazon Kinesis Data Analytics for SQL architectural flow:

Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.

We first examine the setup of a legacy Amazon Kinesis Data Analytics for SQL application with Amazon Kinesis Data Firehose. The use case is a trading market where trading data, including stock ticker and price, streams from external sources into the Amazon Kinesis system. Amazon Kinesis Data Analytics for SQL uses the input stream to run windowed queries, such as a tumbling window, to determine the trade volume and the min, max, and average trade price over a one-minute window for each ticker.

Amazon Kinesis Data Analytics for SQL is set up to ingest data from the Amazon Kinesis Data Firehose API. After processing, Amazon Kinesis Data Analytics for SQL sends the processed data to another Amazon Kinesis Data Firehose, which then saves the output in an Amazon S3 bucket.

In this case, you can use the Amazon Kinesis Data Generator. The Amazon Kinesis Data Generator lets you send test data to your Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose delivery streams. To get started, follow the instructions here. Use the AWS CloudFormation template here in place of the one provided in the instructions:

Once you run the AWS CloudFormation template, the output section will provide the Amazon Kinesis Data Generator URL. Log in to the portal with the Cognito user ID and password that you set up here. Select a Region and target stream name. For the current state, choose your Amazon Kinesis Data Firehose delivery stream. For the new state, choose your Amazon Kinesis Data Streams name. You can create multiple templates depending on your requirements, and test each template with the Test template button before sending it to the target stream.

Here is a sample payload for use with the Amazon Kinesis Data Generator. The data generator targets the input Amazon Kinesis Firehose streams to stream the data continuously. The Amazon Kinesis SDK client can also send data from other producers.

2023-02-17 09:28:07.763, "AAPL", 503
2023-02-17 09:28:07.763, "AMZN", 335
2023-02-17 09:28:07.763, "GOOGL", 185
2023-02-17 09:28:07.763, "AAPL", 1116
2023-02-17 09:28:07.763, "GOOGL", 1582

The following JSON is used to generate a series of random trade times and dates, stock tickers, and stock prices:

{{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}

Once you choose Send data, the generator will start sending mock data.
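For reference, here is a minimal sketch of the SDK route using the AWS SDK for Python (boto3). The delivery stream name below is an assumed placeholder; the record format matches the Data Generator template above:

import datetime
import random

import boto3  # AWS SDK for Python

firehose = boto3.client("firehose", region_name="us-east-1")

def send_trade():
    # Build one CSV record in the same shape as the Data Generator payload
    trade_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    ticker = random.choice(["AAPL", "AMZN", "MSFT", "META", "GOOGL"])
    price = random.randint(0, 2000)
    record = f'{trade_time}, "{ticker}", {price}\n'
    # "trade-firehose-input" is a hypothetical delivery stream name
    firehose.put_record(
        DeliveryStreamName="trade-firehose-input",
        Record={"Data": record.encode("utf-8")},
    )

for _ in range(10):
    send_trade()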

External systems stream the data into Amazon Kinesis Data Firehose. Using Amazon Kinesis Data Analytics for SQL Applications, you can analyze streaming data using standard SQL. The service enables you to author and run SQL code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics. Amazon Kinesis Data Analytics for SQL Applications can create a destination stream from SQL queries running on the input stream and send the destination stream to another Amazon Kinesis Data Firehose. The destination Amazon Kinesis Data Firehose can then send the analytics data to Amazon S3 as the final state.

The Amazon Kinesis Data Analytics for SQL legacy code is based on an extension of the SQL standard.

You use the following query in Amazon Kinesis Data Analytics for SQL. You first create a destination stream for the query output. Then, you use PUMP, an Amazon Kinesis Data Analytics repository object (an extension of the SQL standard) that provides a continuously running INSERT INTO stream SELECT ... FROM query functionality, thereby enabling the results of a query to be continuously entered into a named stream.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP, INGEST_TIME TIMESTAMP, TICKER VARCHAR(16), VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME", "ticker", COUNT(*) AS VOLUME, AVG("tradePrice") AS AVG_PRICE, MIN("tradePrice") AS MIN_PRICE, MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001" GROUP BY "ticker", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);

The preceding SQL uses two time windows: tradeTimestamp, which comes from the incoming stream payload, and ROWTIME. tradeTimestamp is also called Event Time or client-side time. It is often preferable to use this time in analytics because it is the time when an event occurred. However, many event sources, such as mobile phones and web clients, do not have reliable clocks, which can lead to inaccurate times. In addition, connectivity issues can cause records to appear on a stream not in the same order the events occurred.

In-application streams also include a special column called ROWTIME. It stores a timestamp when Amazon Kinesis Data Analytics inserts a row in the first in-application stream. ROWTIME reflects the timestamp at which Amazon Kinesis Data Analytics inserted a record into the first in-application stream after reading from the streaming source. This ROWTIME value is then maintained throughout your application.

The SQL determines the count of the ticker as volume, along with the min, max, and average price, over a 60-second interval.

Using each of these times in time-based windowed queries has advantages and disadvantages. Choose one or more of these times, and a strategy to deal with the relevant disadvantages, based on your use case scenario.

A two-window strategy uses two time-based windows, both ROWTIME and one of the other times, such as the event time.

  • Use ROWTIME as the first window, which controls how frequently the query emits the results, as shown in the following example. It is not used as a logical time.

  • Use one of the other times as the logical time that you want to associate with your analytics. This time represents when the event occurred. In the following example, the analytics goal is to group the records and return the count by ticker. A sketch of this strategy in Flink Studio follows this list.
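In Managed Service for Apache Flink Studio, one way to sketch this two-window strategy is to emit on a processing-time tumbling window while grouping on the truncated event time, similar to the CALC_COUNT_SQL_STREAM translation shown earlier. The table and column names below (TRADE_SOURCE_STREAM, TRADE_TIME, and a PROC_TIME AS PROCTIME() column) are assumptions for illustration:

%flink.pyflink

# Emit once per 60 seconds of processing time (the ROWTIME analog),
# grouped logically by the event-time minute and the ticker.
two_window_sql = """
    SELECT TICKER,
           DATE_FORMAT(TRADE_TIME, 'yyyy-MM-dd HH:mm:00') AS EVENT_MINUTE,
           COUNT(*) AS TICKER_COUNT
    FROM TRADE_SOURCE_STREAM
    GROUP BY TUMBLE(PROC_TIME, INTERVAL '60' SECOND),
             DATE_FORMAT(TRADE_TIME, 'yyyy-MM-dd HH:mm:00'),
             TICKER
"""
# st_env is the StreamTableEnvironment provided by the Studio notebook
st_env.execute_sql(two_window_sql).print()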

Amazon Managed Service for Apache Flink Studio

In the updated architecture, you replace Amazon Kinesis Data Firehose with Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics for SQL Applications is replaced by Amazon Managed Service for Apache Flink Studio. The Apache Flink code is run interactively within an Apache Zeppelin notebook. Amazon Managed Service for Apache Flink Studio sends the aggregated trade data to an Amazon S3 bucket for storage. The steps are shown following:

Here is the architectural flow for Amazon Managed Service for Apache Flink Studio:

Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.

Create a Kinesis data stream

To create a data stream using the console
  1. Sign in to the AWS Management Console and open the Kinesis console at https://console.aws.amazon.com/kinesis.

  2. In the navigation bar, expand the Region selector and choose a Region.

  3. Choose Create data stream.

  4. On the Create Kinesis stream page, enter a name for your data stream and then accept the default On-demand capacity mode. (A scripted boto3 alternative to these console steps is sketched after this procedure.)

    With the On-demand mode, you can then choose Create Kinesis stream to create your data stream.

    While your data stream is being created, on the Kinesis streams page, the stream's Status is Creating. When the stream is ready to use, the Status changes to Active.

  5. Choose the name of your stream. The Stream Details page displays a summary of your stream configuration, along with monitoring information.

  6. In the Amazon Kinesis Data Generator, change the Stream/delivery stream to the new Amazon Kinesis Data Streams: TRADE_SOURCE_STREAM.

    The JSON and payload will be the same as you used for Amazon Kinesis Data Analytics for SQL. Use the Amazon Kinesis Data Generator to produce some sample trade payload data and target the TRADE_SOURCE_STREAM data stream for this exercise:

    {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
  7. In the AWS Management Console, go to Managed Service for Apache Flink and choose Create application.

  8. In the left navigation pane, choose Studio notebooks, and then choose Create Studio notebook.

  9. Enter a name for the Studio notebook.

  10. Under AWS Glue database, provide an existing AWS Glue database that will define the metadata for your sources and destinations. If you don't have an AWS Glue database, choose Create and do the following:

    1. In the AWS Glue console, choose Databases under Data catalog from the left-hand menu.

    2. Choose Create database.

    3. On the Create database page, enter a name for the database. In the Location - optional section, choose Browse Amazon S3 and select the Amazon S3 bucket. If you don't have an Amazon S3 bucket already set up, you can skip this step and come back to it later.

    4. (Optional). Enter a description for the database.

    5. Choose Create database.

  11. Choose Create notebook.

  12. After your notebook is created, choose Run.

  13. After the notebook has started successfully, choose Open in Apache Zeppelin to launch the Zeppelin notebook.

  14. On the Zeppelin Notebook page, choose Create new note and name it MarketDataFeed.
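As referenced in step 4 of the preceding procedure, here is a minimal boto3 sketch of creating the same data stream without the console (the Region is an assumption):

import boto3  # AWS SDK for Python

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Create the source stream in the default on-demand capacity mode
kinesis.create_stream(
    StreamName="TRADE_SOURCE_STREAM",
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# Block until the status changes from CREATING to ACTIVE
kinesis.get_waiter("stream_exists").wait(StreamName="TRADE_SOURCE_STREAM")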

The Flink SQL code is explained following, but first this is what the Zeppelin notebook screen looks like. Each window within the notebook is a separate code block, and they can be run one at a time.

Amazon Managed Service for Apache Flink Studio code

Amazon Managed Service for Apache Flink Studio uses Zeppelin notebooks to run the code. Mapping is done for this example to the ssql code based on Apache Flink 1.13. The code in the Zeppelin notebook is shown following, one block at a time.

Before running any code in your Zeppelin notebook, Flink configuration commands must be run. If you need to change any configuration settings after running code (ssql, Python, or Scala), you will need to stop and restart your notebook. In this example, you will need to set checkpointing. Checkpointing is required so that you can stream data to a file in Amazon S3. This allows the data streaming to Amazon S3 to be flushed to a file. The following statement sets the interval to 5000 milliseconds.

%flink.conf
execution.checkpointing.interval 5000

%flink.conf indicates that this block is a configuration statement. For more information on Flink configuration, including checkpointing, see Apache Flink Checkpointing.

Use the following Flink ssql code to create the input table for the source Amazon Kinesis data stream. Note that the TRADE_TIME field stores the date/time created by the data generator.

%flink.ssql

DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;

CREATE TABLE TRADE_SOURCE_STREAM (
   --`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   TRADE_TIME TIMESTAMP(3),
   WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,
   TICKER STRING,
   PRICE DOUBLE,
   STATUS STRING)
WITH (
   'connector' = 'kinesis',
   'stream' = 'TRADE_SOURCE_STREAM',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'csv');

You can view the input stream with this statement:

%flink.ssql(type=update)

-- testing the source stream
select * from TRADE_SOURCE_STREAM;

Before the aggregate data is sent to Amazon S3, you can view it directly in Amazon Managed Service for Apache Flink with a tumbling window select query. This aggregates the trade data over a one-minute window. Note that the %flink.ssql statement must have a (type=update) designation:

%flink.ssql(type=update)

select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW,
       TICKER,
       COUNT(*) as VOLUME,
       AVG(PRICE) as AVG_PRICE,
       MIN(PRICE) as MIN_PRICE,
       MAX(PRICE) as MAX_PRICE
FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

You can then create a table for the destination in Amazon S3. You must use a watermark. A watermark is a progress metric that indicates a point in time when you are confident that no more delayed events will arrive. The reason for the watermark is to account for late arrivals. The INTERVAL '5' SECOND allows trades to enter the Amazon Kinesis data stream 5 seconds late and still be included if they have a timestamp within the window. For more information, see Generating Watermarks.

%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;

CREATE TABLE TRADE_DESTINATION_S3 (
   TRADE_WINDOW_START TIMESTAMP(3),
   WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
   TICKER STRING,
   VOLUME BIGINT,
   AVG_PRICE DOUBLE,
   MIN_PRICE DOUBLE,
   MAX_PRICE DOUBLE)
WITH (
   'connector' = 'filesystem',
   'path' = 's3://trade-destination/',
   'format' = 'csv');

This statement inserts the data into TRADE_DESTINATION_S3. TUMBLE_ROWTIME is the timestamp of the inclusive upper bound of the tumbling window.

%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE),
       TICKER,
       COUNT(*) as VOLUME,
       AVG(PRICE) as AVG_PRICE,
       MIN(PRICE) as MIN_PRICE,
       MAX(PRICE) as MAX_PRICE
FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

Let the statement run for 10 to 20 minutes to accumulate some data in Amazon S3. Then abort your statement.

This closes the file in Amazon S3 so that it becomes viewable.
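If you want to confirm from code that output objects have landed, here is a small boto3 sketch; the bucket name is taken from the 's3://trade-destination/' path used in TRADE_DESTINATION_S3:

import boto3  # AWS SDK for Python

s3 = boto3.client("s3")

# List the CSV part files that the filesystem connector closed on checkpoint
response = s3.list_objects_v2(Bucket="trade-destination")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])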

Here is what the contents will look like:

Financial data table showing stock prices and volumes for tech companies on March 1, 2023.

You can use an AWS CloudFormation template to create the infrastructure.

AWS CloudFormation will create the following resources in your AWS account:

  • Amazon Kinesis Data Streams

  • Amazon Managed Service for Apache Flink Studio

  • An AWS Glue database

  • An Amazon S3 bucket

  • IAM roles and policies for Amazon Managed Service for Apache Flink Studio to access appropriate resources

Import the notebook and change the Amazon S3 bucket name to match the new Amazon S3 bucket created by AWS CloudFormation.

SQL code snippet creating a table with timestamp, ticker, volume, and price fields.
See more

Here are some additional resources that you can use to learn more about using Managed Service for Apache Flink Studio:

The purpose of the pattern is to demonstrate how to leverage UDFs in Kinesis Data Analytics Studio Zeppelin notebooks for processing data in Kinesis streams. Managed Service for Apache Flink Studio uses Apache Flink to provide advanced analytics capabilities, including exactly-once processing semantics, event-time windows, extensibility using user-defined functions and customer integrations, imperative language support, durable application state, horizontal scaling, support for multiple data sources, extensible integrations, and more. These are critical for ensuring accuracy, completeness, consistency, and reliability of data stream processing, and are not available with Amazon Kinesis Data Analytics for SQL.

In this sample application, we will demonstrate how to leverage UDFs in a KDA-Studio Zeppelin notebook for processing data in the Kinesis stream. Studio notebooks for Kinesis Data Analytics allow you to interactively query data streams in real time, and easily build and run stream processing applications using standard SQL, Python, and Scala. With a few clicks in the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds. For more information, see Using a Studio notebook with Kinesis Data Analytics for Apache Flink.

Lambda functions used for pre/post-processing of data in KDA-SQL applications:

Data flow diagram showing SQL App processing between source and destination streams.

User-defined functions for pre/post-processing of data using KDA-Studio Zeppelin notebooks

Flink Studio Zeppelin Notebook workflow with in-memory tables and user-defined functions.

User-defined functions (UDFs)

To reuse common business logic into an operator, it can be useful to reference a user-defined function to transform your data stream. This can be done either within the Managed Service for Apache Flink Studio notebook, or as an externally referenced application JAR file. Utilizing user-defined functions can simplify the transformations or data enrichments that you might perform over streaming data.

In the notebook, you will be referencing a simple Java application JAR that has the functionality to anonymize personal phone numbers. You can also write Python or Scala UDFs for use within the notebook. We chose a Java application JAR to highlight the functionality of importing an application JAR into a Pyflink notebook.
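For comparison, here is a minimal sketch of the same idea written as a Python UDF registered directly in the notebook. The masking rule and names are illustrative assumptions, not the sample's Java implementation:

%flink.pyflink

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def mask_phone_py(phone):
    # Illustrative rule: keep the last four digits, mask the rest
    if phone is None:
        return None
    digits = [c for c in phone if c.isdigit()]
    return "X" * max(len(digits) - 4, 0) + "".join(digits[-4:])

# st_env is the StreamTableEnvironment provided by the Studio notebook
st_env.register_function("mask_phone_py", mask_phone_py)

# It is then callable from %flink.ssql, for example:
#   SELECT mask_phone_py(phone) FROM customer_reviews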

Environment setup

To follow this guide and interact with your streaming data, you will use an AWS CloudFormation script to launch the following resources:

  • Kinesis Data Streams for the source and target

  • A Glue database

  • IAM roles

  • A Managed Service for Apache Flink Studio application

  • A Lambda function to start the Managed Service for Apache Flink Studio application

  • A Lambda role to run the preceding Lambda function

  • A custom resource to invoke the Lambda function

Download the AWS CloudFormation template here: https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/kda-udf-sample/cfn/kda-flink-udf.yml

Create the AWS CloudFormation stack
  1. Go to the AWS Management Console and choose CloudFormation under the list of services.

  2. On the CloudFormation page, choose Stacks and then choose Create stack with new resources (standard).

  3. On the Create stack page, choose Upload a template file, and then choose the kda-flink-udf.yml file that you downloaded previously. Choose the file, and then choose Next.

  4. Give the template a name that is easy to remember, such as kinesis-UDF. You can update the input parameters, such as the input stream, if you want a different name. Choose Next.

  5. On the Configure stack options page, add Tags if needed, and then choose Next.

  6. On the Review page, select the check boxes that allow the creation of IAM resources, and then choose Submit.

The AWS CloudFormation stack may take 10 to 15 minutes to launch depending on the Region you are launching in. Once you see the CREATE_COMPLETE status for the whole stack, you are ready to move on.

Working with the Managed Service for Apache Flink Studio notebook

Studio notebooks for Kinesis Data Analytics allow you to interactively query data streams in real time, and easily build and run stream processing applications using standard SQL, Python, and Scala. With a few clicks in the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds.

A notebook is a web-based development environment. With notebooks, you get a simple interactive development experience combined with the advanced capabilities provided by Apache Flink for data stream processing. Studio notebooks use notebooks powered by Apache Zeppelin, and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets.

Apache Zeppelin provides your Studio notebooks with a complete suite of analytics tools, including the following:

  • Data visualization

  • Exporting data to files

  • Controlling the output format for easier analysis

To use the notebook
  1. Go to the AWS Management Console and choose Amazon Kinesis under the list of services.

  2. On the left-hand navigation page, choose Analytics applications and then choose Studio notebooks.

  3. Verify that the KinesisDataAnalyticsStudio notebook is running.

  4. Choose the notebook, and then choose Open in Apache Zeppelin.

  5. Download the Data Producer Zeppelin notebook file, which you will use to read and load data into the Kinesis stream.

  6. Import the Data Producer Zeppelin notebook. Make sure that you modify the input STREAM_NAME and REGION in the notebook code. The input stream name can be found in the AWS CloudFormation stack output.

  7. Run the Data Producer notebook by choosing the Run this paragraph button to insert sample data into the input Kinesis data stream.

  8. While the sample data loads, download the MaskPhoneNumber-Interactive notebook, which will read the input data, anonymize phone numbers from the input stream, and save the anonymized data to the output stream.

  9. Import the MaskPhoneNumber-interactive Zeppelin notebook.

  10. Run each paragraph in the notebook.

    1. In paragraph 1, you import a user-defined function to anonymize phone numbers.

      %flink(parallelism=1)
      import com.mycompany.app.MaskPhoneNumber
      stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
    2. In the next paragraph, you create an in-memory table to read the input stream data. Make sure that the stream name and AWS Region are correct.

      %flink.ssql(type=update)
      DROP TABLE IF EXISTS customer_reviews;

      CREATE TABLE customer_reviews (
         customer_id VARCHAR,
         product VARCHAR,
         review VARCHAR,
         phone VARCHAR)
      WITH (
         'connector' = 'kinesis',
         'stream' = 'KinesisUDFSampleInputStream',
         'aws.region' = 'us-east-1',
         'scan.stream.initpos' = 'LATEST',
         'format' = 'json');
    3. Check whether the data is loaded into the in-memory table.

      %flink.ssql(type=update)
      select * from customer_reviews
    4. Invoke the user-defined function to anonymize the phone numbers.

      %flink.ssql(type=update)
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber
      from customer_reviews
    5. Now that the phone numbers are masked, create a view with the masked numbers.

      %flink.ssql(type=update)
      DROP VIEW IF EXISTS sentiments_view;

      CREATE VIEW sentiments_view AS
      select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber
      from customer_reviews
    6. Verify the data.

      %flink.ssql(type=update)
      select * from sentiments_view
    7. Create an in-memory table for the output Kinesis stream. Make sure that the stream name and AWS Region are correct.

      %flink.ssql(type=update)
      DROP TABLE IF EXISTS customer_reviews_stream_table;

      CREATE TABLE customer_reviews_stream_table (
         customer_id VARCHAR,
         product VARCHAR,
         review VARCHAR,
         phoneNumber varchar)
      WITH (
         'connector' = 'kinesis',
         'stream' = 'KinesisUDFSampleOutputStream',
         'aws.region' = 'us-east-1',
         'scan.stream.initpos' = 'TRIM_HORIZON',
         'format' = 'json');
    8. Insert the updated records into the target Kinesis stream.

      %flink.ssql(type=update)
      INSERT INTO customer_reviews_stream_table
      SELECT customer_id, product, review, phoneNumber
      FROM sentiments_view
    9. View and verify the data from the target Kinesis stream.

      %flink.ssql(type=update)
      select * from customer_reviews_stream_table

Promote a notebook as an application

Now that you have tested your notebook code interactively, you will deploy the code as a streaming application with durable state. You will first need to modify the application configuration to specify a location for your code in Amazon S3.

  1. On the AWS Management Console, choose your notebook and under Deploy as application configuration - optional, choose Edit.

  2. Under Destination for code in Amazon S3, choose the Amazon S3 bucket that was created by the AWS CloudFormation scripts. The process may take a few minutes.

  3. You won't be able to promote the note as is. If you try, you will get an error because Select statements are not supported. To avoid this issue, download the MaskPhoneNumber-Streaming Zeppelin notebook.

  4. Import the MaskPhoneNumber-streaming Zeppelin notebook.

  5. Open the note and choose Actions for KinesisDataAnalyticsStudio.

  6. Choose Build MaskPhoneNumber-Streaming and export to S3. Make sure that you rename the Application Name and don't include any special characters.

  7. Choose Build and Export. This will take a few minutes to set up the streaming application.

  8. Once the build is complete, choose Deploy using AWS console.

  9. On the next page, review the settings and make sure to choose the correct IAM role. Next, choose Create streaming application.

  10. After a few minutes, you will see a message that the streaming application was created successfully.

For more information on deploying applications with durable state and its limits, see Deploying as an application with durable state.

Clean up

Optionally, you can now uninstall the AWS CloudFormation stack. This will remove all the services that you set up previously.