Exemples de migration vers le service géré pour Apache Flink Studio - Guide du développeur d'Amazon Kinesis Data Analytics SQL pour applications

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics SQL pour les applications en deux étapes :

1. À compter du 15 octobre 2025, vous ne pourrez plus créer de nouveaux Kinesis Data Analytics SQL pour les applications.

2. Nous supprimerons vos candidatures à compter du 27 janvier 2026. Vous ne serez pas en mesure de démarrer ou d'utiliser votre Amazon Kinesis Data Analytics SQL pour les applications. Support ne sera plus disponible pour Amazon Kinesis Data Analytics à partir SQL de cette date. Pour de plus amples informations, veuillez consulter Arrêt d'Amazon Kinesis Data Analytics SQL pour applications.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemples de migration vers le service géré pour Apache Flink Studio

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics SQL pour les applications. Pour vous aider à planifier et à migrer hors d'Amazon Kinesis Data Analytics SQL pour les applications, nous supprimerons progressivement l'offre sur une période de 15 mois. Il y a deux dates importantes à noter, le 15 octobre 2025 et le 27 janvier 2026.

  1. À compter du 15 octobre 2025, vous ne pourrez plus créer de nouveaux Amazon Kinesis Data Analytics SQL pour les applications.

  2. Nous supprimerons vos candidatures à compter du 27 janvier 2026. Vous ne serez pas en mesure de démarrer ou d'utiliser votre Amazon Kinesis Data Analytics SQL pour les applications. Support ne sera plus disponible pour Amazon Kinesis Data Analytics SQL pour les applications à partir de cette date. Pour en savoir plus, consultez Arrêt d'Amazon Kinesis Data Analytics SQL pour applications.

Nous vous recommandons d'utiliser Amazon Managed Service pour Apache Flink. Il allie facilité d'utilisation et fonctionnalités analytiques avancées, vous permettant de créer des applications de traitement de flux en quelques minutes.

Cette section fournit des exemples de code et d'architecture pour vous aider à transférer vos charges de travail Amazon Kinesis Data Analytics SQL pour applications vers Managed Service for Apache Flink.

Pour plus d'informations, consultez également ce billet de AWS blog : Migrer d'Amazon Kinesis Data Analytics SQL for Applications vers un service géré pour Apache Flink Studio.

Cette section fournit des conversions de requêtes que vous pouvez utiliser pour les cas d’utilisation courants lors de la migration de vos charges de travail vers le service géré pour Apache Flink Studio ou le service géré pour Apache Flink.

Avant d'explorer ces exemples, nous vous recommandons de consulter d'abord l'article Utilisation d'un bloc-notes Studio avec un service géré pour Apache Flink.

Recréation de Kinesis Data Analytics SQL pour les requêtes dans Managed Service pour Apache Flink Studio

Les options suivantes fournissent des traductions des requêtes courantes SQL de l'application Kinesis Data Analytics vers le service géré pour Apache Flink Studio.

SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "IN_APP_STREAM_001" ( ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_001" AS INSERT INTO "IN_APP_STREAM_001" SELECT STREAM APPROXIMATE_ARRIVAL_TIME, ticker_symbol, sector, price, change FROM "SOURCE_SQL_STREAM_001"; -- Second in-app stream and pump CREATE OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "IN_APP_STREAM_02" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_001"; -- Destination in-app stream and third pump CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP, ticker_symbol VARCHAR(4), sector VARCHAR(16), price REAL, change REAL); CREATE OR REPLACE PUMP "STREAM_PUMP_03" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ingest_time, ticker_symbol, sector, price, change FROM "IN_APP_STREAM_02";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001; CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE, APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS IN_APP_STREAM_001; CREATE TABLE IN_APP_STREAM_001 ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS IN_APP_STREAM_02; CREATE TABLE IN_APP_STREAM_02 ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'IN_APP_STREAM_02', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), PRICE DOUBLE, CHANGE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) INSERT INTO IN_APP_STREAM_001 SELECT APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM SOURCE_SQL_STREAM_001; Query 3 - % flink.ssql(type = update ) INSERT INTO IN_APP_STREAM_02 SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM IN_APP_STREAM_001; Query 4 - % flink.ssql(type = update ) INSERT INTO DESTINATION_SQL_STREAM SELECT INGEST_TIME, TICKER_SYMBOL, SECTOR, PRICE, CHANGE FROM IN_APP_STREAM_02;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3), EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE, UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP, DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR, EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM WHERE ( ABS(CHANGE / (PRICE - CHANGE)) * 100 ) > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CHANGE_STREAM"( ticker_symbol VARCHAR(4), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM" SELECT STREAM ticker_symbol, sector, change, price FROM "SOURCE_SQL_STREAM_001" WHERE ( ABS(Change / (Price - Change)) * 100 ) > 1; -- ** Trigger Count and Limit ** -- Counts "triggers" or those values that evaluated true against the previous where clause -- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM ( ticker_symbol VARCHAR(4), change REAL, trigger_count INTEGER); CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol, change, trigger_count FROM ( SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING ) ) WHERE trigger_count >= 1;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(4), CHANGE DOUBLE, PRICE DOUBLE, EVENT_TIME AS PROCTIME()) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM; CREATE TABLE TRIGGER_COUNT_STREAM ( TICKER_SYMBOL VARCHAR(4), CHANGE DOUBLE, TRIGGER_COUNT INT) PARTITIONED BY (TICKER_SYMBOL); Query 2 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM WHERE ( ABS(CHANGE / (PRICE - CHANGE)) * 100 ) > 1; Query 3 - % flink.ssql(type = update ) SELECT * FROM( SELECT TICKER_SYMBOL, CHANGE, COUNT(*) AS TRIGGER_COUNT FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER_SYMBOL, CHANGE ) WHERE TRIGGER_COUNT > 1;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM"( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001", "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount " FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001; CREATE TABLE SOURCE_SQL_STREAM_001 ( TICKER_SYMBOL VARCHAR(4), TRADETIME AS PROCTIME(), APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM; CREATE TABLE CALC_COUNT_SQL_STREAM ( TICKER VARCHAR(4), TRADETIME TIMESTAMP(3), WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'CALC_COUNT_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv'); DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), TRADETIME TIMESTAMP(3), WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 'stream' = 'DESTINATION_SQL_STREAM', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv'); Query 2 - % flink.ssql(type = update ) INSERT INTO CALC_COUNT_SQL_STREAM SELECT TICKER, TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME, TICKERCOUNT FROM ( SELECT TICKER_SYMBOL AS TICKER, DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME, COUNT(*) AS TICKERCOUNT FROM SOURCE_SQL_STREAM_001 GROUP BY TUMBLE(TRADETIME, INTERVAL '1' MINUTE), DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'), DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'), TICKER_SYMBOL ) ; Query 3 - % flink.ssql(type = update ) SELECT * FROM CALC_COUNT_SQL_STREAM; Query 4 - % flink.ssql(type = update ) INSERT INTO DESTINATION_SQL_STREAM SELECT TICKER, TRADETIME, SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT FROM CALC_COUNT_SQL_STREAM WINDOW W1 AS ( PARTITION BY TICKER ORDER BY TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING ) ; Query 5 - % flink.ssql(type = update ) SELECT * FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM for cleaned up referrerCREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", SUBSTRING("referrer", 12, ( POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4 ) ) FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( referrer VARCHAR(32), ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ingest_time, substring(referrer, 12, 6) as referrer FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM for cleaned up referrerCREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) FROM "SOURCE_SQL_STREAM_001";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( referrer VARCHAR(32), ingest_time AS PROCTIME()) PARTITIONED BY (referrer) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ingest_time, REGEXP_REPLACE(referrer, 'http', 'https') as referrer FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( sector VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24)); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM T.SECTOR, T.REC.COLUMN1, T.REC.COLUMN2 FROM ( SELECT STREAM SECTOR, REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC FROM SOURCE_SQL_STREAM_001 ) AS T;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( CHANGE DOUBLE, PRICE DOUBLE, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16)) PARTITIONED BY (SECTOR) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT * FROM ( SELECT SECTOR, REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1, REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 FROM DESTINATION_SQL_STREAM ) WHERE MATCH1 IS NOT NULL AND MATCH2 IS NOT NULL;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), FIVE_MINUTES_BEFORE TIMESTAMP(3), EVENT_UNIX_TIMESTAMP INT, EVENT_TIMESTAMP_AS_CHAR VARCHAR(50), EVENT_SECOND INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE, UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP, DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR, EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND FROM DESTINATION_SQL_STREAM;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( EVENT_TIME TIMESTAMP(3), WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND, TICKER VARCHAR(4), TICKER_COUNT INT) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json' Query 2 - % flink.ssql(type = update ) SELECT EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '60' second), EVENT_TIME, TICKER;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE) FROM "SOURCE_SQL_STREAM_001" GROUP BY TICKER, STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '60' SECOND);
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( ticker VARCHAR(4), price DOUBLE, event_time VARCHAR(32), processing_time AS PROCTIME()) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601') Query 2 - % flink.ssql(type = update ) SELECT ticker, min(price) AS MIN_PRICE, max(price) AS MAX_PRICE FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(processing_time, INTERVAL '60' second), ticker;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM"TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001". "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ( "TICKER", "TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS ( PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING ) ;
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER VARCHAR(4), EVENT_TIME TIMESTAMP(3), WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) PARTITIONED BY (TICKER) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.ssql(type = update ) SELECT * FROM ( SELECT TICKER, COUNT(*) as MOST_FREQUENT_VALUES, ROW_NUMBER() OVER (PARTITION BY TICKER ORDER BY TICKER DESC) AS row_num FROM DESTINATION_SQL_STREAM GROUP BY TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE), TICKER ) WHERE row_num <= 5;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ITEM, ITEM_COUNT FROM TABLE(TOP_K_ITEMS_TUMBLING(CURSOR( SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
Managed Service for Apache Flink Studio
%flinkssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), PRICE DOUBLE) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); %flink.ssql(type=update) SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW ORDER BY ITEM_COUNT DESC) as rownum FROM ( select AGG_WINDOW, ITEM, ITEM_COUNT from ( select TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW, ITEM, count(*) as ITEM_COUNT FROM SOURCE_SQL_STREAM_001 GROUP BY TUMBLE(TS, INTERVAL '60' SECONDS), ITEM ) ) ) where rownum <= 3
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), column2 VARCHAR(16), column3 VARCHAR(16), column4 VARCHAR(16), column5 VARCHAR(16), column6 VARCHAR(16), column7 VARCHAR(16)); CREATE OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM l.r.COLUMN1, l.r.COLUMN2, l.r.COLUMN3, l.r.COLUMN4, l.r.COLUMN5, l.r.COLUMN6, l.r.COLUMN7 FROM ( SELECT STREAM W3C_LOG_PARSE("log", 'COMMON') FROM "SOURCE_SQL_STREAM_001" ) AS l(r);
Managed Service for Apache Flink Studio
%flink.ssql(type=update) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); % flink.ssql(type=update) select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2), SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5), SPLIT_INDEX(log, ' ', 6) from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16), "column_B" VARCHAR(16), "column_C" VARCHAR(16), "COL_1" VARCHAR(16), "COL_2" VARCHAR(16), "COL_3" VARCHAR(16)); CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM t."Col_A", t."Col_B", t."Col_C", t.r."COL_1", t.r."COL_2", t.r."COL_3" FROM ( SELECT STREAM "Col_A", "Col_B", "Col_C", VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured", 'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)', ',') AS r FROM "SOURCE_SQL_STREAM_001" ) as t;
Managed Service for Apache Flink Studio
%flink.ssql(type=update) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); % flink.ssql(type=update) select SPLIT_INDEX(log, ' ', 0), SPLIT_INDEX(log, ' ', 1), SPLIT_INDEX(log, ' ', 2), SPLIT_INDEX(log, ' ', 3), SPLIT_INDEX(log, ' ', 4), SPLIT_INDEX(log, ' ', 5) ) from SOURCE_SQL_STREAM_001;
SQL-based Kinesis Data Analytics application
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, "c"."Company", sector, change, priceFROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c" ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(12), CHANGE INT, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - CREATE TABLE CompanyName ( Ticker VARCHAR(4), Company VARCHAR(4)) WITH ( 'connector' = 'filesystem', 'path' = 's3://kda-demo-sample/TickerReference.csv', 'format' = 'csv' ); Query 3 - % flink.ssql(type = update ) SELECT TICKER_SYMBOL, c.Company, SECTOR, CHANGE, PRICE FROM DESTINATION_SQL_STREAM LEFT JOIN CompanyName as c ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
SQL-based Kinesis Data Analytics application
SELECT STREAM ticker_symbol, sector, change, ( price / 0 ) as ProblemColumnFROM "SOURCE_SQL_STREAM_001" WHERE sector SIMILAR TO '%TECH%';
Managed Service for Apache Flink Studio
Query 1 - % flink.ssql(type = update ) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM; CREATE TABLE DESTINATION_SQL_STREAM ( TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), CHANGE DOUBLE, PRICE DOUBLE ) PARTITIONED BY (TICKER_SYMBOL) WITH ( 'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601'); Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()], result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 except : return - 1 st_env.register_function("DivideByZero", DivideByZero) Query 3 - % flink.ssql(type = update ) SELECT CURRENT_TIMESTAMP AS ERROR_TIME, * FROM ( SELECT TICKER_SYMBOL, SECTOR, CHANGE, DivideByZero(PRICE) as ErrorColumn FROM DESTINATION_SQL_STREAM WHERE SECTOR SIMILAR TO '%TECH%' ) AS ERROR_STREAM;

