Ingesta de datos de streaming mediante Kinesis
Este procedimiento muestra cómo ingerir datos de un flujo de Kinesis denominado ev_station_data, que contiene datos de consumo de diferentes estaciones de carga de vehículos eléctricos, en formato JSON. El esquema está bien definido. El ejemplo muestra cómo almacenar los datos en formato JSON sin procesar, y también cómo convertir los datos JSON a tipos de datos de Amazon Redshift a medida que se ingieren.
Configuración del productor
Utilizando Amazon Kinesis Data Streams, siga los pasos para crear un flujo denominado
ev_station_data
. Elija On-demand (Bajo demanda) para Capacity mode (Modo de capacidad). Para obtener más información, consulte Creación de un flujo a través de la consola de administración de AWS.El Generador de datos de Amazon Kinesis
puede ayudarle a generar datos de prueba para utilizarlos con el flujo. Siga los pasos detallados en la herramienta para comenzar, y utilice la siguiente plantilla de datos para generar los datos: { "_id" : "{{random.uuid}}", "clusterID": "{{random.number( { "min":1, "max":50 } )}}", "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}", "kWhDelivered": "{{commerce.price}}", "stationID": "{{random.number( { "min":1, "max":467 } )}}", "spaceID": "{{random.word}}-{{random.number( { "min":1, "max":20 } )}}", "timezone": "America/Los_Angeles", "userID": "{{random.number( { "min":1000, "max":500000 } )}}" }
Cada objeto JSON de los datos del flujo tiene las siguientes propiedades:
{ "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb", "clusterID": "49", "connectionTime": "2022-01-31 13:17:15", "kWhDelivered": "74.00", "stationID": "421", "spaceID": "technologies-2", "timezone": "America/Los_Angeles", "userID": "482329" }
Configuración de Amazon Redshift
Estos pasos muestran cómo configurar la vista materializada para ingerir datos.
Cree un esquema externo para asignar los datos de Kinesis a un objeto de Redshift.
CREATE EXTERNAL SCHEMA evdata FROM KINESIS IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';
Para obtener más información sobre cómo configurar el rol de IAM, consulte Introducción a la ingesta de streaming de Amazon Kinesis Data Streams.
Cree una vista materializada para consumir los datos del flujo. En los ejemplos siguientes, se muestran los dos métodos de definición de vistas materializadas para ingerir los datos de origen JSON.
En primer lugar, almacene los registros del flujo en formato SUPER semiestructurado. En este ejemplo, el origen JSON se almacena en Redshift sin realizar conversión a tipos de Redshift.
CREATE MATERIALIZED VIEW ev_station_data AS SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, case when can_json_parse(kinesis_data) then json_parse(kinesis_data) else null end as payload, case when not can_json_parse(kinesis_data) then kinesis_data else null end as failed_payload FROM evdata."ev_station_data" ;
En cambio, en la siguiente definición de vista materializada, la vista materializada tiene un esquema definido en Redshift. La vista materializada se distribuye en el valor UUID del flujo y se ordena según el valor
approximatearrivaltimestamp
.CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS SELECT refresh_time, approximate_arrival_timestamp, partition_key, shard_id, sequence_number, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID, json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'timezone',true)::varchar(30)as timezone, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID FROM evdata."ev_station_data" WHERE LENGTH(kinesis_data) < 65355;
Consulta del flujo
Consulte la vista materializada actualizada para obtener estadísticas de uso.
SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime ,SUM(kWhDelivered) AS Energy_Consumed ,count(distinct userID) AS #Users from ev_station_data_extract group by to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') order by 1 desc;
Vea los resultados.
connectiontime energy_consumed #users 2022-02-08 16:07:21+00 4139 10 2022-02-08 16:07:20+00 5571 10 2022-02-08 16:07:19+00 8697 20 2022-02-08 16:07:18+00 4408 10 2022-02-08 16:07:17+00 4257 10 2022-02-08 16:07:16+00 6861 10 2022-02-08 16:07:15+00 5643 10 2022-02-08 16:07:14+00 3677 10 2022-02-08 16:07:13+00 4673 10 2022-02-08 16:07:11+00 9689 20