Ingestão de dados de streaming usando o Kinesis
Esse procedimento demonstra como ingerir dados de uma transmissão do Kinesis chamada ev_station_data, que contém dados de consumo de diferentes estações de carregamento EV, no formato JSON. O esquema está bem definido. O exemplo mostra como armazenar os dados como JSON bruto, e também como converter os dados JSON em tipos de dados do Amazon Redshift à medida que são ingeridos.
Configuração do produtor
Usando o Amazon Kinesis Data Streams, siga as etapas para criar uma transmissão chamada
ev_station_data
. Escolha On-demand (Sob demanda) para o Capacity mode (Modo de capacidade). Para obter mais informações, consulte Gerenciamento de transmissões via Console de Gerenciamento da AWS.O Gerador de dados do Amazon Kinesis
pode ajudá-lo a gerar dados de teste para uso com sua transmissão. Siga as etapas detalhadas na ferramenta para começar, e use o seguinte modelo de dados para gerar seus dados: { "_id" : "{{random.uuid}}", "clusterID": "{{random.number( { "min":1, "max":50 } )}}", "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}", "kWhDelivered": "{{commerce.price}}", "stationID": "{{random.number( { "min":1, "max":467 } )}}", "spaceID": "{{random.word}}-{{random.number( { "min":1, "max":20 } )}}", "timezone": "America/Los_Angeles", "userID": "{{random.number( { "min":1000, "max":500000 } )}}" }
Cada objeto JSON nos dados da transmissão tem as seguintes propriedades:
{ "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb", "clusterID": "49", "connectionTime": "2022-01-31 13:17:15", "kWhDelivered": "74.00", "stationID": "421", "spaceID": "technologies-2", "timezone": "America/Los_Angeles", "userID": "482329" }
Configuração do Amazon Redshift
Essas etapas mostram como configurar a exibição materializada para ingerir dados.
Crie um esquema externo para mapear os dados do Kinesis para um objeto do Redshift.
CREATE EXTERNAL SCHEMA evdata FROM KINESIS IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';
Para obter mais informações sobre como configurar a função do IAM, consulte Conceitos básicos da ingestão de streaming do Amazon Kinesis Data Streams.
Crie uma exibição materializada para consumir os dados da transmissão. Os exemplos a seguir mostram os dois métodos de definição de exibições materializadas para ingerir os dados de origem JSON.
Primeiro, armazene registros da transmissão em formato SUPER semiestruturado. Neste exemplo, a origem JSON é armazenada no Redshift sem conversão para tipos do Redshift.
CREATE MATERIALIZED VIEW ev_station_data AS SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, case when can_json_parse(kinesis_data) then json_parse(kinesis_data) else null end as payload, case when not can_json_parse(kinesis_data) then kinesis_data else null end as failed_payload FROM evdata."ev_station_data" ;
Em contraste, na definição seguinte de exibição materializada, a exibição materializada tem um esquema definido no Redshift. A exibição materializada é distribuída no valor de UUID da transmissão e é classificada pelo valor
approximatearrivaltimestamp
.CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS SELECT refresh_time, approximate_arrival_timestamp, partition_key, shard_id, sequence_number, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID, json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'timezone',true)::varchar(30)as timezone, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID FROM evdata."ev_station_data" WHERE LENGTH(kinesis_data) < 65355;
Consulte a transmissão
Consulte a exibição materializada atualizada para obter estatísticas de uso.
SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime ,SUM(kWhDelivered) AS Energy_Consumed ,count(distinct userID) AS #Users from ev_station_data_extract group by to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') order by 1 desc;
Exibir resultados.
connectiontime energy_consumed #users 2022-02-08 16:07:21+00 4139 10 2022-02-08 16:07:20+00 5571 10 2022-02-08 16:07:19+00 8697 20 2022-02-08 16:07:18+00 4408 10 2022-02-08 16:07:17+00 4257 10 2022-02-08 16:07:16+00 6861 10 2022-02-08 16:07:15+00 5643 10 2022-02-08 16:07:14+00 3677 10 2022-02-08 16:07:13+00 4673 10 2022-02-08 16:07:11+00 9689 20