Si vous souhaitez transférer des charges de travail utilisant Random Cut Forest de Kinesis Analytics vers Managed Service SQL for Apache Flink, AWS ce billet de blog explique comment utiliser Managed Service for Apache Flink pour exécuter un algorithme RCF en ligne de détection des anomalies.

Voir Converting- KDASQL -KDAStudio/pour un didacticiel complet.

Dans l’exercice suivant, vous allez modifier votre flux de données afin d’utiliser le service géré Amazon pour Apache Flink Studio. Cela impliquera également de passer d’Amazon Kinesis Data Firehose à Amazon Kinesis Data Streams.

Nous partageons d'abord une SQL architecture typiqueKDA, avant de montrer comment vous pouvez la remplacer à l'aide d'Amazon Managed Service pour Apache Flink Studio et Amazon Kinesis Data Streams. Vous pouvez également lancer le AWS CloudFormation modèle ici :

Amazon Kinesis Data Analytics SQL - et Amazon Kinesis Data Firehose

Voici le flux architectural d'Amazon Kinesis Data SQL Analytics :

Architectural flow diagram showing data movement through Amazon Kinesis services to Amazon S3.

Nous examinons d'abord la configuration d'un ancien Amazon Kinesis Data Analytics SQL et d'Amazon Kinesis Data Firehose. Le cas d’utilisation concerne un marché boursier sur lequel des données commerciales, notamment des données de symbole boursier et de cours des actions, sont transmises depuis des sources externes aux systèmes Amazon Kinesis. Amazon Kinesis Data Analytics SQL for utilise le flux d'entrée pour exécuter des requêtes fenêtrées, telles que la fenêtre Tumblingmax, afin de déterminer le volume des transactions et min le cours des transactions sur une fenêtre d'une minute pour chaque ticker boursier. average 

