Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Ingesta de datos de streaming mediante Kinesis - Amazon Redshift

Ingesta de datos de streaming mediante Kinesis

Este procedimiento muestra cómo ingerir datos de un flujo de Kinesis denominado ev_station_data, que contiene datos de consumo de diferentes estaciones de carga de vehículos eléctricos, en formato JSON. El esquema está bien definido. El ejemplo muestra cómo almacenar los datos en formato JSON sin procesar, y también cómo convertir los datos JSON a tipos de datos de Amazon Redshift a medida que se ingieren.

Configuración del productor

  1. Utilizando Amazon Kinesis Data Streams, siga los pasos para crear un flujo denominado ev_station_data. Elija On-demand (Bajo demanda) para Capacity mode (Modo de capacidad). Para obtener más información, consulte Creación de un flujo a través de la consola de administración de AWS.

  2. El Generador de datos de Amazon Kinesis puede ayudarle a generar datos de prueba para utilizarlos con el flujo. Siga los pasos detallados en la herramienta para comenzar, y utilice la siguiente plantilla de datos para generar los datos:

    { "_id" : "{{random.uuid}}", "clusterID": "{{random.number( { "min":1, "max":50 } )}}", "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}", "kWhDelivered": "{{commerce.price}}", "stationID": "{{random.number( { "min":1, "max":467 } )}}", "spaceID": "{{random.word}}-{{random.number( { "min":1, "max":20 } )}}", "timezone": "America/Los_Angeles", "userID": "{{random.number( { "min":1000, "max":500000 } )}}" }

    Cada objeto JSON de los datos del flujo tiene las siguientes propiedades:

    { "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb", "clusterID": "49", "connectionTime": "2022-01-31 13:17:15", "kWhDelivered": "74.00", "stationID": "421", "spaceID": "technologies-2", "timezone": "America/Los_Angeles", "userID": "482329" }

Configuración de Amazon Redshift

Estos pasos muestran cómo configurar la vista materializada para ingerir datos.

  1. Cree un esquema externo para asignar los datos de Kinesis a un objeto de Redshift.

    CREATE EXTERNAL SCHEMA evdata FROM KINESIS IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';

    Para obtener más información sobre cómo configurar el rol de IAM, consulte Introducción a la ingesta de streaming de Amazon Kinesis Data Streams.

  2. Cree una vista materializada para consumir los datos del flujo. En los ejemplos siguientes, se muestran los dos métodos de definición de vistas materializadas para ingerir los datos de origen JSON.

    En primer lugar, almacene los registros del flujo en formato SUPER semiestructurado. En este ejemplo, el origen JSON se almacena en Redshift sin realizar conversión a tipos de Redshift.

    CREATE MATERIALIZED VIEW ev_station_data AS SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, case when can_json_parse(kinesis_data) then json_parse(kinesis_data) else null end as payload, case when not can_json_parse(kinesis_data) then kinesis_data else null end as failed_payload FROM evdata."ev_station_data" ;

    En cambio, en la siguiente definición de vista materializada, la vista materializada tiene un esquema definido en Redshift. La vista materializada se distribuye en el valor UUID del flujo y se ordena según el valor approximatearrivaltimestamp.

    CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS SELECT refresh_time, approximate_arrival_timestamp, partition_key, shard_id, sequence_number, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID, json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'timezone',true)::varchar(30)as timezone, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID FROM evdata."ev_station_data" WHERE LENGTH(kinesis_data) < 65355;

Consulta del flujo

  1. Consulte la vista materializada actualizada para obtener estadísticas de uso.

    SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime ,SUM(kWhDelivered) AS Energy_Consumed ,count(distinct userID) AS #Users from ev_station_data_extract group by to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') order by 1 desc;
  2. Vea los resultados.

    connectiontime energy_consumed #users 2022-02-08 16:07:21+00 4139 10 2022-02-08 16:07:20+00 5571 10 2022-02-08 16:07:19+00 8697 20 2022-02-08 16:07:18+00 4408 10 2022-02-08 16:07:17+00 4257 10 2022-02-08 16:07:16+00 6861 10 2022-02-08 16:07:15+00 5643 10 2022-02-08 16:07:14+00 3677 10 2022-02-08 16:07:13+00 4673 10 2022-02-08 16:07:11+00 9689 20
PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.