Amazon Kinesis Data Analytics SQL est configuré pour ingérer les données d'Amazon Kinesis Data Firehose. API Après le traitement, Amazon Kinesis Data Analytics envoie SQL les données traitées à un autre Amazon Kinesis Data Firehose, qui enregistre ensuite le résultat dans un compartiment Amazon S3.

Dans ce cas, vous utilisez Amazon Kinesis Data Generator. Amazon Kinesis Data Generator vous permet d’envoyer des données de test à vos flux de diffusion Amazon Kinesis Data Streams ou Amazon Kinesis Data Firehose. Pour commencer, suivez les instructions ici. Utilisez le AWS CloudFormation modèle ici à la place de celui fourni dans les instructions :.

Une fois le AWS CloudFormation modèle exécuté, la section de sortie fournit l'URL d'Amazon Kinesis Data Generator. Connectez-vous au portail à l’aide de l’identifiant utilisateur et du mot de passe Cognito que vous avez définis ici. Sélectionnez la région et le nom du flux cible. Pour l’état actuel, choisissez les flux de diffusion Amazon Kinesis Data Firehose. Pour l’état actuel, choisissez le nom de flux Amazon Kinesis Data Firehose. Vous pouvez créer plusieurs modèles, en fonction de vos besoins, et tester le modèle à l’aide du bouton Tester le modèle avant de l’envoyer au flux cible.

Vous trouverez ci-dessous un exemple de charge utile utilisant Amazon Kinesis Data Generator. Le générateur de données cible les flux Amazon Kinesis Firehose en entrée pour diffuser les données en continu. Le SDK client Amazon Kinesis peut également envoyer des données provenant d'autres producteurs. 

2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763, "AMZN",3352023-02-17 09:28:07.763, "GOOGL",1852023-02-17 09:28:07.763, "AAPL",11162023-02-17 09:28:07.763, "GOOGL",1582

Ce qui suit JSON est utilisé pour générer une série aléatoire d'heure et de date de transaction, de code boursier et de cours de l'action :

date.now(YYYY-MM-DD HH:mm:ss.SSS), "random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])", random.number(2000)

Une fois que vous avez choisi Envoyer les données, le générateur commence à envoyer des données fictives.

Les systèmes externes transmettent les données à Amazon Kinesis Data Firehose. À l'aide d'Amazon Kinesis Data Analytics SQL for Applications, vous pouvez analyser les données de streaming selon SQL les normes. Le service vous permet de créer et d'exécuter SQL du code sur des sources de streaming pour effectuer des analyses de séries chronologiques, alimenter des tableaux de bord en temps réel et créer des métriques en temps réel. Amazon Kinesis Data Analytics SQL for Applications peut créer un flux de destination SQL à partir de requêtes sur le flux d'entrée et envoyer le flux de destination à un autre Amazon Kinesis Data Firehose. L’Amazon Kinesis Data Firehose de destination peut envoyer les données analytiques à Amazon S3 en dernière étape.

Amazon Kinesis Data Analytics SQL : le code existant est basé sur une extension SQL de Standard.

Vous utilisez la requête suivante dans Amazon Kinesis Data Analytics SQL -. Vous créez d’abord un flux de destination pour la sortie de requête. Ensuite, vous PUMP utiliseriez un objet du référentiel Amazon Kinesis Data Analytics (une extension de SQL la norme) qui fournit une fonctionnalité de requête INSERT INTO stream SELECT ... FROM continue, permettant ainsi de saisir les résultats d'une requête en continu dans un flux nommé. 

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP, INGEST_TIME TIMESTAMP, TICKER VARCHAR(16), VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME, STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME", "ticker", COUNT(*) AS VOLUME, AVG("tradePrice") AS AVG_PRICE, MIN("tradePrice") AS MIN_PRICE, MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001" GROUP BY "ticker", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);

Ce qui précède SQL utilise deux fenêtres temporelles : tradeTimestamp celle qui provient de la charge utile du flux entrant et ROWTIME.tradeTimestamp est également appelée Event Time ouclient-side time. Il est souvent préférable d'utiliser cette heure dans les analyses car il s'agit du moment où un événement s'est produit. Cependant, de nombreuses sources d'événements, telles que les téléphones mobiles et les clients web, n'ont pas horloges fiables, ce qui peut entraîner des heures inexactes. En outre, des problèmes de connectivité peuvent provoquer le fait que des enregistrements figurant dans un flux ne sont pas dans le même ordre que celui où les événements se sont produits. 

Les flux intégrés à l’application incluent également une colonne spéciale appelée ROWTIME. Celle-ci stocke un horodatage quand Amazon Kinesis Data Analytics insère une ligne dans le premier flux intégré à l’application. ROWTIME reflète l’horodatage du moment où Amazon Kinesis Data Analytics a inséré un enregistrement dans le premier flux intégré à l’application après la lecture de la source de streaming. Cette valeur ROWTIME est ensuite gérée tout au long de votre application. 

Le SQL détermine le nombre de tickers sous forme devolume, minmax, et le average prix sur un intervalle de 60 secondes. 

L'utilisation de chacun de ces types d'heure dans des requêtes à fenêtres temporelles présente des avantages et des inconvénients. Sélectionnez un ou plusieurs de ces types d’heure, et une stratégie pour traiter les inconvénients pertinents en fonction du scénario d’utilisation. 

Une stratégie à deux fenêtres utilise deux types d’heure, ROWTIME et l’un des autres types d’heure, comme l’heure de l’événement.

  • Utilisez ROWTIMEcomme première fenêtre, qui contrôle la fréquence à laquelle la requête émet les résultats, comme illustré dans l'exemple suivant. Cette valeur n'est pas utilisée comme une heure logique.

  • Utilisez l'un des autres types d'heure comme heure logique à associer à vos analyses. Cette heure représente le moment où l'événement s'est produit. Dans l'exemple suivant, l'objectif de l'analyse est de regrouper les enregistrements et de renvoyer un comptage par symbole boursier.

Service géré Amazon pour Apache Flink Studio 

Dans l’architecture mise à jour, vous remplacez Amazon Kinesis Data Firehose par Amazon Kinesis Data Streams. Amazon Kinesis Data Analytics SQL for Applications est remplacé par Amazon Managed Service pour Apache Flink Studio. Le code Apache Flink est exécuté de manière interactive dans un bloc-notes Apache Zeppelin. Le service géré Amazon pour Apache Flink Studio envoie les données commerciales agrégées vers un compartiment Amazon S3 en vue de leur stockage. Les étapes sont indiquées ci-dessous :

Voici le flux architectural du service géré Amazon pour Apache Flink Studio :

Data flow from Producer through Kinesis streams to Analytics Studio and S3 storage.

Créer un flux de données Kinesis

Pour créer un flux de données avec la console
  1. Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.

  2. Dans la barre de navigation, développez le sélecteur de région et choisissez une région.

  3. Choisissez Create data stream (Créer un flux de données).

  4. Sur la page Créer un flux Kinesis, saisissez le nom de votre flux de données, puis acceptez le mode de capacité à la demande par défaut.

    En mode à la demande, vous pouvez ensuite choisir Créer un flux Kinesis pour créer votre flux de données.

    Dans la page Flux Kinesis, la valeur Statut de votre flux est En création lorsque le flux est en cours de création. Lorsque le flux est à prêt à être utilisé, le statut passe à Actif.

  5. Choisissez le nom de votre flux. La page Détails du flux affiche un récapitulatif de la configuration de flux, ainsi que des informations de surveillance.

  6. Dans le générateur de données Amazon Kinesis, remplacez le flux ou le flux de diffusion par le nouveau flux Amazon Kinesis Data Streams : _ _. TRADE SOURCE STREAM

    JSONet la charge utile sera la même que celle que vous avez utilisée pour Amazon Kinesis Data Analytics-. SQL Utilisez le générateur de données Amazon Kinesis pour produire des exemples de données de charge utile de trading et ciblez le flux de STREAM données TRADE_ SOURCE _ pour cet exercice :

    {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}}, "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}", {{random.number(2000)}}
  7. AWS Management Console Accédez à Managed Service for Apache Flink, puis choisissez Create application.

  8. Dans le panneau de navigation à gauche, sélectionnez Blocs-notes Studio, puis Créer un bloc-notes Studio.

  9. Saisissez un nom pour le bloc-notes Studio.

  10. Sous Base de données AWS Glue, indiquez une base de données AWS Glue existante qui définira les métadonnées de vos sources et destinations. Si vous n'avez pas de AWS Glue base de données, choisissez Create et procédez comme suit :

    1. Dans la console AWS Glue, choisissez Databases sous Catalogue de données dans le menu de gauche.

    2. Sélectionnez Créer une base de données.

    3. Sur la page Créer une base de données, saisissez un nom pour la base de données. Dans la section Emplacement - facultatif, choisissez Parcourir Amazon S3 et sélectionnez le compartiment Amazon S3. Si vous n'avez pas de compartiment Amazon S3 déjà configuré, vous pouvez ignorer cette étape et y revenir plus tard.

    4. (Facultatif). Saisissez une description pour la base de données.

    5. Choisissez Créer une base de données.

  11. Choisissez Créer un bloc-notes.

  12. Une fois votre bloc-notes créé, choisissez Exécuter.

  13. Une fois le bloc-notes démarré avec succès, lancez un bloc-notes Zeppelin en choisissant Ouvrir dans Apache Zeppelin.

  14. Sur la page Zeppelin Notebook, choisissez Créer une nouvelle note et nommez-la. MarketDataFeed

Le SQL code Flink est expliqué ci-dessous, mais voici d'abord à quoi ressemble un écran d'ordinateur portable Zeppelin. Chaque fenêtre du bloc-notes est un bloc de code distinct ; elles peuvent être exécutées une par une.

Code de service géré Amazon pour Apache Flink Studio

Le service géré Amazon pour Apache Flink Studio utilise les blocs-notes Zeppelin pour exécuter le code. Pour cet exemple, le mappage est effectué avec du code SSQL basé sur Apache Flink 1.13. Le code du carnet Zeppelin est affiché ci-dessous, bloc par bloc. 

Avant d’exécuter du code dans votre bloc-notes Zeppelin, les commandes de configuration Flink doivent être exécutées. Si vous devez modifier un paramètre de configuration après avoir exécuté du code (ssql, Python ou Scala), vous devez arrêter et redémarrer votre bloc-notes. Dans cet exemple, vous devez définir le point de contrôle. Un point de contrôle est nécessaire pour diffuser des données vers un fichier dans Amazon S3. Cela permet de transférer vers un fichier la diffusion de données vers Amazon S3. L'instruction suivante définit l'intervalle à 5 000 millisecondes. 

%flink.conf execution.checkpointing.interval 5000

%flink.conf indique que ce bloc est constitué d’instructions de configuration. Pour plus d'informations sur la configuration de Flink, y compris le pointage de contrôle, voir Apache Flink Checkpointing. 

La table d'entrée pour la source Amazon Kinesis Data Streams est créée avec le code Flink ssql suivant. Notez que le champ TRADE_TIME enregistre la date et l’heure de création par le générateur de données.

%flink.ssql DROP TABLE IF EXISTS TRADE_SOURCE_STREAM; CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, TRADE_TIME TIMESTAMP(3), WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE, STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM', 'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');

Vous pouvez afficher le flux d’entrée avec cette instruction :

%flink.ssql(type=update)-- testing the source stream select * from TRADE_SOURCE_STREAM;

Avant d’envoyer les données agrégées à Amazon S3, vous pouvez les afficher directement dans le service géré Amazon pour Apache Flink Studio à l’aide d’une requête de sélection à fenêtres bascules. Cela agrège les données de trading dans une fenêtre de temps d'une minute. Notez que l’instruction %flink.ssql doit avoir une désignation (type=update) :

%flink.ssql(type=update) select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW, TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

Vous pouvez ensuite créer une table de destination dans Amazon S3. Vous devez utiliser un filigrane. Un filigrane est une mesure de progression qui indique un instant donné à partir duquel vous êtes certain qu’aucun autre événement retardé ne se produira. L’objectif du filigrane est de prendre en compte les arrivées tardives. L’intervalle ‘5’ Second permet aux transactions d’entrer dans le flux Amazon Kinesis Data Streams avec 5 secondes de retard et d’être incluses si elles sont horodatées dans cette fenêtre. Pour plus d'informations, consultez la section Génération de filigranes.   

%flink.ssql(type=update) DROP TABLE IF EXISTS TRADE_DESTINATION_S3; CREATE TABLE TRADE_DESTINATION_S3 ( TRADE_WINDOW_START TIMESTAMP(3), WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND, TICKER STRING,  VOLUME BIGINT, AVG_PRICE DOUBLE, MIN_PRICE DOUBLE, MAX_PRICE DOUBLE) WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');

Cette instruction insère les données dans le code TRADE_DESTINATION_S3. TUMPLE_ROWTIME correspond à l’horodatage de la limite supérieure incluse de la fenêtre bascule.

%flink.ssql(type=update) insert into TRADE_DESTINATION_S3 select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE), TICKER, COUNT(*) as VOLUME, AVG(PRICE) as AVG_PRICE, MIN(PRICE) as MIN_PRICE, MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;

Exécutez votre instruction pendant 10 à 20 minutes afin d’accumuler des données dans Amazon S3. Puis annulez l’instruction.

Cela ferme le fichier dans Amazon S3 afin qu’il soit consultable.

Voici à quoi ressemble le contenu :

Financial data table showing stock prices and volumes for tech companies on March 1, 2023.

Vous pouvez utiliser le modèle AWS CloudFormation pour créer l’infrastructure.

AWS CloudFormation créera les ressources suivantes dans votre AWS compte :

  • Amazon Kinesis Data Streams

  • Service géré Amazon pour Apache Flink Studio

  • AWS Glue base de données

  • Compartiment Amazon S3

  • IAMrôles et politiques permettant à Amazon Managed Service pour Apache Flink Studio d'accéder aux ressources appropriées

Importez le bloc-notes et remplacez le nom du compartiment Amazon S3 par le nouveau compartiment Amazon S3 créé par AWS CloudFormation.

SQL code snippet creating a table with timestamp, ticker, volume, and price fields.
Voir plus

Voici quelques ressources supplémentaires que vous pouvez utiliser pour en savoir plus sur l'utilisation du service géré pour Apache Flink Studio :

L'objectif de ce modèle est de montrer comment tirer parti UDFs des blocs-notes Zeppelin de Kinesis Data Analytics-Studio pour traiter les données du flux Kinesis. Le service géré pour Apache Flink Studio utilise Apache Flink pour fournir des fonctionnalités analytiques avancées, notamment une sémantique de traitement unique, des fenêtres temporelles, une extensibilité grâce à des fonctions définies par l'utilisateur et des intégrations client, une prise en charge linguistique impérative, un état durable des applications, une mise à l'échelle horizontale, la prise en charge de plusieurs sources de données, des intégrations extensibles, etc. Ils sont essentiels pour garantir l'exactitude, l'exhaustivité, la cohérence et la fiabilité du traitement des flux de données et ne sont pas disponibles avec Amazon Kinesis Data Analytics SQL pour.

Dans cet exemple d'application, nous allons montrer comment tirer parti UDFs du bloc-notes KDA -Studio Zeppelin pour traiter les données du flux Kinesis. Les blocs-notes Studio pour Kinesis Data Analytics vous permettent d'interroger des flux de données de manière interactive en temps réel, et de créer et d'exécuter facilement des applications de traitement de flux à l'aide de standardsSQL, Python et Scala. En quelques clics AWS Management Console, vous pouvez lancer un bloc-notes sans serveur pour interroger des flux de données et obtenir des résultats en quelques secondes. Pour plus d’informations, consultez Utilisation d’un bloc-notes Studio avec Kinesis Data Analytics pour Apache Flink.

Fonctions Lambda utilisées pour le prétraitement et le post-traitement des données dans les applications : KDA SQL

Data flow diagram showing SQL App processing between source and destination streams.

Fonctions définies par l'utilisateur pour le pré/post-traitement des données à l'aide KDA des ordinateurs portables -Studio Zeppelin

Flink Studio Zeppelin Notebook workflow with in-memory tables and user-defined functions.

Fonctions définies par l'utilisateur () UDFs

Pour réutiliser une logique métier courante dans un opérateur, il peut être utile de référencer une fonction définie par l’utilisateur afin de transformer votre flux de données. Vous pouvez le faire soit dans le bloc-notes du service géré pour Apache Flink Studio, soit sous forme de fichier jar d’application référencé en externe. L’utilisation de fonctions définies par l’utilisateur peut simplifier les transformations ou les enrichissements de données que vous pouvez effectuer sur des données de streaming.

Dans votre bloc-notes, vous ferez référence à un simple fichier jar d’application Java doté de fonctionnalités permettant d’anonymiser les numéros de téléphone personnels. Vous pouvez également écrire du Python ou du Scala UDFs à utiliser dans le bloc-notes. Nous avons choisi un fichier jar d’application Java pour mettre en évidence la fonctionnalité d’importation d’un fichier jar d’application dans un bloc-notes Pyflink.

Configuration de l’environnement

Pour suivre ce guide et interagir avec vos données de streaming, vous allez utiliser un AWS CloudFormation script pour lancer les ressources suivantes :

  • Flux de données sources et cibles Kinesis

  • Base de données Glue

  • Rôle IAM

  • Service géré pour l’application Apache Flink Studio

  • Fonction Lambda pour démarrer le service géré pour l’application Apache Flink Studio

  • Rôle Lambda pour exécuter la fonction Lambda précédente

  • Ressource personnalisée pour appeler la fonction Lambda

Téléchargez le AWS CloudFormation modèle ici.

Créez la AWS CloudFormation pile
  1. Accédez à AWS Management Console et choisissez CloudFormationdans la liste des services.

  2. Sur la CloudFormationpage, choisissez Stacks, puis Create Stack with new resources (standard).

  3. Sur la page Créer une pile, choisissez Télécharger un fichier modèle, puis choisissez le fichier kda-flink-udf.yml que vous avez téléchargé précédemment. Chargez le fichier, puis sélectionnez Suivant.

  4. Donnez un nom au modèle, par exemple pour kinesis-UDF afin qu’il soit facile à mémoriser, et mettez à jour les paramètres d’entrée tels que input-stream si vous souhaitez un autre nom. Choisissez Suivant.

  5. Sur la page Configurer les options de pile, ajoutez des balises si vous le souhaitez, puis choisissez Next.

  6. Sur la page Révision, cochez les cases permettant la création de IAM ressources, puis choisissez Soumettre.

Le lancement de la AWS CloudFormation pile peut prendre de 10 à 15 minutes selon la région dans laquelle vous le lancez. Une fois que vous voyez le statut CREATE_COMPLETE pour l’ensemble de la pile, vous êtes prêt à continuer.

Utilisation du bloc-notes du service géré pour Apache Flink Studio

Les blocs-notes Studio pour Kinesis Data Analytics vous permettent d'interroger des flux de données de manière interactive en temps réel, et de créer et d'exécuter facilement des applications de traitement de flux à l'aide de standardsSQL, Python et Scala. En quelques clics AWS Management Console, vous pouvez lancer un bloc-notes sans serveur pour interroger des flux de données et obtenir des résultats en quelques secondes.

Un bloc-notes est un environnement de développement basé sur le Web. Grâce aux blocs-notes, vous bénéficiez d’une expérience de développement interactive simple associée aux capacités avancées de traitement des flux de données fournies par Apache Flink. Les blocs-notes Studio utilisent des blocs-notes alimentés par Apache Zeppelin et utilisent Apache Flink comme moteur de traitement des flux. Les blocs-notes Studio combinent parfaitement ces technologies pour rendre les analyses avancées sur les flux de données accessibles aux développeurs de tous niveaux de compétences.

Apache Zeppelin fournit à vos blocs-notes Studio une suite complète d’outils d’analyse, y compris les outils suivants :

  • Visualisation des données

  • Exportation de données dans des fichiers

  • Contrôle du format de sortie pour une analyse simplifiée

Utilisation du bloc-notes
  1. Accédez au AWS Management Console et sélectionnez Amazon Kinesis dans la liste des services.

  2. Dans le volet de navigation de gauche, choisissez Applications d’analyse, puis Blocs-notes Studio.

  3. Vérifiez que le KinesisDataAnalyticsStudiobloc-notes fonctionne.

  4. Sélectionnez le bloc-notes, puis choisissez Ouvrir dans Apache Zeppelin.

  5. Téléchargez le fichier Data Producer Zeppelin Notebook que vous utiliserez pour lire et charger des données dans le flux Kinesis.

  6. Importez le bloc-notes Zeppelin Data Producer. Assurez-vous de modifier l’entrée STREAM_NAME et REGION dans le code du bloc-notes. Le nom du flux d’entrée se trouve dans la sortie de pile AWS CloudFormation.

  7. Exécutez le bloc-notes Data Producer en cliquant sur le bouton Exécuter ce paragraphe pour insérer des exemples de données dans le flux de données Kinesis d’entrée.

  8. Pendant le chargement des exemples de données, téléchargez MaskPhoneNumber-Interactive Notebook, qui lira les données d'entrée, anonymisera les numéros de téléphone du flux d'entrée et stockera les données anonymisées dans le flux de sortie.

  9. Importez le bloc-notes Zeppelin MaskPhoneNumber-interactive.

  10. Exécutez chaque paragraphe du bloc-notes.

    1. Au paragraphe 1, vous importez une fonction définie par l'utilisateur pour anonymiser les numéros de téléphone.

      %flink(parallelism=1) import com.mycompany.app.MaskPhoneNumber stenv.registerFunction("MaskPhoneNumber", new MaskPhoneNumber())
    2. Dans le paragraphe suivant, vous créez une table en mémoire pour lire les données du flux d’entrée. Assurez-vous que le nom du flux et AWS la région sont corrects.

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews; CREATE TABLE customer_reviews ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phone VARCHAR ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleInputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json');
    3. Vérifiez si les données sont chargées dans la table en mémoire.

      %flink.ssql(type=update) select * from customer_reviews
    4. Invoquez la fonction définie par l’utilisateur pour anonymiser le numéro de téléphone.

      %flink.ssql(type=update) select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    5. Maintenant que les numéros de téléphone sont masqués, créez une vue avec un numéro masqué.

      %flink.ssql(type=update) DROP VIEW IF EXISTS sentiments_view; CREATE VIEW sentiments_view AS select customer_id, product, review, MaskPhoneNumber('mask_phone', phone) as phoneNumber from customer_reviews
    6. Vérifiez les données.

      %flink.ssql(type=update) select * from sentiments_view
    7. Créez une table en mémoire pour le flux Kinesis en sortie. Assurez-vous que le nom du flux et AWS la région sont corrects.

      %flink.ssql(type=update) DROP TABLE IF EXISTS customer_reviews_stream_table; CREATE TABLE customer_reviews_stream_table ( customer_id VARCHAR, product VARCHAR, review VARCHAR, phoneNumber varchar ) WITH ( 'connector' = 'kinesis', 'stream' = 'KinesisUDFSampleOutputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json');
    8. Insérez les enregistrements mis à jour dans le flux Kinesis cible.

      %flink.ssql(type=update) INSERT INTO customer_reviews_stream_table SELECT customer_id, product, review, phoneNumber FROM sentiments_view
    9. Affichez et vérifiez les données du flux Kinesis cible.

      %flink.ssql(type=update) select * from customer_reviews_stream_table

Promotion d’un bloc-notes en tant qu’application

Maintenant que vous avez testé le code de votre bloc-notes de manière interactive, vous allez le déployer en tant qu’application de streaming durable. Vous devez d’abord modifier la configuration de l’application pour indiquer l’emplacement de votre code dans Amazon S3.

  1. Sur le AWS Management Console, choisissez votre bloc-notes et dans Déployer en tant que configuration d'application (facultatif), sélectionnez Modifier.

  2. Sous Destination du code dans Amazon S3, choisissez le compartiment Amazon S3 créé par les scripts AWS CloudFormation. Ce processus peut prendre quelques minutes.

  3. Vous ne pourrez pas promouvoir la note telle quelle. Si vous essayez, vous obtenez une erreur car les instructions Select ne sont pas prises en charge. Pour éviter ce problème, téléchargez le carnet Zeppelin MaskPhoneNumber -Streaming.

  4. Importez le bloc-notes Zeppelin MaskPhoneNumber-streaming.

  5. Ouvrez la note et choisissez Actions pour KinesisDataAnalyticsStudio.

  6. Choisissez Build MaskPhoneNumber -Streaming et exportez vers S3. Assurez-vous de modifier le nom d’application et de ne pas inclure de caractères spéciaux.

  7. Choisissez Générer et exporter. La configuration de l’application de streaming prendra quelques minutes.

  8. Une fois la génération terminée, choisissez Déployer à l’aide de la console AWS .

  9. Sur la page suivante, passez en revue les paramètres et assurez-vous de choisir le bon IAM rôle. Ensuite, choisissez Créer une application de streaming.

  10. Après quelques minutes, vous verrez un message indiquant que l’application de streaming a été créée avec succès.

Pour plus d’informations sur le déploiement d’applications à état durable et avec limitations, consultez Déploiement en tant qu’application à état durable.

Nettoyage

Si vous le souhaitez, vous pouvez à présent désinstaller la pile AWS CloudFormation. Cela supprimera tous les services que vous avez configurés précédemment.